From 49b6da2cf228aa2f9c446a1f94a190919a2c5f59 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Mon, 29 Apr 2024 09:11:31 +0200 Subject: [PATCH] [new] Add TCP, UDP socket handlers --- src/taoensso/telemere/sockets.clj | 106 +++++++++++++++++++++++++ src/taoensso/telemere/utils.cljc | 127 ++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 src/taoensso/telemere/sockets.clj diff --git a/src/taoensso/telemere/sockets.clj b/src/taoensso/telemere/sockets.clj new file mode 100644 index 0000000..61b9203 --- /dev/null +++ b/src/taoensso/telemere/sockets.clj @@ -0,0 +1,106 @@ +(ns taoensso.telemere.sockets + "Basic TCP/UDP socket handlers." + (:require + [taoensso.encore :as enc :refer [have have?]] + [taoensso.telemere.utils :as utils]) + + (:import + [java.net Socket InetAddress] + [java.net DatagramSocket DatagramPacket InetSocketAddress] + [java.io PrintWriter])) + +(comment + (require '[taoensso.telemere :as tel]) + (remove-ns 'taoensso.telemere.sockets) + (:api (enc/interns-overview))) + +;;;; Implementation + +;;;; Handlers + +(defn handler:tcp-socket + "Experimental, subject to change. Feedback welcome! + + Returns a (fn handler [signal]) that: + - Takes a Telemere signal. + - Sends formatted signal string to specified TCP socket. + + Options: + `host` - Destination TCP socket hostname string + `port` - Destination TCP socket port int + + `:socket-opts` - {:keys [ssl? connect-timeout-msecs]} + `:format-signal-fn`- (fn [signal]) => output, see `help:signal-formatters`. + + Limitations: + - Failed writes will be retried only once. + - Writes lock on a single underlying socket, so IO won't benefit from adding + extra handler threads. Let me know if there's demand for socket pooling." + + ([host port] (handler:tcp-socket host port nil)) + ([host port + {:keys [socket-opts format-signal-fn] + :or {format-signal-fn (utils/format-signal->str-fn)}}] + + (let [sw (utils/tcp-socket-writer host port socket-opts)] + (defn a-handler:tcp-socket + ([] (sw)) ; Shut down + ([signal] + (when-let [output (format-signal-fn signal)] + (sw (str output utils/newline)))))))) + +(defn handler:udp-socket + "Experimental, subject to change. Feedback welcome! + + Returns a (fn handler [signal]) that: + - Takes a Telemere signal. + - Sends formatted signal string to specified UDP socket. + + Options: + `host` - Destination UDP socket hostname string + `port` - Destination UDP socket port int + + `:format-signal-fn` - (fn [signal]) => output, see `help:signal-formatters`. + `:max-packet-bytes` - Max packet size (in bytes) before truncating output (default 512) + `:truncation-warning-fn` - Optional (fn [{:keys [max actual signal]}]) to call whenever + output is truncated. Should be appropriately rate-limited! + + Limitations: + - Due to UDP limitations, truncates output to `max-packet-bytes`! + - Failed writes will be retried only once. + - Writes lock on a single underlying socket, so IO won't benefit from adding + extra handler threads. Let me know if there's demand for socket pooling. + - No DTLS (Datagram Transport Layer Security) support, + please let me know if there's demand." + + ([host port] (handler:udp-socket host port nil)) + ([host port + {:keys [max-packet-bytes truncation-warning-fn format-signal-fn] + :or + {max-packet-bytes 512 + format-signal-fn (utils/format-signal->str-fn)}}] + + (let [max-packet-bytes (int max-packet-bytes) + socket (DatagramSocket.) ; No need to change socket once created + lock (Object.)] + + (.connect socket (InetSocketAddress. (str host) (int port))) + + (defn a-handler:udp-socket + ([] (.close socket)) ; Shut down + ([signal] + (when-let [output (format-signal-fn signal)] + (let [ba (enc/str->utf8-ba (str output utils/newline)) + ba-len (alength ba) + packet (DatagramPacket. ba (min ba-len max-packet-bytes))] + + (when (and truncation-warning-fn (> ba-len max-packet-bytes)) + ;; Fn should be appropriately rate-limited + (truncation-warning-fn {:max max-packet-bytes, :actual ba-len, :signal signal})) + + (locking lock + (try + (.send (DatagramSocket.) packet) + (catch Exception _ ; Retry once + (Thread/sleep 250) + (.send (DatagramSocket.) packet))))))))))) diff --git a/src/taoensso/telemere/utils.cljc b/src/taoensso/telemere/utils.cljc index 7970826..5711a14 100644 --- a/src/taoensso/telemere/utils.cljc +++ b/src/taoensso/telemere/utils.cljc @@ -261,6 +261,133 @@ (comment (def fw1 (file-writer "test.txt" true)) (fw1 "x") (fw1)) +;;;; Sockets + +#?(:clj + (defn- default-socket-fn + "Returns conected `java.net.Socket`, or throws." + ^java.net.Socket [host port connect-timeout-msecs] + (let [addr (java.net.InetSocketAddress. ^String host (int port)) + socket (java.net.Socket.)] + + (if connect-timeout-msecs + (.connect socket addr (int connect-timeout-msecs)) + (.connect socket addr)) + + socket))) + +#?(:clj + (let [factory_ (delay (javax.net.ssl.SSLSocketFactory/getDefault))] + (defn- default-ssl-socket-fn + "Returns connected SSL `java.net.Socket`, or throws." + ^java.net.Socket [^java.net.Socket socket ^String host port] + (.createSocket ^javax.net.ssl.SSLSocketFactory @factory_ + socket host (int port) true)))) + +#?(:clj + (defn tcp-socket-writer + "Experimental, subject to change. Feedback welcome! + + Connects to specified TCP socket and returns a stateful fn of 2 arities: + [content] => Writes given content to socket, or no-ops if closed. + [] => Closes the writer. + + Useful for basic handlers that write to a TCP socket, etc. + + Options: + `:ssl?` - Use SSL/TLS? + `:connect-timeout-msecs` - Connection timeout (default 3000 msecs) + `:socket-fn` - (fn [host port timeout]) => `java.net.Socket` + `:ssl-socket-fn` - (fn [socket host port]) => `java.net.Socket` + + Notes: + - Writer should be manually closed after use (with zero-arity call). + - Flushes after every write. + - Will retry failed writes once, then drop. + - Thread safe, locks on single socket stream. + - Advanced users may want a custom implementation using a connection + pool and/or more sophisticated retry semantics, etc." + + [host port + {:keys + [ssl? connect-timeout-msecs, + socket-fn ssl-socket-fn] :as opts + + :or + {connect-timeout-msecs 3000 + socket-fn default-socket-fn + ssl-socket-fn default-ssl-socket-fn}}] + + (let [new-conn! ; => [ ], or throws + (fn [] + (try + (let [^java.net.Socket socket + (let [socket (socket-fn host port connect-timeout-msecs)] + (if ssl? + (ssl-socket-fn socket host port) + (do socket)))] + + [socket (.getOutputStream socket)]) + + (catch Exception ex + (throw (ex-info "Failed to create connection" opts ex))))) + + conn_ (volatile! (new-conn!)) + open?_ (enc/latom true) + + close! + (fn [] + (when (compare-and-set! open?_ true false) + (when-let [[^java.net.Socket socket] (.deref conn_)] + (.close socket) + (vreset! conn_ nil) + true))) + + reset! + (fn [] + (close!) + (vreset! conn_ (new-conn!)) + (reset! open?_ true) + true) + + write-ba! + (fn [^bytes ba-content] + (when-let [[_ ^java.io.OutputStream output] (.deref conn_)] + (.write output ba-content) + (.flush output) + true)) + + conn-okay! + (let [rl (enc/rate-limiter-once-per 250)] + (fn [] + (or + (rl) + (when-let [[^java.net.Socket socket] (.deref conn_)] + (and + (not (.isClosed socket)) + (do (.isConnected socket)))) + (throw (java.io.IOException. "Bad connection"))))) + + lock (Object.)] + + (fn a-tcp-socket-writer + ([] (when (open?_) (locking lock (close!)))) + ([content-or-action] + (case content-or-action ; Undocumented, for dev/testing + :writer/open? (open?_) + :writer/reset! (locking lock (reset!)) + :writer/state {:conn (.deref conn_)} + (when (open?_) + (let [content content-or-action + ba (enc/str->utf8-ba (str content))] + (locking lock + (try + (conn-okay!) + (write-ba! ba) + (catch Exception _ ; Retry once + (reset!) + (write-ba! ba)))))))))))) + ;;;; Formatters (defn format-nsecs-fn