mirror of
https://github.com/taoensso/telemere.git
synced 2025-12-22 19:51:12 +00:00
[nop] OpenTelemetry handler: move to latch for span contention protection
This'll be more reliable (and probably faster) under high load than the small-batch GC.
This commit is contained in:
parent
67cb4941bf
commit
e323896d54
1 changed files with 20 additions and 7 deletions
|
|
@ -13,7 +13,8 @@
|
||||||
(:import
|
(:import
|
||||||
[io.opentelemetry.api.common AttributesBuilder Attributes]
|
[io.opentelemetry.api.common AttributesBuilder Attributes]
|
||||||
[io.opentelemetry.api.logs LoggerProvider Severity]
|
[io.opentelemetry.api.logs LoggerProvider Severity]
|
||||||
[io.opentelemetry.api.trace TracerProvider Tracer Span]))
|
[io.opentelemetry.api.trace TracerProvider Tracer Span]
|
||||||
|
[java.util.concurrent CountDownLatch]))
|
||||||
|
|
||||||
(comment
|
(comment
|
||||||
(remove-ns 'taoensso.telemere.open-telemetry)
|
(remove-ns 'taoensso.telemere.open-telemetry)
|
||||||
|
|
@ -220,7 +221,7 @@
|
||||||
- `end-buffer_` - latom: #{[<uid> <end-inst>]}
|
- `end-buffer_` - latom: #{[<uid> <end-inst>]}
|
||||||
- `gc-buffer_` - latom: #{<uid>}"
|
- `gc-buffer_` - latom: #{<uid>}"
|
||||||
|
|
||||||
[tracer spans_ end-buffer_ gc-buffer_ signal]
|
[tracer spans_ end-buffer_ gc-buffer_ gc-latch_ signal]
|
||||||
|
|
||||||
;; Notes:
|
;; Notes:
|
||||||
;; - Spans go to `SpanExporter` after `.end` call, ~random order okay
|
;; - Spans go to `SpanExporter` after `.end` call, ~random order okay
|
||||||
|
|
@ -228,6 +229,9 @@
|
||||||
;; - No API to directly create spans with needed data, so we ~simulate
|
;; - No API to directly create spans with needed data, so we ~simulate
|
||||||
;; typical usage
|
;; typical usage
|
||||||
|
|
||||||
|
(when-let [^java.util.concurrent.CountDownLatch gc-latch (gc-latch_)]
|
||||||
|
(try (.await gc-latch) (catch InterruptedException _)))
|
||||||
|
|
||||||
(enc/when-let
|
(enc/when-let
|
||||||
[root (get signal :root) ; Tracing iff root
|
[root (get signal :root) ; Tracing iff root
|
||||||
root-uid (get root :uid)
|
root-uid (get root :uid)
|
||||||
|
|
@ -312,7 +316,7 @@
|
||||||
(def s2 s2)))
|
(def s2 s2)))
|
||||||
|
|
||||||
[@gc-buffer_ @end-buffer_ @spans_]
|
[@gc-buffer_ @end-buffer_ @spans_]
|
||||||
(handle-tracing! my-tr spans_ end-buffer_ gc-buffer_ s1))
|
(handle-tracing! my-tr spans_ end-buffer_ gc-buffer_ (enc/latom nil) s1))
|
||||||
|
|
||||||
;;;; Logging
|
;;;; Logging
|
||||||
|
|
||||||
|
|
@ -463,6 +467,8 @@
|
||||||
spans_ (when ?tracer (enc/latom {})) ; {<uid> <Span_>}
|
spans_ (when ?tracer (enc/latom {})) ; {<uid> <Span_>}
|
||||||
end-buffer1_ (when ?tracer (enc/latom #{})) ; #{[<uid> <end-inst>]}
|
end-buffer1_ (when ?tracer (enc/latom #{})) ; #{[<uid> <end-inst>]}
|
||||||
sgc-buffer1_ (when ?tracer (enc/latom #{})) ; #{<uid>} ; Slow GC
|
sgc-buffer1_ (when ?tracer (enc/latom #{})) ; #{<uid>} ; Slow GC
|
||||||
|
gc-latch_ (when ?tracer (enc/latom nil)) ; ?CountDownLatch
|
||||||
|
|
||||||
stop-tracing!
|
stop-tracing!
|
||||||
(if-not ?tracer
|
(if-not ?tracer
|
||||||
(fn stop-tracing! []) ; Noop
|
(fn stop-tracing! []) ; Noop
|
||||||
|
|
@ -484,9 +490,16 @@
|
||||||
(when-not (empty? uids-to-gc)
|
(when-not (empty? uids-to-gc)
|
||||||
(let [uids-to-gc (set/intersection uids-to-gc (set (keys (spans_))))]
|
(let [uids-to-gc (set/intersection uids-to-gc (set (keys (spans_))))]
|
||||||
(when-not (empty? uids-to-gc)
|
(when-not (empty? uids-to-gc)
|
||||||
;; Update in small batches to minimize spans_ contention
|
;; ;; Update in small batches to minimize contention
|
||||||
(doseq [batch (partition-all 10 uids-to-gc)]
|
;; (doseq [batch (partition-all 16 uids-to-gc)]
|
||||||
(spans_ (fn [old] (reduce dissoc old batch))))))))
|
;; (spans_ (fn [old] (reduce dissoc old batch))))
|
||||||
|
(let [gc-latch (java.util.concurrent.CountDownLatch. 1)]
|
||||||
|
(when (compare-and-set! gc-latch_ nil gc-latch)
|
||||||
|
(try
|
||||||
|
(spans_ (fn [old] (reduce dissoc old uids-to-gc)))
|
||||||
|
(finally
|
||||||
|
(.countDown gc-latch)
|
||||||
|
(reset! gc-latch_ nil)))))))))
|
||||||
|
|
||||||
move-uids!
|
move-uids!
|
||||||
(fn [src_ dst_]
|
(fn [src_ dst_]
|
||||||
|
|
@ -550,7 +563,7 @@
|
||||||
([signal]
|
([signal]
|
||||||
(let [?span
|
(let [?span
|
||||||
(when-let [^io.opentelemetry.api.trace.Tracer tracer ?tracer]
|
(when-let [^io.opentelemetry.api.trace.Tracer tracer ?tracer]
|
||||||
(handle-tracing! tracer spans_ end-buffer1_ sgc-buffer1_ signal))]
|
(handle-tracing! tracer spans_ end-buffer1_ sgc-buffer1_ gc-latch_ signal))]
|
||||||
|
|
||||||
(when-let [^io.opentelemetry.api.logs.LoggerProvider logger-provider ?logger-provider]
|
(when-let [^io.opentelemetry.api.logs.LoggerProvider logger-provider ?logger-provider]
|
||||||
(let [{:keys [ns inst level msg_]} signal
|
(let [{:keys [ns inst level msg_]} signal
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue