From e323896d54a7a44f3d4347f99605c94a7bc4ccb0 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Mon, 19 Aug 2024 09:23:18 +0200 Subject: [PATCH] [nop] OpenTelemetry handler: move to latch for span contention protection This'll be more reliable (and probably faster) under high load than the small-batch GC. --- src/taoensso/telemere/open_telemetry.clj | 27 ++++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/taoensso/telemere/open_telemetry.clj b/src/taoensso/telemere/open_telemetry.clj index 82beea0..a1c2a6e 100644 --- a/src/taoensso/telemere/open_telemetry.clj +++ b/src/taoensso/telemere/open_telemetry.clj @@ -13,7 +13,8 @@ (:import [io.opentelemetry.api.common AttributesBuilder Attributes] [io.opentelemetry.api.logs LoggerProvider Severity] - [io.opentelemetry.api.trace TracerProvider Tracer Span])) + [io.opentelemetry.api.trace TracerProvider Tracer Span] + [java.util.concurrent CountDownLatch])) (comment (remove-ns 'taoensso.telemere.open-telemetry) @@ -220,7 +221,7 @@ - `end-buffer_` - latom: #{[ ]} - `gc-buffer_` - latom: #{}" - [tracer spans_ end-buffer_ gc-buffer_ signal] + [tracer spans_ end-buffer_ gc-buffer_ gc-latch_ signal] ;; Notes: ;; - Spans go to `SpanExporter` after `.end` call, ~random order okay @@ -228,6 +229,9 @@ ;; - No API to directly create spans with needed data, so we ~simulate ;; typical usage + (when-let [^java.util.concurrent.CountDownLatch gc-latch (gc-latch_)] + (try (.await gc-latch) (catch InterruptedException _))) + (enc/when-let [root (get signal :root) ; Tracing iff root root-uid (get root :uid) @@ -312,7 +316,7 @@ (def s2 s2))) [@gc-buffer_ @end-buffer_ @spans_] - (handle-tracing! my-tr spans_ end-buffer_ gc-buffer_ s1)) + (handle-tracing! my-tr spans_ end-buffer_ gc-buffer_ (enc/latom nil) s1)) ;;;; Logging @@ -463,6 +467,8 @@ spans_ (when ?tracer (enc/latom {})) ; { } end-buffer1_ (when ?tracer (enc/latom #{})) ; #{[ ]} sgc-buffer1_ (when ?tracer (enc/latom #{})) ; #{} ; Slow GC + gc-latch_ (when ?tracer (enc/latom nil)) ; ?CountDownLatch + stop-tracing! (if-not ?tracer (fn stop-tracing! []) ; Noop @@ -484,9 +490,16 @@ (when-not (empty? uids-to-gc) (let [uids-to-gc (set/intersection uids-to-gc (set (keys (spans_))))] (when-not (empty? uids-to-gc) - ;; Update in small batches to minimize spans_ contention - (doseq [batch (partition-all 10 uids-to-gc)] - (spans_ (fn [old] (reduce dissoc old batch)))))))) + ;; ;; Update in small batches to minimize contention + ;; (doseq [batch (partition-all 16 uids-to-gc)] + ;; (spans_ (fn [old] (reduce dissoc old batch)))) + (let [gc-latch (java.util.concurrent.CountDownLatch. 1)] + (when (compare-and-set! gc-latch_ nil gc-latch) + (try + (spans_ (fn [old] (reduce dissoc old uids-to-gc))) + (finally + (.countDown gc-latch) + (reset! gc-latch_ nil))))))))) move-uids! (fn [src_ dst_] @@ -550,7 +563,7 @@ ([signal] (let [?span (when-let [^io.opentelemetry.api.trace.Tracer tracer ?tracer] - (handle-tracing! tracer spans_ end-buffer1_ sgc-buffer1_ signal))] + (handle-tracing! tracer spans_ end-buffer1_ sgc-buffer1_ gc-latch_ signal))] (when-let [^io.opentelemetry.api.logs.LoggerProvider logger-provider ?logger-provider] (let [{:keys [ns inst level msg_]} signal