[new] [#153] PoC: transducer support on thaw
Note: also considered (but ultimately rejected) the idea of a separate `*thaw-mapfn*` opt that operates directly on every `thaw-from-in!` result. This (transducer) approach is more flexible, and covers the most common use cases just fine. Having both seems excessive.
This commit is contained in:
parent
60bc4e9976
commit
89f98b440f
2 changed files with 90 additions and 20 deletions
|
|
@ -360,6 +360,45 @@
|
|||
|
||||
(enc/defonce ^:dynamic *incl-metadata?* "Include metadata when freezing/thawing?" true)
|
||||
|
||||
(enc/defonce ^:dynamic *thaw-xform*
|
||||
"Experimental, subject to change. Feedback welcome.
|
||||
|
||||
Transducer to use when thawing standard Clojure collection types
|
||||
(vectors, maps, sets, lists, etc.).
|
||||
|
||||
Allows fast+flexible inspection and manipulation of data being thawed.
|
||||
|
||||
Key-val style data structures like maps will provide `MapEntry` args
|
||||
to reducing function. Use `map-entry?`, `key`, `val` utils for these.
|
||||
|
||||
Example transducers:
|
||||
|
||||
(map (fn [x] (println x) x)) ; Print each coll item thawed
|
||||
|
||||
(comp
|
||||
(map (fn [x] (if (= x :secret) :redacted x))) ; Replace secrets
|
||||
(remove (fn [x] ; Remove maps with a truthy :remove?
|
||||
(or
|
||||
(and (map? x) (:remove? x))
|
||||
(and (map-entry? x) (= (key x) :remove?) (val x))))))
|
||||
|
||||
Note that while this is a very powerful feature, correctly writing
|
||||
and debugging transducers and reducing fns can be tricky.
|
||||
|
||||
To help, if Nippy encounters an error while applying your xform, it
|
||||
will throw a detailed `ExceptionInfo` with message
|
||||
\"Error thrown via `*thaw-xform*`\" to help you debug."
|
||||
nil)
|
||||
|
||||
(comment
|
||||
(binding [*thaw-xform*
|
||||
(comp
|
||||
(map (fn [x] (println x) x))
|
||||
(map (fn [x] (if (= x 1) 0 x)))
|
||||
(map (fn [x] (/ 1 0))))]
|
||||
|
||||
(thaw (freeze [1 1 0 1 1]))))
|
||||
|
||||
;;;; Java Serializable config
|
||||
;; Unfortunately quite a bit of complexity to do this safely
|
||||
|
||||
|
|
@ -1232,6 +1271,7 @@
|
|||
(opt->bindings :auto-freeze-compressor #'*auto-freeze-compressor*)
|
||||
(opt->bindings :custom-readers #'*custom-readers*)
|
||||
(opt->bindings :incl-metadata? #'*incl-metadata?*)
|
||||
(opt->bindings :thaw-xform #'*thaw-xform*)
|
||||
(opt->bindings :serializable-allowlist
|
||||
(case action
|
||||
:freeze #'*freeze-serializable-allowlist*
|
||||
|
|
@ -1355,13 +1395,34 @@
|
|||
|
||||
;; Expands to an `instance?` check of `coll` against
;; `clojure.lang.IEditableCollection`. The readers in this file use it to
;; decide whether a target collection may be built via `transient`.
(defmacro ^:private editable? [coll] `(instance? clojure.lang.IEditableCollection ~coll))
|
||||
|
||||
;; Wraps the user-supplied `xform` with `enc/catching-xform`, tagging it with
;; the error message "Error thrown via `*thaw-xform*`".
;; NOTE(review): presumably `enc/catching-xform` rethrows errors raised inside
;; the transducer as an `ExceptionInfo` carrying this message (as the
;; `*thaw-xform*` docstring describes) -- confirm against encore.
(defn- xform* [xform] (enc/catching-xform {:error/msg "Error thrown via `*thaw-xform*`"} xform))
|
||||
|
||||
;; Thaws `n` items from `in` (one `thaw-from-in!` call per item) and folds
;; them into `init` with the reducing fn `rf`.
;;   - `xform` non-nil: `rf` is first wrapped as `((xform* xform) rf)`.
;;   - `xform` nil:     `rf` is used unchanged.
;; The final accumulator is passed through the 1-arity of the (possibly
;; wrapped) `rf`, so callers supply an `rf` with both [acc] and [acc x] arities.
(defn- transduce-thaw1 [^DataInput in xform n init rf]
|
||||
(let [rf (if xform ((xform* xform) rf) rf)]
|
||||
(rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in))) init n))))
|
||||
|
||||
;; Key/val variant of `transduce-thaw1`: thaws `n` pairs from `in`, reading
;; two values per pair via consecutive `thaw-from-in!` calls.
;;   - `xform` non-nil: each pair is wrapped in a `clojure.lang.MapEntry` and
;;     fed to `rf1` (arities [acc] and [acc kv]) after wrapping `rf1` as
;;     `((xform* xform) rf1)`.
;;   - `xform` nil:     each pair is passed straight to `rf2` (arities [acc]
;;     and [acc k v]); no `MapEntry` is created on this path.
;; In both cases the final accumulator goes through the 1-arity of the
;; reducing fn in use.
(defn- transduce-thaw2 [^DataInput in xform n init rf2 rf1]
|
||||
(if xform
|
||||
(let [rf ((xform* xform) rf1)] (rf (enc/reduce-n (fn [acc _] (rf acc (clojure.lang.MapEntry/create (thaw-from-in! in) (thaw-from-in! in)))) init n)))
|
||||
(let [rf rf2 ] (rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in) (thaw-from-in! in))) init n)))))
|
||||
|
||||
(defn- read-into [to ^DataInput in ^long n]
|
||||
(if (and (editable? to) (> n 10))
|
||||
(persistent!
|
||||
(enc/reduce-n (fn [acc _] (conj! acc (thaw-from-in! in)))
|
||||
(transient to) n))
|
||||
(transduce-thaw1 in *thaw-xform* n (transient to) (fn rf ([x] (persistent! x)) ([acc x] (conj! acc x))))
|
||||
(transduce-thaw1 in *thaw-xform* n to (fn rf ([x] x) ([acc x] (conj acc x))))))
|
||||
|
||||
(enc/reduce-n (fn [acc _] (conj acc (thaw-from-in! in))) to n)))
|
||||
(declare ^:private read-kvs-into)
|
||||
;; Reads an int from `in`, halves it with `quot`, and delegates to
;; `read-kvs-into` with that value as the pair count.
;; NOTE(review): presumably the deprecated format stored the total number of
;; keys + vals rather than the number of pairs -- confirm against the freezer.
(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2)))
|
||||
(defn- read-kvs-into [to ^DataInput in ^long n]
|
||||
(if (and (editable? to) (> n 10))
|
||||
|
||||
(transduce-thaw2 in *thaw-xform* n (transient to)
|
||||
(fn rf2 ([x] (persistent! x)) ([acc k v] (assoc! acc k v)))
|
||||
(fn rf1 ([x] (persistent! x)) ([acc kv] (assoc! acc (key kv) (val kv)))))
|
||||
|
||||
(transduce-thaw2 in *thaw-xform* n to
|
||||
(fn rf2 ([x] x) ([acc k v] (assoc acc k v)))
|
||||
(fn rf1 ([x] x) ([acc kv] (assoc acc (key kv) (val kv)))))))
|
||||
|
||||
(defn- read-objects [^objects ary ^DataInput in]
|
||||
(enc/reduce-n
|
||||
|
|
@ -1370,17 +1431,6 @@
|
|||
ary)
|
||||
ary (alength ary)))
|
||||
|
||||
(defn- read-kvs-into [to ^DataInput in ^long n]
|
||||
(if (and (editable? to) (> n 10))
|
||||
(persistent!
|
||||
(enc/reduce-n (fn [acc _] (assoc! acc (thaw-from-in! in) (thaw-from-in! in)))
|
||||
(transient to) n))
|
||||
|
||||
(enc/reduce-n (fn [acc _] (assoc acc (thaw-from-in! in) (thaw-from-in! in)))
|
||||
to n)))
|
||||
|
||||
(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2)))
|
||||
|
||||
(def ^:private class-method-sig (into-array Class [IPersistentMap]))
|
||||
|
||||
(defn- read-custom! [in prefixed? type-id]
|
||||
|
|
@ -1593,8 +1643,8 @@
|
|||
id-regex (re-pattern (thaw-from-in! in))
|
||||
|
||||
id-vec-0 []
|
||||
id-vec-2 [(thaw-from-in! in) (thaw-from-in! in)]
|
||||
id-vec-3 [(thaw-from-in! in) (thaw-from-in! in) (thaw-from-in! in)]
|
||||
id-vec-2 (read-into [] in 2)
|
||||
id-vec-3 (read-into [] in 3)
|
||||
id-vec-sm* (read-into [] in (read-sm-count* in))
|
||||
id-vec-sm_ (read-into [] in (read-sm-count in))
|
||||
id-vec-md (read-into [] in (read-md-count in))
|
||||
|
|
@ -1758,7 +1808,7 @@
|
|||
(do (throw (ex-info (str "Unrecognized :auto encryptor id: " encryptor-id)
|
||||
{:encryptor-id encryptor-id})))))
|
||||
|
||||
(def ^:private err-msg-unknown-thaw-failure "Decryption/decompression failure, or data unfrozen/damaged.")
|
||||
(def ^:private err-msg-unknown-thaw-failure "Possible decryption/decompression error, unfrozen/damaged data, etc.")
|
||||
(def ^:private err-msg-unrecognized-header "Unrecognized (but apparently well-formed) header. Data frozen with newer Nippy version?")
|
||||
|
||||
(defn fast-thaw
|
||||
|
|
@ -1792,7 +1842,7 @@
|
|||
([^bytes ba
|
||||
{:as opts
|
||||
:keys [v1-compatibility? compressor encryptor password
|
||||
serializable-allowlist incl-metadata?]
|
||||
serializable-allowlist incl-metadata? thaw-xform]
|
||||
:or {compressor :auto
|
||||
encryptor :auto}}]
|
||||
|
||||
|
|
@ -1820,7 +1870,8 @@
|
|||
'*auto-freeze-compressor* *auto-freeze-compressor*
|
||||
'*custom-readers* *custom-readers*
|
||||
'*incl-metadata?* *incl-metadata?*
|
||||
'*thaw-serializable-allowlist* *thaw-serializable-allowlist*)}
|
||||
'*thaw-serializable-allowlist* *thaw-serializable-allowlist*
|
||||
'*thaw-xform* *thaw-xform*)}
|
||||
|
||||
e))))
|
||||
|
||||
|
|
|
|||
|
|
@ -372,6 +372,25 @@
|
|||
|
||||
"Don't try to preserve metadata on vars")])
|
||||
|
||||
;;;; thaw-xform
|
||||
|
||||
(deftest _thaw-xform
|
||||
[(is (= (binding [nippy/*thaw-xform* nil] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :secret 3 4]))
|
||||
(is (= (binding [nippy/*thaw-xform* (map (fn [x] (if (= x :secret) :redacted x)))] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :redacted 3 4]))
|
||||
|
||||
(is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map-entry? x) (and (= (key x) :x) (val x)))))]
|
||||
(thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}})))
|
||||
{:a :A, :b :B, :c {}, :d #{:d1 :d2 {:d3 :D3}}}))
|
||||
|
||||
(is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map? x) (contains? x :x))))]
|
||||
(thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}})))
|
||||
{:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2}}))
|
||||
|
||||
(is (= (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze []))) []) "rf not run on empty colls")
|
||||
|
||||
(let [ex (enc/throws :default (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze [:a :b]))))]
|
||||
(is (= (-> ex enc/ex-cause enc/ex-cause ex-data :call) '(rf acc in)) "Error thrown via `*thaw-xform*`"))])
|
||||
|
||||
;;;; Benchmarks
|
||||
|
||||
(deftest _benchmarks
|
||||
|
|
|
|||
Loading…
Reference in a new issue