From 89f98b440f0faaca5de04db34c4133b249d0ba7d Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Tue, 1 Aug 2023 14:46:22 +0200 Subject: [PATCH] [new] [#153] PoC: transducer support on thaw Note: also considered (but ultimately rejected) idea of a separate `*thaw-mapfn*` opt that operates directly on every `thaw-from-in!` result. This (transducer) approach is more flexible, and covers the most common use cases just fine. Having both seems excessive. --- src/taoensso/nippy.clj | 91 +++++++++++++++++++++++++++-------- test/taoensso/nippy_tests.clj | 19 ++++++++ 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/src/taoensso/nippy.clj b/src/taoensso/nippy.clj index 34aa983..942ab43 100644 --- a/src/taoensso/nippy.clj +++ b/src/taoensso/nippy.clj @@ -360,6 +360,45 @@ (enc/defonce ^:dynamic *incl-metadata?* "Include metadata when freezing/thawing?" true) +(enc/defonce ^:dynamic *thaw-xform* + "Experimental, subject to change. Feedback welcome. + + Transducer to use when thawing standard Clojure collection types + (vectors, maps, sets, lists, etc.). + + Allows fast+flexible inspection and manipulation of data being thawed. + + Key-val style data structures like maps will provide `MapEntry` args + to the reducing function. Use `map-entry?`, `key`, `val` utils for these. + + Example transducers: + + (map (fn [x] (println x) x)) ; Print each coll item thawed + + (comp + (map (fn [x] (if (= x :secret) :redacted x))) ; Replace secrets + (remove (fn [x] ; Remove maps with a truthy :remove? + (or + (and (map? x) (:remove? x)) + (and (map-entry? x) (= (key x) :remove?) (val x))))))) + + Note that while this is a very powerful feature, correctly writing + and debugging transducers and reducing fns can be tricky. + + To help, if Nippy encounters an error while applying your xform, it + will throw a detailed `ExceptionInfo` with message + \"Error thrown via `*thaw-xform*`\" to help you debug." 
+ nil) + +(comment + (binding [*thaw-xform* + (comp + (map (fn [x] (println x) x)) + (map (fn [x] (if (= x 1) 0 x))) + (map (fn [x] (/ 1 0))))] + + (thaw (freeze [1 1 0 1 1])))) + ;;;; Java Serializable config ;; Unfortunately quite a bit of complexity to do this safely @@ -1232,6 +1271,7 @@ (opt->bindings :auto-freeze-compressor #'*auto-freeze-compressor*) (opt->bindings :custom-readers #'*custom-readers*) (opt->bindings :incl-metadata? #'*incl-metadata?*) + (opt->bindings :thaw-xform #'*thaw-xform*) (opt->bindings :serializable-allowlist (case action :freeze #'*freeze-serializable-allowlist* @@ -1355,13 +1395,34 @@ (defmacro ^:private editable? [coll] `(instance? clojure.lang.IEditableCollection ~coll)) +(defn- xform* [xform] (enc/catching-xform {:error/msg "Error thrown via `*thaw-xform*`"} xform)) + +(defn- transduce-thaw1 [^DataInput in xform n init rf] + (let [rf (if xform ((xform* xform) rf) rf)] + (rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in))) init n)))) + +(defn- transduce-thaw2 [^DataInput in xform n init rf2 rf1] + (if xform + (let [rf ((xform* xform) rf1)] (rf (enc/reduce-n (fn [acc _] (rf acc (clojure.lang.MapEntry/create (thaw-from-in! in) (thaw-from-in! in)))) init n))) + (let [rf rf2 ] (rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in) (thaw-from-in! in))) init n))))) + (defn- read-into [to ^DataInput in ^long n] (if (and (editable? to) (> n 10)) - (persistent! - (enc/reduce-n (fn [acc _] (conj! acc (thaw-from-in! in))) - (transient to) n)) + (transduce-thaw1 in *thaw-xform* n (transient to) (fn rf ([x] (persistent! x)) ([acc x] (conj! acc x)))) + (transduce-thaw1 in *thaw-xform* n to (fn rf ([x] x) ([acc x] (conj acc x)))))) - (enc/reduce-n (fn [acc _] (conj acc (thaw-from-in! in))) to n))) +(declare ^:private read-kvs-into) +(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2))) +(defn- read-kvs-into [to ^DataInput in ^long n] + (if (and (editable? 
to) (> n 10)) + + (transduce-thaw2 in *thaw-xform* n (transient to) + (fn rf2 ([x] (persistent! x)) ([acc k v] (assoc! acc k v))) + (fn rf1 ([x] (persistent! x)) ([acc kv] (assoc! acc (key kv) (val kv))))) + + (transduce-thaw2 in *thaw-xform* n to + (fn rf2 ([x] x) ([acc k v] (assoc acc k v))) + (fn rf1 ([x] x) ([acc kv] (assoc acc (key kv) (val kv))))))) (defn- read-objects [^objects ary ^DataInput in] (enc/reduce-n @@ -1370,17 +1431,6 @@ ary) ary (alength ary))) -(defn- read-kvs-into [to ^DataInput in ^long n] - (if (and (editable? to) (> n 10)) - (persistent! - (enc/reduce-n (fn [acc _] (assoc! acc (thaw-from-in! in) (thaw-from-in! in))) - (transient to) n)) - - (enc/reduce-n (fn [acc _] (assoc acc (thaw-from-in! in) (thaw-from-in! in))) - to n))) - -(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2))) - (def ^:private class-method-sig (into-array Class [IPersistentMap])) (defn- read-custom! [in prefixed? type-id] @@ -1593,8 +1643,8 @@ id-regex (re-pattern (thaw-from-in! in)) id-vec-0 [] - id-vec-2 [(thaw-from-in! in) (thaw-from-in! in)] - id-vec-3 [(thaw-from-in! in) (thaw-from-in! in) (thaw-from-in! in)] + id-vec-2 (read-into [] in 2) + id-vec-3 (read-into [] in 3) id-vec-sm* (read-into [] in (read-sm-count* in)) id-vec-sm_ (read-into [] in (read-sm-count in)) id-vec-md (read-into [] in (read-md-count in)) @@ -1758,7 +1808,7 @@ (do (throw (ex-info (str "Unrecognized :auto encryptor id: " encryptor-id) {:encryptor-id encryptor-id}))))) -(def ^:private err-msg-unknown-thaw-failure "Decryption/decompression failure, or data unfrozen/damaged.") +(def ^:private err-msg-unknown-thaw-failure "Possible decryption/decompression error, unfrozen/damaged data, etc.") (def ^:private err-msg-unrecognized-header "Unrecognized (but apparently well-formed) header. Data frozen with newer Nippy version?") (defn fast-thaw @@ -1792,7 +1842,7 @@ ([^bytes ba {:as opts :keys [v1-compatibility? 
compressor encryptor password - serializable-allowlist incl-metadata?] + serializable-allowlist incl-metadata? thaw-xform] :or {compressor :auto encryptor :auto}}] @@ -1820,7 +1870,8 @@ '*auto-freeze-compressor* *auto-freeze-compressor* '*custom-readers* *custom-readers* '*incl-metadata?* *incl-metadata?* - '*thaw-serializable-allowlist* *thaw-serializable-allowlist*)} + '*thaw-serializable-allowlist* *thaw-serializable-allowlist* + '*thaw-xform* *thaw-xform*)} e)))) diff --git a/test/taoensso/nippy_tests.clj b/test/taoensso/nippy_tests.clj index 7ace056..644132d 100644 --- a/test/taoensso/nippy_tests.clj +++ b/test/taoensso/nippy_tests.clj @@ -372,6 +372,25 @@ "Don't try to preserve metadata on vars")]) +;;;; thaw-xform + +(deftest _thaw-xform + [(is (= (binding [nippy/*thaw-xform* nil] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :secret 3 4])) + (is (= (binding [nippy/*thaw-xform* (map (fn [x] (if (= x :secret) :redacted x)))] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :redacted 3 4])) + + (is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map-entry? x) (and (= (key x) :x) (val x)))))] + (thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}}))) + {:a :A, :b :B, :c {}, :d #{:d1 :d2 {:d3 :D3}}})) + + (is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map? x) (contains? x :x))))] + (thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}}))) + {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2}})) + + (is (= (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze []))) []) "rf not run on empty colls") + + (let [ex (enc/throws :default (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze [:a :b]))))] + (is (= (-> ex enc/ex-cause enc/ex-cause ex-data :call) '(rf acc in)) "Error thrown via `*thaw-xform*`"))]) + ;;;; Benchmarks (deftest _benchmarks