mirror of
https://github.com/taoensso/telemere.git
synced 2025-12-21 11:11:11 +00:00
[new] Add TCP, UDP socket handlers
This commit is contained in:
parent
8cc0a6dbc3
commit
49b6da2cf2
2 changed files with 233 additions and 0 deletions
106
src/taoensso/telemere/sockets.clj
Normal file
106
src/taoensso/telemere/sockets.clj
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
(ns taoensso.telemere.sockets
|
||||||
|
"Basic TCP/UDP socket handlers."
|
||||||
|
(:require
|
||||||
|
[taoensso.encore :as enc :refer [have have?]]
|
||||||
|
[taoensso.telemere.utils :as utils])
|
||||||
|
|
||||||
|
(:import
|
||||||
|
[java.net Socket InetAddress]
|
||||||
|
[java.net DatagramSocket DatagramPacket InetSocketAddress]
|
||||||
|
[java.io PrintWriter]))
|
||||||
|
|
||||||
|
(comment
|
||||||
|
(require '[taoensso.telemere :as tel])
|
||||||
|
(remove-ns 'taoensso.telemere.sockets)
|
||||||
|
(:api (enc/interns-overview)))
|
||||||
|
|
||||||
|
;;;; Implementation
|
||||||
|
|
||||||
|
;;;; Handlers
|
||||||
|
|
||||||
|
(defn handler:tcp-socket
|
||||||
|
"Experimental, subject to change. Feedback welcome!
|
||||||
|
|
||||||
|
Returns a (fn handler [signal]) that:
|
||||||
|
- Takes a Telemere signal.
|
||||||
|
- Sends formatted signal string to specified TCP socket.
|
||||||
|
|
||||||
|
Options:
|
||||||
|
`host` - Destination TCP socket hostname string
|
||||||
|
`port` - Destination TCP socket port int
|
||||||
|
|
||||||
|
`:socket-opts` - {:keys [ssl? connect-timeout-msecs]}
|
||||||
|
`:format-signal-fn`- (fn [signal]) => output, see `help:signal-formatters`.
|
||||||
|
|
||||||
|
Limitations:
|
||||||
|
- Failed writes will be retried only once.
|
||||||
|
- Writes lock on a single underlying socket, so IO won't benefit from adding
|
||||||
|
extra handler threads. Let me know if there's demand for socket pooling."
|
||||||
|
|
||||||
|
([host port] (handler:tcp-socket host port nil))
|
||||||
|
([host port
|
||||||
|
{:keys [socket-opts format-signal-fn]
|
||||||
|
:or {format-signal-fn (utils/format-signal->str-fn)}}]
|
||||||
|
|
||||||
|
(let [sw (utils/tcp-socket-writer host port socket-opts)]
|
||||||
|
(defn a-handler:tcp-socket
|
||||||
|
([] (sw)) ; Shut down
|
||||||
|
([signal]
|
||||||
|
(when-let [output (format-signal-fn signal)]
|
||||||
|
(sw (str output utils/newline))))))))
|
||||||
|
|
||||||
|
(defn handler:udp-socket
|
||||||
|
"Experimental, subject to change. Feedback welcome!
|
||||||
|
|
||||||
|
Returns a (fn handler [signal]) that:
|
||||||
|
- Takes a Telemere signal.
|
||||||
|
- Sends formatted signal string to specified UDP socket.
|
||||||
|
|
||||||
|
Options:
|
||||||
|
`host` - Destination UDP socket hostname string
|
||||||
|
`port` - Destination UDP socket port int
|
||||||
|
|
||||||
|
`:format-signal-fn` - (fn [signal]) => output, see `help:signal-formatters`.
|
||||||
|
`:max-packet-bytes` - Max packet size (in bytes) before truncating output (default 512)
|
||||||
|
`:truncation-warning-fn` - Optional (fn [{:keys [max actual signal]}]) to call whenever
|
||||||
|
output is truncated. Should be appropriately rate-limited!
|
||||||
|
|
||||||
|
Limitations:
|
||||||
|
- Due to UDP limitations, truncates output to `max-packet-bytes`!
|
||||||
|
- Failed writes will be retried only once.
|
||||||
|
- Writes lock on a single underlying socket, so IO won't benefit from adding
|
||||||
|
extra handler threads. Let me know if there's demand for socket pooling.
|
||||||
|
- No DTLS (Datagram Transport Layer Security) support,
|
||||||
|
please let me know if there's demand."
|
||||||
|
|
||||||
|
([host port] (handler:udp-socket host port nil))
|
||||||
|
([host port
|
||||||
|
{:keys [max-packet-bytes truncation-warning-fn format-signal-fn]
|
||||||
|
:or
|
||||||
|
{max-packet-bytes 512
|
||||||
|
format-signal-fn (utils/format-signal->str-fn)}}]
|
||||||
|
|
||||||
|
(let [max-packet-bytes (int max-packet-bytes)
|
||||||
|
socket (DatagramSocket.) ; No need to change socket once created
|
||||||
|
lock (Object.)]
|
||||||
|
|
||||||
|
(.connect socket (InetSocketAddress. (str host) (int port)))
|
||||||
|
|
||||||
|
(defn a-handler:udp-socket
|
||||||
|
([] (.close socket)) ; Shut down
|
||||||
|
([signal]
|
||||||
|
(when-let [output (format-signal-fn signal)]
|
||||||
|
(let [ba (enc/str->utf8-ba (str output utils/newline))
|
||||||
|
ba-len (alength ba)
|
||||||
|
packet (DatagramPacket. ba (min ba-len max-packet-bytes))]
|
||||||
|
|
||||||
|
(when (and truncation-warning-fn (> ba-len max-packet-bytes))
|
||||||
|
;; Fn should be appropriately rate-limited
|
||||||
|
(truncation-warning-fn {:max max-packet-bytes, :actual ba-len, :signal signal}))
|
||||||
|
|
||||||
|
(locking lock
|
||||||
|
(try
|
||||||
|
(.send (DatagramSocket.) packet)
|
||||||
|
(catch Exception _ ; Retry once
|
||||||
|
(Thread/sleep 250)
|
||||||
|
(.send (DatagramSocket.) packet)))))))))))
|
||||||
|
|
@ -261,6 +261,133 @@
|
||||||
|
|
||||||
(comment (def fw1 (file-writer "test.txt" true)) (fw1 "x") (fw1))
|
(comment (def fw1 (file-writer "test.txt" true)) (fw1 "x") (fw1))
|
||||||
|
|
||||||
|
;;;; Sockets
|
||||||
|
|
||||||
|
#?(:clj
|
||||||
|
(defn- default-socket-fn
|
||||||
|
"Returns conected `java.net.Socket`, or throws."
|
||||||
|
^java.net.Socket [host port connect-timeout-msecs]
|
||||||
|
(let [addr (java.net.InetSocketAddress. ^String host (int port))
|
||||||
|
socket (java.net.Socket.)]
|
||||||
|
|
||||||
|
(if connect-timeout-msecs
|
||||||
|
(.connect socket addr (int connect-timeout-msecs))
|
||||||
|
(.connect socket addr))
|
||||||
|
|
||||||
|
socket)))
|
||||||
|
|
||||||
|
#?(:clj
|
||||||
|
(let [factory_ (delay (javax.net.ssl.SSLSocketFactory/getDefault))]
|
||||||
|
(defn- default-ssl-socket-fn
|
||||||
|
"Returns connected SSL `java.net.Socket`, or throws."
|
||||||
|
^java.net.Socket [^java.net.Socket socket ^String host port]
|
||||||
|
(.createSocket ^javax.net.ssl.SSLSocketFactory @factory_
|
||||||
|
socket host (int port) true))))
|
||||||
|
|
||||||
|
#?(:clj
|
||||||
|
(defn tcp-socket-writer
|
||||||
|
"Experimental, subject to change. Feedback welcome!
|
||||||
|
|
||||||
|
Connects to specified TCP socket and returns a stateful fn of 2 arities:
|
||||||
|
[content] => Writes given content to socket, or no-ops if closed.
|
||||||
|
[] => Closes the writer.
|
||||||
|
|
||||||
|
Useful for basic handlers that write to a TCP socket, etc.
|
||||||
|
|
||||||
|
Options:
|
||||||
|
`:ssl?` - Use SSL/TLS?
|
||||||
|
`:connect-timeout-msecs` - Connection timeout (default 3000 msecs)
|
||||||
|
`:socket-fn` - (fn [host port timeout]) => `java.net.Socket`
|
||||||
|
`:ssl-socket-fn` - (fn [socket host port]) => `java.net.Socket`
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
- Writer should be manually closed after use (with zero-arity call).
|
||||||
|
- Flushes after every write.
|
||||||
|
- Will retry failed writes once, then drop.
|
||||||
|
- Thread safe, locks on single socket stream.
|
||||||
|
- Advanced users may want a custom implementation using a connection
|
||||||
|
pool and/or more sophisticated retry semantics, etc."
|
||||||
|
|
||||||
|
[host port
|
||||||
|
{:keys
|
||||||
|
[ssl? connect-timeout-msecs,
|
||||||
|
socket-fn ssl-socket-fn] :as opts
|
||||||
|
|
||||||
|
:or
|
||||||
|
{connect-timeout-msecs 3000
|
||||||
|
socket-fn default-socket-fn
|
||||||
|
ssl-socket-fn default-ssl-socket-fn}}]
|
||||||
|
|
||||||
|
(let [new-conn! ; => [<java.net.Socket> <java.io.OutputStream>], or throws
|
||||||
|
(fn []
|
||||||
|
(try
|
||||||
|
(let [^java.net.Socket socket
|
||||||
|
(let [socket (socket-fn host port connect-timeout-msecs)]
|
||||||
|
(if ssl?
|
||||||
|
(ssl-socket-fn socket host port)
|
||||||
|
(do socket)))]
|
||||||
|
|
||||||
|
[socket (.getOutputStream socket)])
|
||||||
|
|
||||||
|
(catch Exception ex
|
||||||
|
(throw (ex-info "Failed to create connection" opts ex)))))
|
||||||
|
|
||||||
|
conn_ (volatile! (new-conn!))
|
||||||
|
open?_ (enc/latom true)
|
||||||
|
|
||||||
|
close!
|
||||||
|
(fn []
|
||||||
|
(when (compare-and-set! open?_ true false)
|
||||||
|
(when-let [[^java.net.Socket socket] (.deref conn_)]
|
||||||
|
(.close socket)
|
||||||
|
(vreset! conn_ nil)
|
||||||
|
true)))
|
||||||
|
|
||||||
|
reset!
|
||||||
|
(fn []
|
||||||
|
(close!)
|
||||||
|
(vreset! conn_ (new-conn!))
|
||||||
|
(reset! open?_ true)
|
||||||
|
true)
|
||||||
|
|
||||||
|
write-ba!
|
||||||
|
(fn [^bytes ba-content]
|
||||||
|
(when-let [[_ ^java.io.OutputStream output] (.deref conn_)]
|
||||||
|
(.write output ba-content)
|
||||||
|
(.flush output)
|
||||||
|
true))
|
||||||
|
|
||||||
|
conn-okay!
|
||||||
|
(let [rl (enc/rate-limiter-once-per 250)]
|
||||||
|
(fn []
|
||||||
|
(or
|
||||||
|
(rl)
|
||||||
|
(when-let [[^java.net.Socket socket] (.deref conn_)]
|
||||||
|
(and
|
||||||
|
(not (.isClosed socket))
|
||||||
|
(do (.isConnected socket))))
|
||||||
|
(throw (java.io.IOException. "Bad connection")))))
|
||||||
|
|
||||||
|
lock (Object.)]
|
||||||
|
|
||||||
|
(fn a-tcp-socket-writer
|
||||||
|
([] (when (open?_) (locking lock (close!))))
|
||||||
|
([content-or-action]
|
||||||
|
(case content-or-action ; Undocumented, for dev/testing
|
||||||
|
:writer/open? (open?_)
|
||||||
|
:writer/reset! (locking lock (reset!))
|
||||||
|
:writer/state {:conn (.deref conn_)}
|
||||||
|
(when (open?_)
|
||||||
|
(let [content content-or-action
|
||||||
|
ba (enc/str->utf8-ba (str content))]
|
||||||
|
(locking lock
|
||||||
|
(try
|
||||||
|
(conn-okay!)
|
||||||
|
(write-ba! ba)
|
||||||
|
(catch Exception _ ; Retry once
|
||||||
|
(reset!)
|
||||||
|
(write-ba! ba))))))))))))
|
||||||
|
|
||||||
;;;; Formatters
|
;;;; Formatters
|
||||||
|
|
||||||
(defn format-nsecs-fn
|
(defn format-nsecs-fn
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue