[mod] Handler work, tweak util and handler param style

Incl.:

  - Consistently use map opts for all (even mandatory) args.
    Makes future extensions/changes easier.
This commit is contained in:
Peter Taoussanis 2024-05-06 10:12:09 +02:00
parent 0ff8dafaf3
commit 9000af14f3
6 changed files with 88 additions and 86 deletions

View file

@ -10,8 +10,6 @@
(remove-ns 'taoensso.telemere.consoles) (remove-ns 'taoensso.telemere.consoles)
(:api (enc/interns-overview))) (:api (enc/interns-overview)))
;;;; Handlers
#?(:clj #?(:clj
(defn ^:public handler:console (defn ^:public handler:console
"Experimental, subject to change. "Experimental, subject to change.

View file

@ -320,7 +320,7 @@
(let [main-path path (let [main-path path
main-file (utils/as-file main-path) main-file (utils/as-file main-path)
fw (utils/file-writer main-file true) fw (utils/file-writer {:file main-file, :append? true})
>max-file-size? >max-file-size?
(when max-file-size (when max-file-size

View file

@ -11,8 +11,6 @@
(remove-ns 'taoensso.telemere.postal) (remove-ns 'taoensso.telemere.postal)
(:api (enc/interns-overview))) (:api (enc/interns-overview)))
;;;; Implementation
(defn signal-subject-fn (defn signal-subject-fn
"Experimental, subject to change. "Experimental, subject to change.
Returns a (fn format [signal]) that: Returns a (fn format [signal]) that:
@ -43,7 +41,14 @@
(comment ((signal-subject-fn) (tel/with-signal (tel/event! ::ev-id1 #_{:postal/subject "My subject"})))) (comment ((signal-subject-fn) (tel/with-signal (tel/event! ::ev-id1 #_{:postal/subject "My subject"}))))
;;;; Handler (def default-dispatch-opts
{:min-level :info
:rate-limit
[[5 (enc/msecs :mins 1)]
[10 (enc/msecs :mins 15)]
[15 (enc/msecs :hours 1)]
[30 (enc/msecs :hours 6)]
]})
(defn handler:postal (defn handler:postal
"Experimental, subject to change. "Experimental, subject to change.
@ -56,18 +61,24 @@
Useful for emailing important alerts to admins, etc. Useful for emailing important alerts to admins, etc.
NB can incur financial costs!! Default handler dispatch options (override when calling `add-handler!`):
See tips section re: protecting against unexpected costs. `:min-level` - `:info`
`:rate-limit` -
[[5 (enc/msecs :mins 1)] ; Max 5 emails in 1 min
[10 (enc/msecs :mins 15)] ; Max 10 emails in 15 mins
[15 (enc/msecs :hours 1)] ; Max 15 emails in 1 hour
[30 (enc/msecs :hours 6)] ; Max 30 emails in 6 hours
]
Options: Options:
`:postal/conn-opts` - Map of connection opts given to `postal/send-message` `:conn-opts` - Map of connection opts given to `postal/send-message`
Examples: Examples:
{:host \"mail.isp.net\", :user \"jsmith\", :pass \"a-secret\"}, {:host \"mail.isp.net\", :user \"jsmith\", :pass \"a-secret\"},
{:host \"smtp.gmail.com\", :user \"jsmith@gmail.com\", :pass \"a-secret\" :port 587 :tls true}, {:host \"smtp.gmail.com\", :user \"jsmith@gmail.com\", :pass \"a-secret\" :port 587 :tls true},
{:host \"email-smtp.us-east-1.amazonaws.com\", :port 587, :tls true {:host \"email-smtp.us-east-1.amazonaws.com\", :port 587, :tls true,
:user \"AKIAIDTP........\" :pass \"AikCFhx1P.......\"} :user \"AKIAIDTP........\", :pass \"AikCFhx1P.......\"}
`:postal/msg-opts` - Map of message options given to `postal/send-message` `:msg-opts` - Map of message opts given to `postal/send-message`
Examples: Examples:
{:from \"foo@example.com\", :to \"bar@example.com\"}, {:from \"foo@example.com\", :to \"bar@example.com\"},
{:from \"Alice <foo@example.com\", :to \"Bob <bar@example.com>\"}, {:from \"Alice <foo@example.com\", :to \"Bob <bar@example.com>\"},
@ -81,59 +92,42 @@
see `format-signal-fn` or `pr-signal-fn` see `format-signal-fn` or `pr-signal-fn`
Tips: Tips:
- Sending emails can incur financial costs! - Ref. <https://github.com/drewr/postal> for more info on `postal` options.
Use appropriate dispatch filtering options when calling `add-handler!` to prevent - Sending emails can be slow, and can incur financial costs!
handler from sending unnecessary emails! Use appropriate handler dispatch options for async handling and rate limiting, etc."
At least ALWAYS set an appropriate `:rate-limit` option, e.g.:
(add-handler! :my-postal-handler (handler:postal {<my-handler-opts})
{:rate-limit {\"Max 1 per min\" [1 (enc/msecs :mins 1)]
\"Max 3 per 15 mins\" [3 (enc/msecs :mins 15)]
\"Max 5 per hour\" [5 (enc/msecs :hours 1)]}, ...}), etc.
- Sending emails is slow!
Use appropriate async dispatch options when calling `add-handler!` to prevent
handler from blocking signal creator calls, e.g.:
(add-handler! :my-postal-handler (handler:postal {<my-handler-opts>})
{:async {:mode :dropping, :buffer-size 128, :n-threads 4} ...}), etc.
- Ref. <https://github.com/drewr/postal> for more info on `postal` options."
;; ([] (handler:postal nil)) ;; ([] (handler:postal nil))
([{:keys ([{:keys [conn-opts msg-opts, subject-fn body-fn]
[postal/conn-opts
postal/msg-opts
subject-fn
body-fn]
:or :or
{subject-fn (signal-subject-fn) {subject-fn (signal-subject-fn)
body-fn (utils/format-signal-fn)}}] body-fn (utils/format-signal-fn)}}]
(when-not conn-opts (throw (ex-info "No `:postal/conn-opts` was given" {}))) (when-not (map? conn-opts) (throw (ex-info "Expected `:conn-opts` map" (enc/typed-val conn-opts))))
(when-not msg-opts (throw (ex-info "No `:postal/msg-opts` was given" {}))) (when-not (map? msg-opts) (throw (ex-info "Expected `:msg-opts` map" (enc/typed-val msg-opts))))
(let [] (let [handler-fn
(defn a-handler:postal (fn a-handler:postal
([]) ; Shut down (no-op) ([]) ; Shut down (no-op)
([signal] ([signal]
(enc/when-let [subject (subject-fn signal) (enc/when-let [subject (subject-fn signal)
body (body-fn signal)] body (body-fn signal)]
(let [msg (let [msg
(assoc msg-opts (assoc msg-opts
:subject (str subject) :subject (str subject)
:body :body
(if (string? body) (if (string? body)
[{:type "text/plain; charset=utf-8" [{:type "text/plain; charset=utf-8"
:content (str body)}] :content (str body)}]
body)) body))
[result ex] [result ex]
(try (try
[(postal/send-message conn-opts msg) nil] [(postal/send-message conn-opts msg) nil]
(catch Exception ex [nil ex])) (catch Exception ex [nil ex]))
success? (= (get result :code) 0)] success? (= (get result :code) 0)]
(when-not success? (when-not success?
(throw (ex-info "Failed to send email" result ex)))))))))) (throw (ex-info "Failed to send email" result ex)))))))]
(with-meta handler-fn default-dispatch-opts))))

View file

@ -14,10 +14,6 @@
(remove-ns 'taoensso.telemere.sockets) (remove-ns 'taoensso.telemere.sockets)
(:api (enc/interns-overview))) (:api (enc/interns-overview)))
;;;; Implementation
;;;; Handlers
(defn handler:tcp-socket (defn handler:tcp-socket
"Experimental, subject to change. "Experimental, subject to change.
@ -28,10 +24,12 @@
Can output signals as human or machine-readable (edn, JSON) strings. Can output signals as human or machine-readable (edn, JSON) strings.
Options: Options:
`host` - Destination TCP socket hostname string `:socket-opts` - {:keys [host port ssl? connect-timeout-msecs]}
`port` - Destination TCP socket port int `:host` - Destination TCP socket hostname string
`:port` - Destination TCP socket port int
`:ssl?` - Use SSL/TLS (default false)
`:connect-timeout-msecs` - Connection timeout (default 3000 msecs)
`:socket-opts` - {:keys [ssl? connect-timeout-msecs]}
`:output-fn` - (fn [signal]) => string, see `format-signal-fn` or `pr-signal-fn` `:output-fn` - (fn [signal]) => string, see `format-signal-fn` or `pr-signal-fn`
Limitations: Limitations:
@ -39,13 +37,12 @@
- Writes lock on a single underlying socket, so IO won't benefit from adding - Writes lock on a single underlying socket, so IO won't benefit from adding
extra handler threads. Let me know if there's demand for socket pooling." extra handler threads. Let me know if there's demand for socket pooling."
([host port] (handler:tcp-socket host port nil)) ;; ([] (handler:tcp-socket nil))
([host port ([{:keys [socket-opts output-fn]
{:keys [socket-opts output-fn]
:or {output-fn (utils/format-signal-fn)}}] :or {output-fn (utils/format-signal-fn)}}]
(let [sw (utils/tcp-socket-writer host port socket-opts)] (let [sw (utils/tcp-socket-writer socket-opts)]
(defn a-handler:tcp-socket (fn a-handler:tcp-socket
([] (sw)) ; Shut down ([] (sw)) ; Shut down
([signal] ([signal]
(when-let [output (output-fn signal)] (when-let [output (output-fn signal)]
@ -61,11 +58,12 @@
Can output signals as human or machine-readable (edn, JSON) strings. Can output signals as human or machine-readable (edn, JSON) strings.
Options: Options:
`host` - Destination UDP socket hostname string `:socket-opts` - {:keys [host port max-packet-bytes]}
`port` - Destination UDP socket port int `:host` - Destination UDP socket hostname string
`:port` - Destination UDP socket port int
`:max-packet-bytes` - Max packet size (in bytes) before truncating output (default 512)
`:output-fn` - (fn [signal]) => string, see `format-signal-fn` or `pr-signal-fn` `:output-fn` - (fn [signal]) => string, see `format-signal-fn` or `pr-signal-fn`
`:max-packet-bytes` - Max packet size (in bytes) before truncating output (default 512)
`:truncation-warning-fn` - Optional (fn [{:keys [max actual signal]}]) to call whenever `:truncation-warning-fn` - Optional (fn [{:keys [max actual signal]}]) to call whenever
output is truncated. Should be appropriately rate-limited! output is truncated. Should be appropriately rate-limited!
@ -77,20 +75,26 @@
- No DTLS (Datagram Transport Layer Security) support, - No DTLS (Datagram Transport Layer Security) support,
please let me know if there's demand." please let me know if there's demand."
([host port] (handler:udp-socket host port nil)) ;; ([] (handler:udp-socket nil))
([host port ([{:keys [socket-opts output-fn truncation-warning-fn]
{:keys [output-fn max-packet-bytes truncation-warning-fn]
:or :or
{output-fn (utils/format-signal-fn) {socket-opts {:max-packet-bytes 512}
max-packet-bytes 512}}] output-fn (utils/format-signal-fn)}}]
(let [{:keys [host port max-packet-bytes]
:or {max-packet-bytes 512}} socket-opts
max-packet-bytes (int max-packet-bytes)
(let [max-packet-bytes (int max-packet-bytes)
socket (DatagramSocket.) ; No need to change socket once created socket (DatagramSocket.) ; No need to change socket once created
lock (Object.)] lock (Object.)]
(when-not (string? host) (throw (ex-info "Expected `:host` string" (enc/typed-val host))))
(when-not (int? port) (throw (ex-info "Expected `:port` int" (enc/typed-val port))))
(.connect socket (InetSocketAddress. (str host) (int port))) (.connect socket (InetSocketAddress. (str host) (int port)))
(defn a-handler:udp-socket (fn a-handler:udp-socket
([] (.close socket)) ; Shut down ([] (.close socket)) ; Shut down
([signal] ([signal]
(when-let [output (output-fn signal)] (when-let [output (output-fn signal)]

View file

@ -205,7 +205,11 @@
- Flushes after every write. - Flushes after every write.
- Thread safe, locks on single file stream." - Thread safe, locks on single file stream."
[file append?] [{:keys [file append?]
:or {append? true}}]
(when-not file (throw (ex-info "Expected `:file` value" (enc/typed-val file))))
(let [file (writeable-file! file) (let [file (writeable-file! file)
stream_ (volatile! (file-stream file append?)) stream_ (volatile! (file-stream file append?))
open?_ (enc/latom true) open?_ (enc/latom true)
@ -258,7 +262,7 @@
(reset!) (reset!)
(write-ba! ba)))))))))))) (write-ba! ba))))))))))))
(comment (def fw1 (file-writer "test.txt" true)) (fw1 "x") (fw1)) (comment (def fw1 (file-writer {:file "test.txt"})) (fw1 "x") (fw1))
;;;; Sockets ;;;; Sockets
@ -306,9 +310,8 @@
- Advanced users may want a custom implementation using a connection - Advanced users may want a custom implementation using a connection
pool and/or more sophisticated retry semantics, etc." pool and/or more sophisticated retry semantics, etc."
[host port [{:keys
{:keys [host port, ssl? connect-timeout-msecs,
[ssl? connect-timeout-msecs,
socket-fn ssl-socket-fn] :as opts socket-fn ssl-socket-fn] :as opts
:or :or
@ -316,6 +319,9 @@
socket-fn default-socket-fn socket-fn default-socket-fn
ssl-socket-fn default-ssl-socket-fn}}] ssl-socket-fn default-ssl-socket-fn}}]
(when-not (string? host) (throw (ex-info "Expected `:host` string" (enc/typed-val host))))
(when-not (int? port) (throw (ex-info "Expected `:port` int" (enc/typed-val port))))
(let [new-conn! ; => [<java.net.Socket> <java.io.OutputStream>], or throws (let [new-conn! ; => [<java.net.Socket> <java.io.OutputStream>], or throws
(fn [] (fn []
(try (try

View file

@ -644,7 +644,7 @@
#?(:clj #?(:clj
(testing "File writer" (testing "File writer"
(let [f (java.io.File/createTempFile "file-writer-test" ".txt") (let [f (java.io.File/createTempFile "file-writer-test" ".txt")
fw (utils/file-writer f false)] fw (utils/file-writer {:file f, :append? false})]
[(is (true? (fw "1"))) [(is (true? (fw "1")))
(is (true? (.delete f))) (is (true? (.delete f)))