Clean up new DataInput/Output API

This commit is contained in:
Peter Taoussanis 2014-01-22 14:14:26 +07:00
parent 87fcd3a9c6
commit 1d2daf206b
2 changed files with 154 additions and 146 deletions

View file

@ -111,6 +111,8 @@ Deserialize it:
Couldn't be simpler!
See also the lower-level `freeze-to-out!` and `thaw-from-in!` fns for operating on `DataOutput` and `DataInput` types directly.
### Encryption (currently in **ALPHA**)
Nippy v2+ also gives you **dead simple data encryption**. Add a single option to your usual freeze/thaw calls like so:
@ -128,12 +130,12 @@ There's two default forms of encryption on offer: `:salted` and `:cached`. Each
(defrecord MyType [data])
(nippy/extend-freeze MyType 1 ; A unique type id ∈[1, 128]
[x data-output-stream]
(.writeUTF data-output-stream (:data x)))
[x data-output]
(.writeUTF data-output (:data x)))
(nippy/extend-thaw 1 ; Same type id
[data-input-stream]
(->MyType (.readUTF data-input-stream)))
[data-input]
(->MyType (.readUTF data-input)))
(nippy/thaw (nippy/freeze (->MyType "Joe"))) => #taoensso.nippy.MyType{:data "Joe"}
```

View file

@ -92,31 +92,31 @@
(defprotocol Freezable
"Be careful about extending to interfaces, Ref. http://goo.gl/6gGRlU."
(freeze-to-stream* [this stream]))
(freeze-to-out* [this out]))
(defmacro write-id [s id] `(.writeByte ~s ~id))
(defmacro ^:private write-bytes [s ba]
`(let [s# ~s ba# ~ba]
(defmacro write-id [out id] `(.writeByte ~out ~id))
(defmacro ^:private write-bytes [out ba]
`(let [out# ~out, ba# ~ba]
(let [size# (alength ba#)]
(.writeInt s# size#)
(.write s# ba# 0 size#))))
(.writeInt out# size#)
(.write out# ba# 0 size#))))
(defmacro ^:private write-biginteger [s x] `(write-bytes ~s (.toByteArray ~x)))
(defmacro ^:private write-utf8 [s x] `(write-bytes ~s (.getBytes ~x "UTF-8")))
(defmacro ^:private freeze-to-stream
"Like `freeze-to-stream*` but with metadata support."
[s x]
`(let [x# ~x s# ~s]
(defmacro ^:private write-biginteger [out x] `(write-bytes ~out (.toByteArray ~x)))
(defmacro ^:private write-utf8 [out x] `(write-bytes ~out (.getBytes ~x "UTF-8")))
(defmacro ^:private freeze-to-out
"Like `freeze-to-out*` but with metadata support."
[out x]
`(let [out# ~out, x# ~x]
(when-let [m# (meta x#)]
(write-id s# ~id-meta)
(freeze-to-stream* m# s#))
(freeze-to-stream* x# s#)))
(write-id out# ~id-meta)
(freeze-to-out* m# out#))
(freeze-to-out* x# out#)))
(defmacro ^:private freezer [type id & body]
`(extend-type ~type
Freezable
(~'freeze-to-stream* [~'x ~(with-meta 's {:tag 'DataOutput})]
(write-id ~'s ~id)
(~'freeze-to-out* [~'x ~(with-meta 'out {:tag 'DataOutput})]
(write-id ~'out ~id)
~@body)))
(defmacro ^:private freezer-coll [type id & body]
@ -125,34 +125,34 @@
(when (instance? ISeq ~type)
(println (format "DEBUG - freezer-coll: %s for %s" ~type (type ~'x)))))
(if (counted? ~'x)
(do (.writeInt ~'s (count ~'x))
(doseq [i# ~'x] (freeze-to-stream ~'s i#)))
(let [bas# (ByteArrayOutputStream.)
s# (DataOutputStream. bas#)
cnt# (reduce (fn [cnt# i#]
(freeze-to-stream s# i#)
(unchecked-inc cnt#))
0 ~'x)
(do (.writeInt ~'out (count ~'x))
(doseq [i# ~'x] (freeze-to-out ~'out i#)))
(let [bas# (ByteArrayOutputStream.)
sout# (DataOutputStream. bas#)
cnt# (reduce (fn [cnt# i#]
(freeze-to-out sout# i#)
(unchecked-inc cnt#))
0 ~'x)
ba# (.toByteArray bas#)]
(.writeInt ~'s cnt#)
(.write ~'s ba# 0 (alength ba#))))))
(.writeInt ~'out cnt#)
(.write ~'out ba# 0 (alength ba#))))))
(defmacro ^:private freezer-kvs [type id & body]
`(freezer ~type ~id
(.writeInt ~'s (* 2 (count ~'x)))
(.writeInt ~'out (* 2 (count ~'x)))
(doseq [kv# ~'x]
(freeze-to-stream ~'s (key kv#))
(freeze-to-stream ~'s (val kv#)))))
(freeze-to-out ~'out (key kv#))
(freeze-to-out ~'out (val kv#)))))
(freezer (Class/forName "[B") id-bytes (write-bytes s ^bytes x))
(freezer (Class/forName "[B") id-bytes (write-bytes out ^bytes x))
(freezer nil id-nil)
(freezer Boolean id-boolean (.writeBoolean s x))
(freezer Boolean id-boolean (.writeBoolean out x))
(freezer Character id-char (.writeChar s (int x)))
(freezer String id-string (write-utf8 s x))
(freezer Keyword id-keyword (write-utf8 s (if-let [ns (namespace x)]
(str ns "/" (name x))
(name x))))
(freezer Character id-char (.writeChar out (int x)))
(freezer String id-string (write-utf8 out x))
(freezer Keyword id-keyword (write-utf8 out (if-let [ns (namespace x)]
(str ns "/" (name x))
(name x))))
(freezer-coll PersistentQueue id-queue)
(freezer-coll PersistentTreeSet id-sorted-set)
@ -169,34 +169,34 @@
(freezer-coll ISeq id-seq)
(freezer IRecord id-record
(write-utf8 s (.getName (class x))) ; Reflect
(freeze-to-stream s (into {} x)))
(write-utf8 out (.getName (class x))) ; Reflect
(freeze-to-out out (into {} x)))
(freezer Byte id-byte (.writeByte s x))
(freezer Short id-short (.writeShort s x))
(freezer Integer id-integer (.writeInt s x))
(freezer Long id-long (.writeLong s x))
(freezer BigInt id-bigint (write-biginteger s (.toBigInteger x)))
(freezer BigInteger id-bigint (write-biginteger s x))
(freezer Byte id-byte (.writeByte out x))
(freezer Short id-short (.writeShort out x))
(freezer Integer id-integer (.writeInt out x))
(freezer Long id-long (.writeLong out x))
(freezer BigInt id-bigint (write-biginteger out (.toBigInteger x)))
(freezer BigInteger id-bigint (write-biginteger out x))
(freezer Float id-float (.writeFloat s x))
(freezer Double id-double (.writeDouble s x))
(freezer Float id-float (.writeFloat out x))
(freezer Double id-double (.writeDouble out x))
(freezer BigDecimal id-bigdec
(write-biginteger s (.unscaledValue x))
(.writeInt s (.scale x)))
(write-biginteger out (.unscaledValue x))
(.writeInt out (.scale x)))
(freezer Ratio id-ratio
(write-biginteger s (.numerator x))
(write-biginteger s (.denominator x)))
(write-biginteger out (.numerator x))
(write-biginteger out (.denominator x)))
(freezer Date id-date (.writeLong s (.getTime x)))
(freezer Date id-date (.writeLong out (.getTime x)))
(freezer UUID id-uuid
(.writeLong s (.getMostSignificantBits x))
(.writeLong s (.getLeastSignificantBits x)))
(.writeLong out (.getMostSignificantBits x))
(.writeLong out (.getLeastSignificantBits x)))
(def ^:dynamic *final-freeze-fallback* "Alpha - subject to change." nil)
(defn freeze-fallback-as-str "Alpha-subject to change." [x s]
(freeze-to-stream* {:nippy/unfreezable (pr-str x) :type (type x)} s))
(defn freeze-fallback-as-str "Alpha-subject to change." [x out]
(freeze-to-out* {:nippy/unfreezable (pr-str x) :type (type x)} out))
(comment
(require '[clojure.core.async :as async])
@ -207,23 +207,23 @@
;; interfering with higher-level implementations, Ref. http://goo.gl/6f7SKl
(extend-type Object
Freezable
(freeze-to-stream* [x ^DataOutput s]
(freeze-to-out* [x ^DataOutput out]
(cond
(utils/serializable? x) ; Fallback #1: Java's Serializable interface
(do (when-debug-mode
(println (format "DEBUG - Serializable fallback: %s" (type x))))
(write-id s id-serializable)
(write-utf8 s (.getName (class x))) ; Reflect
(.writeObject (ObjectOutputStream. s) x))
(write-id out id-serializable)
(write-utf8 out (.getName (class x))) ; Reflect
(.writeObject (ObjectOutputStream. out) x))
(utils/readable? x) ; Fallback #2: Clojure's Reader
(do (when-debug-mode
(println (format "DEBUG - Reader fallback: %s" (type x))))
(write-id s id-reader)
(write-utf8 s (pr-str x)))
(write-id out id-reader)
(write-utf8 out (pr-str x)))
:else ; Fallback #3: *final-freeze-fallback*
(if-let [ffb *final-freeze-fallback*] (ffb x s)
(if-let [ffb *final-freeze-fallback*] (ffb x out)
(throw (Exception. (format "Unfreezable type: %s %s"
(type x) (str x))))))))
@ -240,10 +240,10 @@
(declare assert-legacy-args) ; Deprecated
(defn freeze-to-stream!
(defn freeze-to-out!
"Low-level API. Serializes arg (any Clojure data type) to a DataOutput."
[^DataOutput data-output x & _]
(freeze-to-stream data-output x))
(freeze-to-out data-output x))
(defn freeze
"Serializes arg (any Clojure data type) to a byte array. For custom types
@ -255,9 +255,9 @@
(when (:legacy-mode opts) ; Deprecated
(assert-legacy-args compressor password))
(let [skip-header? (or skip-header? (:legacy-mode opts)) ; Deprecated
bas (ByteArrayOutputStream.)
ds (DataOutputStream. bas)]
(freeze-to-stream! ds x)
bas (ByteArrayOutputStream.)
sout (DataOutputStream. bas)]
(freeze-to-out! sout x)
(let [ba (.toByteArray bas)
ba (if compressor (compression/compress compressor ba) ba)
ba (if password (encryption/encrypt encryptor password ba) ba)]
@ -267,29 +267,29 @@
;;;; Thawing
(declare thaw-from-stream)
(declare thaw-from-in)
(defmacro ^:private read-bytes [s]
`(let [s# ~s
size# (.readInt s#)
(defmacro ^:private read-bytes [in]
`(let [in# ~in
size# (.readInt in#)
ba# (byte-array size#)]
(.readFully s# ba# 0 size#) ba#))
(.readFully in# ba# 0 size#) ba#))
(defmacro ^:private read-biginteger [s] `(BigInteger. (read-bytes ~s)))
(defmacro ^:private read-utf8 [s] `(String. (read-bytes ~s) "UTF-8"))
(defmacro ^:private read-biginteger [in] `(BigInteger. (read-bytes ~in)))
(defmacro ^:private read-utf8 [in] `(String. (read-bytes ~in) "UTF-8"))
(defmacro ^:private read-coll [s coll]
`(let [s# ~s] (utils/repeatedly-into ~coll (.readInt s#) (thaw-from-stream s#))))
(defmacro ^:private read-coll [in coll]
`(let [in# ~in] (utils/repeatedly-into ~coll (.readInt in#) (thaw-from-in in#))))
(defmacro ^:private read-kvs [s coll]
`(let [s# ~s] (utils/repeatedly-into ~coll (/ (.readInt s#) 2)
[(thaw-from-stream s#) (thaw-from-stream s#)])))
(defmacro ^:private read-kvs [in coll]
`(let [in# ~in] (utils/repeatedly-into ~coll (/ (.readInt in#) 2)
[(thaw-from-in in#) (thaw-from-in in#)])))
(declare ^:private custom-readers)
(defn- thaw-from-stream
[^DataInput s]
(let [type-id (.readByte s)]
(defn- thaw-from-in
[^DataInput in]
(let [type-id (.readByte in)]
(try
(when-debug-mode
(println (format "DEBUG - thawing type-id: %s" type-id)))
@ -297,76 +297,76 @@
(utils/case-eval type-id
id-reader
(let [edn (read-utf8 s)]
(let [edn (read-utf8 in)]
(try (edn/read-string {:readers *data-readers*} edn)
(catch Exception _ {:nippy/unthawable edn
:type :reader})))
id-serializable
(let [class-name (read-utf8 s)]
(let [class-name (read-utf8 in)]
(try (let [;; .readObject _before_ Class/forName: it'll always read
;; all data before throwing
object (.readObject (ObjectInputStream. s))
object (.readObject (ObjectInputStream. in))
class ^Class (Class/forName class-name)]
(cast class object))
(catch Exception _ {:nippy/unthawable class-name
:type :serializable})))
id-bytes (read-bytes s)
id-bytes (read-bytes in)
id-nil nil
id-boolean (.readBoolean s)
id-boolean (.readBoolean in)
id-char (.readChar s)
id-string (read-utf8 s)
id-keyword (keyword (read-utf8 s))
id-char (.readChar in)
id-string (read-utf8 in)
id-keyword (keyword (read-utf8 in))
id-queue (read-coll s (PersistentQueue/EMPTY))
id-sorted-set (read-coll s (sorted-set))
id-sorted-map (read-kvs s (sorted-map))
id-queue (read-coll in (PersistentQueue/EMPTY))
id-sorted-set (read-coll in (sorted-set))
id-sorted-map (read-kvs in (sorted-map))
id-list (into '() (rseq (read-coll s [])))
id-vector (read-coll s [])
id-set (read-coll s #{})
id-map (read-kvs s {})
id-seq (seq (read-coll s []))
id-list (into '() (rseq (read-coll in [])))
id-vector (read-coll in [])
id-set (read-coll in #{})
id-map (read-kvs in {})
id-seq (seq (read-coll in []))
id-meta (let [m (thaw-from-stream s)] (with-meta (thaw-from-stream s) m))
id-meta (let [m (thaw-from-in in)] (with-meta (thaw-from-in in) m))
id-byte (.readByte s)
id-short (.readShort s)
id-integer (.readInt s)
id-long (.readLong s)
id-bigint (bigint (read-biginteger s))
id-byte (.readByte in)
id-short (.readShort in)
id-integer (.readInt in)
id-long (.readLong in)
id-bigint (bigint (read-biginteger in))
id-float (.readFloat s)
id-double (.readDouble s)
id-bigdec (BigDecimal. (read-biginteger s) (.readInt s))
id-float (.readFloat in)
id-double (.readDouble in)
id-bigdec (BigDecimal. (read-biginteger in) (.readInt in))
id-ratio (/ (bigint (read-biginteger s))
(bigint (read-biginteger s)))
id-ratio (/ (bigint (read-biginteger in))
(bigint (read-biginteger in)))
id-record
(let [class ^Class (Class/forName (read-utf8 s))
(let [class ^Class (Class/forName (read-utf8 in))
meth-sig (into-array Class [IPersistentMap])
method ^Method (.getMethod class "create" meth-sig)]
(.invoke method class (into-array Object [(thaw-from-stream s)])))
(.invoke method class (into-array Object [(thaw-from-in in)])))
id-date (Date. (.readLong s))
id-uuid (UUID. (.readLong s) (.readLong s))
id-date (Date. (.readLong in))
id-uuid (UUID. (.readLong in) (.readLong in))
;;; DEPRECATED
id-old-reader (edn/read-string (.readUTF s))
id-old-string (.readUTF s)
id-old-reader (edn/read-string (.readUTF in))
id-old-string (.readUTF in)
id-old-map (apply hash-map (utils/repeatedly-into []
(* 2 (.readInt s)) (thaw-from-stream s)))
id-old-keyword (keyword (.readUTF s))
(* 2 (.readInt in)) (thaw-from-in in)))
id-old-keyword (keyword (.readUTF in))
(if-not (neg? type-id)
(throw (Exception. (str "Unknown type ID: " type-id)))
;; Custom types
(if-let [reader (get @custom-readers type-id)]
(try (reader s)
(try (reader in)
(catch Exception e
(throw (Exception. (str "Reader exception for custom type ID: "
(- type-id)) e))))
@ -376,11 +376,11 @@
(catch Exception e
(throw (Exception. (format "Thaw failed against type-id: %s" type-id) e))))))
(defn thaw-from-stream!
(defn thaw-from-in!
"Low-level API. Deserializes a frozen object from given DataInput to its
original Clojure data type."
[data-input & _]
(thaw-from-stream data-input))
(thaw-from-in data-input))
(defn- try-parse-header [ba]
(when-let [[head-ba data-ba] (utils/ba-split ba 4)]
@ -418,8 +418,8 @@
(let [ba data-ba
ba (if password (encryption/decrypt encryptor password ba) ba)
ba (if compressor (compression/decompress compressor ba) ba)
stream (DataInputStream. (ByteArrayInputStream. ba))]
(thaw-from-stream! stream))
sin (DataInputStream. (ByteArrayInputStream. ba))]
(thaw-from-in! sin))
(catch Exception e
(cond
@ -475,49 +475,49 @@
Extends Nippy to support freezing of a custom type (ideally concrete) with
id [1, 128]:
(defrecord MyType [data])
(extend-freeze MyType 1 [x data-output-stream]
(.writeUTF [data-output-stream] (:data x)))"
[type custom-type-id [x stream] & body]
(extend-freeze MyType 1 [x data-output]
(.writeUTF [data-output] (:data x)))"
[type custom-type-id [x out] & body]
(assert (and (>= custom-type-id 1) (<= custom-type-id 128)))
`(extend-type ~type
Freezable
(~'freeze-to-stream* [~x ~(with-meta stream {:tag 'java.io.DataOutput})]
(write-id ~stream ~(int (- custom-type-id)))
(~'freeze-to-out* [~x ~(with-meta out {:tag 'java.io.DataOutput})]
(write-id ~out ~(int (- custom-type-id)))
~@body)))
(defonce custom-readers (atom {})) ; {<custom-type-id> (fn [data-input-stream]) ...}
(defonce custom-readers (atom {})) ; {<custom-type-id> (fn [data-input]) ...}
(defmacro extend-thaw
"Alpha - subject to change.
Extends Nippy to support thawing of a custom type with id [1, 128]:
(extend-thaw 1 [data-input-stream]
(->MyType (.readUTF data-input-stream)))"
[custom-type-id [stream] & body]
(extend-thaw 1 [data-input]
(->MyType (.readUTF data-input)))"
[custom-type-id [in] & body]
(assert (and (>= custom-type-id 1) (<= custom-type-id 128)))
`(swap! custom-readers assoc ~(int (- custom-type-id))
(fn [~(with-meta stream {:tag 'java.io.DataInput})]
(fn [~(with-meta in {:tag 'java.io.DataInput})]
~@body)))
(comment (defrecord MyType [data])
(extend-freeze MyType 1 [x s] (.writeUTF s (:data x)))
(extend-thaw 1 [s] (->MyType (.readUTF s)))
(extend-freeze MyType 1 [x out] (.writeUTF out (:data x)))
(extend-thaw 1 [in] (->MyType (.readUTF in)))
(thaw (freeze (->MyType "Joe"))))
;;; Some useful custom types - EXPERIMENTAL
(defrecord Compressable-LZMA2 [value])
(extend-freeze Compressable-LZMA2 128 [x st]
(extend-freeze Compressable-LZMA2 128 [x out]
(let [[_ ^bytes ba] (-> (freeze (:value x) {:compressor nil})
(utils/ba-split 4))
ba-len (alength ba)
compress? (> ba-len 1024)]
(.writeBoolean st compress?)
(if-not compress? (write-bytes st ba)
(.writeBoolean out compress?)
(if-not compress? (write-bytes out ba)
(let [ba* (compression/compress compression/lzma2-compressor ba)]
(write-bytes st ba*)))))
(write-bytes out ba*)))))
(extend-thaw 128 [st]
(let [compressed? (.readBoolean st)
ba (read-bytes st)]
(extend-thaw 128 [in]
(let [compressed? (.readBoolean in)
ba (read-bytes in)]
(thaw (wrap-header ba {:compressed? compressed? :encrypted? false})
{:compressor compression/lzma2-compressor})))
@ -625,6 +625,12 @@
;;;; Deprecated API
(def freeze-to-stream! "DEPRECATED: Use `freeze-to-out!` instead."
freeze-to-out!)
(def thaw-from-stream! "DEPRECATED: Use `thaw-from-in!` instead."
thaw-from-in!)
(defn- assert-legacy-args [compressor password]
(when password
(throw (AssertionError. "Encryption not supported in legacy mode.")))