[new] Add :zstd compressor, new compressor backend

Also switch to https://github.com/airlift/aircompressor, which provides
faster implementations of both LZ4 and Snappy in a single library.
This commit is contained in:
Peter Taoussanis 2023-09-25 15:16:48 +02:00
parent c8f30e171d
commit 6ad5aebd1a
5 changed files with 175 additions and 235 deletions

View file

@ -10,9 +10,7 @@
:dependencies
[[org.clojure/tools.reader "1.3.6"]
[com.taoensso/encore "3.68.0"]
[org.iq80.snappy/snappy "0.4"]
[org.tukaani/xz "1.9"]
[org.lz4/lz4-java "1.8.0"]
[io.airlift/aircompressor "0.25"]]
:profiles
@ -37,8 +35,7 @@
:dependencies
[[org.clojure/test.check "1.1.1"]
[org.clojure/data.fressian "1.0.0"]
[org.xerial.snappy/snappy-java "1.1.10.4"]]}
[org.clojure/data.fressian "1.0.0"]]}
:graal-tests
{:dependencies [[org.clojure/clojure "1.11.1"]

View file

@ -56,8 +56,8 @@
"Final byte of 4-byte Nippy header stores version-dependent metadata"
;; Currently
;; - 5 compressors, #{nil :snappy :lz4 :lzma2 :else}
;; - 4 encryptors, #{nil :aes128-cbc-sha512 :aes128-gcm-sha512 :else}
;; - 6x compressors: #{nil :zstd :lz4 #_:lzo :lzma2 :snappy :else}
;; - 4x encryptors: #{nil :aes128-cbc-sha512 :aes128-gcm-sha512 :else}
{(byte 0) {:version 1 :compressor-id nil :encryptor-id nil}
(byte 2) {:version 1 :compressor-id nil :encryptor-id :aes128-cbc-sha512}
@ -69,7 +69,6 @@
(byte 15) {:version 1 :compressor-id :snappy :encryptor-id :aes128-gcm-sha512}
(byte 7) {:version 1 :compressor-id :snappy :encryptor-id :else}
;;; :lz4 used for both lz4 and lz4hc compressor (the two are compatible)
(byte 8) {:version 1 :compressor-id :lz4 :encryptor-id nil}
(byte 9) {:version 1 :compressor-id :lz4 :encryptor-id :aes128-cbc-sha512}
(byte 16) {:version 1 :compressor-id :lz4 :encryptor-id :aes128-gcm-sha512}
@ -80,6 +79,11 @@
(byte 17) {:version 1 :compressor-id :lzma2 :encryptor-id :aes128-gcm-sha512}
(byte 13) {:version 1 :compressor-id :lzma2 :encryptor-id :else}
(byte 20) {:version 1 :compressor-id :zstd :encryptor-id nil}
(byte 21) {:version 1 :compressor-id :zstd :encryptor-id :aes128-cbc-sha512}
(byte 22) {:version 1 :compressor-id :zstd :encryptor-id :aes128-gcm-sha512}
(byte 23) {:version 1 :compressor-id :zstd :encryptor-id :else}
(byte 5) {:version 1 :compressor-id :else :encryptor-id nil}
(byte 18) {:version 1 :compressor-id :else :encryptor-id :aes128-cbc-sha512}
(byte 19) {:version 1 :compressor-id :else :encryptor-id :aes128-gcm-sha512}
@ -330,10 +334,12 @@
(enc/defaliases
compression/compress
compression/decompress
compression/snappy-compressor
compression/lzma2-compressor
compression/zstd-compressor
compression/lz4-compressor
compression/lz4hc-compressor
#_compression/lzo-compressor
compression/snappy-compressor
compression/lzma2-compressor
encryption/encrypt
encryption/decrypt

View file

@ -3,7 +3,8 @@
(:require
[clojure.data.fressian :as fress]
[taoensso.encore :as enc]
[taoensso.nippy :as nippy]))
[taoensso.nippy :as nippy]
[taoensso.nippy.compression :as compr]))
;;;; Reader
@ -124,3 +125,55 @@
:nippy/encrypted {:round 3390, :freeze 1807, :thaw 1583, :size 16468}
:nippy/default {:round 2845, :freeze 1513, :thaw 1332, :size 16440}
:nippy/fast {:round 2634, :freeze 1338, :thaw 1296, :size 28454}})
;;;; Compressors
(let [_        (require '[taoensso.nippy :as nippy])
      ;; Uncompressed frozen payload shared by every compressor run below.
      raw-data (nippy/freeze nippy/stress-data-comparable {:compressor nil})]

  (defn bench1-compressor
    "Benchmarks a single `compressor` against the shared uncompressed payload.

    Options map:
      `:laps`   - laps passed to `enc/bench` (default 1e4).
      `:warmup` - value passed to `enc/bench` under `:warmup-laps` (default 2e4).

    Returns a map with `:compress` and `:decompress` timings, their sum under
    `:round`, and `:ratio` (compressed size / uncompressed size, rounded via
    `enc/round2`)."
    [{:keys [laps warmup] :or {laps 1e4, warmup 2e4}} compressor]
    ;; NOTE(review): confirm that `enc/bench` honours the `:warmup-laps` key in
    ;; the pinned Encore version (other call sites have used `:nlaps-warmup`);
    ;; an unrecognised key would presumably be ignored, skipping warmup.
    (let [packed       (compr/compress compressor raw-data)
          t-compress   (enc/bench laps {:warmup-laps warmup} (compr/compress   compressor raw-data))
          t-decompress (enc/bench laps {:warmup-laps warmup} (compr/decompress compressor packed))
          size-ratio   (/ (count packed) (count raw-data))]
      {:round      (+ t-compress t-decompress)
       :compress   t-compress
       :decompress t-decompress
       :ratio      (enc/round2 size-ratio)}))

  (defn bench-compressors
    "Runs `bench1-compressor` over every compressor variant and returns one map
    of results keyed by variant.

    `bench1-opts` is used as-is for the Zstd, LZ4, LZO and Snappy runs.
    `lzma-opts` is merged over `bench1-opts` for the LZMA2 runs only."
    [bench1-opts lzma-opts]
    (let [run-std  (fn [c] (bench1-compressor bench1-opts c))
          run-lzma (fn [c] (bench1-compressor (merge bench1-opts lzma-opts) c))]
      (merge
        {:zstd/prepended     (run-std (compr/->ZstdCompressor   true))
         :zstd/unprepended   (run-std (compr/->ZstdCompressor   false))
         :lz4                (run-std (compr/->LZ4Compressor))
         :lzo                (run-std (compr/->LZOCompressor))
         :snappy/prepended   (run-std (compr/->SnappyCompressor true))
         :snappy/unprepended (run-std (compr/->SnappyCompressor false))}

        {:lzma2/level0 (run-lzma (compr/->LZMA2Compressor 0))
         :lzma2/level3 (run-lzma (compr/->LZMA2Compressor 3))
         :lzma2/level6 (run-lzma (compr/->LZMA2Compressor 6))
         :lzma2/level9 (run-lzma (compr/->LZMA2Compressor 9))}))))
(comment
(bench-compressors
{:laps 1e4 :warmup 2e4}
{:laps 1e2 :warmup 2e2})
;; 2023 Aug 1, 2020 Apple MBP M1
;; [org.tukaani/xz "1.9"]
;; [io.airlift/aircompressor "0.25"]
{:zstd/prepended {:round 1672, :compress 1279, :decompress 393, :ratio 0.53}
:zstd/unprepended {:round 1668, :compress 1271, :decompress 397, :ratio 0.53}
:lz4 {:round 269, :compress 238, :decompress 31, :ratio 0.58}
:lzo {:round 259, :compress 216, :decompress 43, :ratio 0.58}
:snappy/prepended {:round 339, :compress 205, :decompress 134, :ratio 0.58}
:snappy/unprepended {:round 340, :compress 206, :decompress 134, :ratio 0.58}
:lzma2/level0 {:round 30300, :compress 18500, :decompress 11800, :ratio 0.4}
:lzma2/level3 {:round 49200, :compress 35700, :decompress 13500, :ratio 0.4}
:lzma2/level6 {:round 102600, :compress 86700, :decompress 15900, :ratio 0.41}
:lzma2/level9 {:round 434800, :compress 394700, :decompress 40100, :ratio 0.41}})

View file

@ -7,10 +7,6 @@
ByteArrayInputStream ByteArrayOutputStream
DataInputStream DataOutputStream]))
;;;; TODO
;; - NB always prepend uncompressed length?
;; - Consider (enc based)? dynamic uint from Tempel?
;;;; Interface
(defprotocol ICompressor
@ -20,9 +16,9 @@
(def ^:const standard-header-ids
"These support `:auto` thaw."
#{:snappy-v1 :lz4-v1 :lzma2-v1})
#{:zstd :lz4 #_:lzo :lzma2 :snappy})
;;;; Utils
;;;; Misc utils
(defn- int-size->ba ^bytes [size]
(let [ba (byte-array 4)
@ -38,9 +34,10 @@
(comment (ba->int-size (int-size->ba 3737)))
;;;; Airlift
(defn- airlift-compress
^bytes [^io.airlift.compress.Compressor c ^bytes ba prepend-size?]
(let [in-len (alength ba)
max-out-len (.maxCompressedLength c in-len)]
@ -88,167 +85,48 @@
ba-out 0 out-len)
ba-out)))
;;;; Snappy
(deftype SnappyCompressorV1 []
ICompressor
(header-id [_] :snappy-v1)
(compress [_ ba] (org.iq80.snappy.Snappy/compress ba))
(decompress [_ ba] (org.iq80.snappy.Snappy/uncompress ba 0 (alength ^bytes ba))))
(do
(enc/def* ^:private airlift-snappy-compressor_ (enc/thread-local (io.airlift.compress.snappy.SnappyCompressor.)))
(enc/def* ^:private airlift-snappy-decompressor_ (enc/thread-local (io.airlift.compress.snappy.SnappyDecompressor.)))
(deftype SnappyCompressorV2 [prepend-size?]
ICompressor
(header-id [_] :snappy-v2)
(compress [_ ba] (airlift-compress @airlift-snappy-compressor_ ba prepend-size?))
(decompress [_ ba] (airlift-decompress @airlift-snappy-decompressor_ ba
(when-not prepend-size?
(io.airlift.compress.snappy.SnappyDecompressor/getUncompressedLength ba 0))))))
(comment
(let [vba (vec (range 64))
ba (byte-array vba)
v1 (SnappyCompressorV1.)
v2 (SnappyCompressorV2. false)
v2p (SnappyCompressorV2. true)]
[(every? true? (for [c [v1 v2], d [v1 v2]] (= vba (->> ba (compress c) (decompress d) vec))))
(enc/qb 1e6 ; [306.24 271.6 296.76]
(->> ba (compress v1) (decompress v1))
(->> ba (compress v2) (decompress v2))
(->> ba (compress v2p) (decompress v2p)))]))
;;;; LZ4
(def ^:private lz4-factory_ (delay (net.jpountz.lz4.LZ4Factory/fastestInstance)))
(deftype LZ4CompressorV1 [compressor_ decompressor_]
ICompressor
(header-id [_] :lz4-v1)
(compress [_ ba]
(let [^net.jpountz.lz4.LZ4Compressor compressor @compressor_
len-uncomp (alength ^bytes ba)
max-len-comp (.maxCompressedLength compressor len-uncomp)
ba-comp (byte-array max-len-comp)
len-comp (.compress compressor ^bytes ba 0 len-uncomp
ba-comp 0 max-len-comp)
baos (ByteArrayOutputStream. (+ len-comp 4))
dos (DataOutputStream. baos)]
(.writeInt dos len-uncomp) ; Prepend with uncompressed length
(.write dos ba-comp 0 len-comp)
(.toByteArray baos)))
(decompress [_ ba]
(let [^net.jpountz.lz4.LZ4Decompressor decompressor @decompressor_
bais (ByteArrayInputStream. ba)
dis (DataInputStream. bais)
len-uncomp (.readInt dis)
len-comp (- (alength ^bytes ba) 4)
ba-uncomp (byte-array len-uncomp)]
(.decompress decompressor ba 4 ba-uncomp 0 len-uncomp)
ba-uncomp)))
(do
(enc/def* ^:private airlift-lz4-compressor_ (enc/thread-local (io.airlift.compress.lz4.Lz4Compressor.)))
(enc/def* ^:private airlift-lz4-decompressor_ (enc/thread-local (io.airlift.compress.lz4.Lz4Decompressor.)))
(deftype LZ4CompressorV2 []
ICompressor
(header-id [_] :lz4-v2)
(compress [_ ba] (airlift-compress @airlift-lz4-compressor_ ba true))
(decompress [_ ba] (airlift-decompress @airlift-lz4-decompressor_ ba nil))))
(comment
(let [vba (vec (range 64))
ba (byte-array vba)
v2 (LZ4CompressorV2.)
v1 (LZ4CompressorV1.
;; (delay (.highCompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))
;; (delay (.fastDecompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))
(do (delay (.fastCompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_)))
(do (delay (.fastDecompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))))]
[(every? true? (for [c [v1 v2], d [v1 v2]] (= vba (->> ba (compress c) (decompress d) vec))))
(enc/qb 1e6 ; [638.55 284.19]
(->> ba (compress v1) (decompress v1))
(->> ba (compress v2) (decompress v2)))]))
;;;; LZMA
(deftype LZMA2Compressor [compression-level]
;; Compression level ∈ℕ[0,9] (low->high) with 6 LZMA2 default (we use 0)
ICompressor
(header-id [_] :lzma2)
(compress [_ ba]
(let [baos (ByteArrayOutputStream.)
dos (DataOutputStream. baos)
;;
len-decomp (alength ^bytes ba)
;; Prefix with uncompressed length:
_ (.writeInt dos len-decomp)
xzs (org.tukaani.xz.XZOutputStream. baos
(org.tukaani.xz.LZMA2Options. compression-level))]
(.write xzs ^bytes ba)
(.close xzs)
(.toByteArray baos)))
(decompress [_ ba]
(let [bais (ByteArrayInputStream. ba)
dis (DataInputStream. bais)
;;
len-decomp (.readInt dis)
ba (byte-array len-decomp)
xzs (org.tukaani.xz.XZInputStream. bais)]
(.read xzs ba 0 len-decomp)
(if (== -1 (.read xzs)) ; Good practice as extra safety measure
nil
(throw (ex-info "LZMA2 Decompress failed: corrupt data?" {:ba ba})))
ba)))
;;;; LZO
(do
(enc/def* ^:private airlift-lzo-compressor_ (enc/thread-local (io.airlift.compress.lzo.LzoCompressor.)))
(enc/def* ^:private airlift-lzo-decompressor_ (enc/thread-local (io.airlift.compress.lzo.LzoDecompressor.)))
(deftype LzoCompressor []
ICompressor
(header-id [_] :snappy-v2)
(compress [_ ba] (airlift-compress @airlift-lzo-compressor_ ba true))
(decompress [_ ba] (airlift-decompress @airlift-lzo-decompressor_ ba nil))))
;;;; Zstd
(do
(enc/def* ^:private airlift-zstd-compressor_ (enc/thread-local (io.airlift.compress.zstd.ZstdCompressor.)))
(enc/def* ^:private airlift-zstd-decompressor_ (enc/thread-local (io.airlift.compress.zstd.ZstdDecompressor.)))
(deftype LzoCompressor [prepend-size?]
(deftype ZstdCompressor [prepend-size?]
ICompressor
(header-id [_] :snappy-v2)
(header-id [_] :zstd)
(compress [_ ba] (airlift-compress @airlift-zstd-compressor_ ba prepend-size?))
(decompress [_ ba] (airlift-decompress @airlift-zstd-decompressor_ ba
(when-not prepend-size?
(io.airlift.compress.zstd.ZstdDecompressor/getDecompressedSize ba
0 (alength ^bytes ba)))))))
;;;;;;;;;;;;
;; LZ4 `ICompressor` implementation that delegates to the Airlift classes in
;; `io.airlift.compress.lz4` via `airlift-compress` / `airlift-decompress`.
(do
;; The Airlift compressor/decompressor instances are wrapped with
;; `enc/thread-local` and dereferenced with `@` at each call site
;; (presumably yielding one instance per thread; confirm against Encore).
(enc/def* ^:private airlift-lz4-compressor_ (enc/thread-local (io.airlift.compress.lz4.Lz4Compressor.)))
(enc/def* ^:private airlift-lz4-decompressor_ (enc/thread-local (io.airlift.compress.lz4.Lz4Decompressor.)))
(deftype LZ4Compressor []
ICompressor
(header-id [_] :lz4)
;; `true` is passed as the `prepend-size?` argument of `airlift-compress`.
(compress [_ ba] (airlift-compress @airlift-lz4-compressor_ ba true))
;; NOTE(review): the trailing `nil` presumably tells `airlift-decompress` to
;; take the uncompressed length from the prepended size; confirm against
;; its definition.
(decompress [_ ba] (airlift-decompress @airlift-lz4-decompressor_ ba nil))))
(def snappy-compressor
"Default org.iq80.snappy.Snappy compressor:
Ratio: low.
Write speed: very high.
Read speed: very high.
;; LZO `ICompressor` implementation that delegates to the Airlift classes in
;; `io.airlift.compress.lzo` via `airlift-compress` / `airlift-decompress`.
(do
;; The Airlift compressor/decompressor instances are wrapped with
;; `enc/thread-local` and dereferenced with `@` at each call site
;; (presumably yielding one instance per thread; confirm against Encore).
(enc/def* ^:private airlift-lzo-compressor_ (enc/thread-local (io.airlift.compress.lzo.LzoCompressor.)))
(enc/def* ^:private airlift-lzo-decompressor_ (enc/thread-local (io.airlift.compress.lzo.LzoDecompressor.)))
(deftype LZOCompressor []
ICompressor
(header-id [_] :lzo)
;; `true` is passed as the `prepend-size?` argument of `airlift-compress`.
(compress [_ ba] (airlift-compress @airlift-lzo-compressor_ ba true))
;; NOTE(review): the trailing `nil` presumably tells `airlift-decompress` to
;; take the uncompressed length from the prepended size; confirm against
;; its definition.
(decompress [_ ba] (airlift-decompress @airlift-lzo-decompressor_ ba nil))))
A good general-purpose compressor."
(SnappyCompressor.))
;; Snappy `ICompressor` implementation that delegates to the Airlift classes in
;; `io.airlift.compress.snappy` via `airlift-compress` / `airlift-decompress`.
(do
;; The Airlift compressor/decompressor instances are wrapped with
;; `enc/thread-local` and dereferenced with `@` at each call site
;; (presumably yielding one instance per thread; confirm against Encore).
(enc/def* ^:private airlift-snappy-compressor_ (enc/thread-local (io.airlift.compress.snappy.SnappyCompressor.)))
(enc/def* ^:private airlift-snappy-decompressor_ (enc/thread-local (io.airlift.compress.snappy.SnappyDecompressor.)))
;; `prepend-size?` is forwarded unchanged to `airlift-compress`.
(deftype SnappyCompressor [prepend-size?]
ICompressor
(header-id [_] :snappy)
(compress [_ ba] (airlift-compress @airlift-snappy-compressor_ ba prepend-size?))
;; When `prepend-size?` is falsey, the uncompressed length is read from the
;; Snappy payload itself (`getUncompressedLength` at offset 0) and passed as
;; the last argument; otherwise `nil` is passed.
;; NOTE(review): `nil` presumably makes `airlift-decompress` use the
;; prepended size; confirm against its definition.
(decompress [_ ba] (airlift-decompress @airlift-snappy-decompressor_ ba
(when-not prepend-size?
(io.airlift.compress.snappy.SnappyDecompressor/getUncompressedLength ba 0))))))
;;;; LZMA2
(deftype LZMA2Compressor [compression-level]
;; Compression level ∈ℕ[0,9] (low->high) with 6 LZMA2 default (we use 0)
@ -280,56 +158,59 @@
(throw (ex-info "LZMA2 Decompress failed: corrupt data?" {:ba ba})))
ba)))
(def lzma2-compressor
"Default org.tukaani.xz.LZMA2 compressor:
Ratio: high.
Write speed: _very_ slow (also currently single-threaded).
Read speed: slow.
;;;; Public API
A specialized compressor for large, low-write data in space-sensitive
environments."
(LZMA2Compressor. 0))
(def zstd-compressor
"Default `Zstd` (`Zstandard`) compressor:
- Compression ratio: `B` (0.53 on reference benchmark).
- Compression speed: `C` (1300 msecs on reference benchmark).
- Decompression speed: `B` (400 msecs on reference benchmark).
Good general-purpose compressor, balances ratio & speed.
See `taoensso.nippy.benchmarks` for detailed comparative benchmarks."
;; `false` is the `prepend-size?` field of `ZstdCompressor`, so this default
;; instance is constructed with size-prepending disabled.
(ZstdCompressor. false))
(def lz4-compressor
"Default net.jpountz.lz4 compressor:
Ratio: low.
Write speed: very high.
Read speed: very high.
"Default `LZ4` compressor:
- Compression ratio: `C` (0.58 on reference benchmark).
- Compression speed: `A` (238 msecs on reference benchmark).
- Decompression speed: `A+` (31 msecs on reference benchmark).
A good general-purpose compressor, competitive with Snappy.
Good general-purpose compressor, favours speed.
See `taoensso.nippy.benchmarks` for detailed comparative benchmarks."
(LZ4Compressor.))
Thanks to Max Penet (@mpenet) for our first implementation,
Ref. https://github.com/mpenet/nippy-lz4"
(LZ4Compressor.
(delay (.fastCompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))
(delay (.fastDecompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))))
(def lzo-compressor
"Default `LZO` compressor:
- Compression ratio: `C` (0.58 on reference benchmark).
- Compression speed: `A` (216 msecs on reference benchmark).
- Decompression speed: `A` (43 msecs on reference benchmark).
(def lz4hc-compressor
"Like `lz4-compressor` but trades some write speed for ratio."
(LZ4Compressor.
(delay (.highCompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))
(delay (.fastDecompressor ^net.jpountz.lz4.LZ4Factory @lz4-factory_))))
Good general-purpose compressor, favours speed.
See `taoensso.nippy.benchmarks` for detailed comparative benchmarks."
(LZOCompressor.))
(comment
(def ba-bench (.getBytes (apply str (repeatedly 1000 rand)) "UTF-8"))
(defn bench1 [compressor]
{:time
(enc/bench 1e4 {:nlaps-warmup 25e3}
(->> ba-bench (compress compressor) (decompress compressor)))
(def snappy-compressor
"Default `Snappy` compressor:
- Compression ratio: `C` (0.58 on reference benchmark).
- Compression speed: `A+` (206 msecs on reference benchmark).
- Decompression speed: `B` (134 msecs on reference benchmark).
:ratio
(enc/round2
(/
(count (compress compressor ba-bench))
(count ba-bench)))})
Good general-purpose compressor, favours speed.
See `taoensso.nippy.benchmarks` for detailed comparative benchmarks."
(SnappyCompressor. false))
{:snappy (bench1 snappy-compressor)
:lzma2 (bench1 lzma2-compressor) ; Slow!
:lz4 (bench1 lz4-compressor)
:lz4hc (bench1 lz4hc-compressor)}
(def lzma2-compressor
"Default `LZMA2` compressor:
- Compression ratio: `A+` (0.4 on reference benchmark).
- Compression speed: `E` (18.5 secs on reference benchmark).
- Decompression speed: `D` (11.8 secs on reference benchmark).
;; 2023 Sep 12, 2020 Apple MBP M1
{:snappy {:time 1111, :ratio 0.85},
:lzma2 {:time 23980, :ratio 0.49},
:lz4 {:time 494, :ratio 0.82},
:lz4hc {:time 2076, :ratio 0.76}})
Specialized compressor, strongly favours ratio.
See `taoensso.nippy.benchmarks` for detailed comparative benchmarks."
(LZMA2Compressor. 0))
;; Deprecated name, now bound to a plain `(LZ4Compressor.)`; presumably
;; retained so that existing references to `lz4hc-compressor` keep resolving.
;; TODO: the `:deprecated` value below is still the "vX.Y.Z (YYYY-MM-DD)"
;; placeholder; fill in the real version and date before release.
(enc/def* ^:no-doc lz4hc-compressor
"Different LZ4 modes no longer supported, prefer `lz4-compressor`."
{:deprecated "vX.Y.Z (YYYY-MM-DD)"}
(LZ4Compressor.))

View file

@ -6,6 +6,8 @@
[clojure.test.check.properties :as tc-props]
[taoensso.encore :as enc :refer []]
[taoensso.nippy :as nippy :refer [freeze thaw]]
[taoensso.nippy.compression :as compr]
[taoensso.nippy.crypto :as crypto]
[taoensso.nippy.benchmarks :as benchmarks]))
(comment
@ -59,29 +61,16 @@
test-data)))
(is (= test-data ((comp #(thaw % {:compressor nippy/lz4-compressor})
#(freeze % {:compressor nippy/lz4hc-compressor}))
#(freeze % {:compressor nippy/lz4-compressor}))
test-data)))
(is (= test-data ((comp #(thaw % {:compressor nippy/zstd-compressor})
#(freeze % {:compressor nippy/zstd-compressor}))
test-data)))
(is (enc/throws? Exception (thaw (freeze test-data {:password "malformed"}))))
(is (enc/throws? Exception (thaw (freeze test-data {:password [:salted "p"]})
{;; Necessary to prevent against JVM segfault due to
;; https://goo.gl/t0OUIo:
:v1-compatibility? false})))
(is (enc/throws? Exception (thaw (freeze test-data {:password [:salted "p"]})
{:v1-compatibility? false ; Ref. https://goo.gl/t0OUIo
:compressor nil})))
(is
(let [^bytes raw-ba (freeze test-data {:compressor nil})
^bytes xerial-ba (org.xerial.snappy.Snappy/compress raw-ba)
^bytes iq80-ba (org.iq80.snappy.Snappy/compress raw-ba)]
(= (thaw raw-ba)
(thaw (org.xerial.snappy.Snappy/uncompress xerial-ba))
(thaw (org.xerial.snappy.Snappy/uncompress iq80-ba))
(thaw (org.iq80.snappy.Snappy/uncompress iq80-ba 0 (alength iq80-ba)))
(thaw (org.iq80.snappy.Snappy/uncompress xerial-ba 0 (alength xerial-ba)))))
"Snappy lib compatibility (for legacy versions of Nippy)")
(is (enc/throws? Exception (thaw (freeze test-data {:password [:salted "p"]}))))
(is (enc/throws? Exception (thaw (freeze test-data {:password [:salted "p"]}))))
(is
(= "payload"
@ -384,6 +373,20 @@
(let [ex (enc/throws :default (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze [:a :b]))))]
(is (= (-> ex enc/ex-cause enc/ex-cause ex-data :call) '(rf acc in)) "Error thrown via `*thaw-xform*`"))])
;;;; Compressors
(deftest _compressors
  ;; Robustness check: each default compressor is asked 100 times to
  ;; decompress 1024 fresh random bytes. An iteration passes when
  ;; `enc/catching` yields nil (presumably because `decompress` threw on the
  ;; invalid input rather than crashing the JVM).
  (let [compressors
        [compr/zstd-compressor
         compr/lz4-compressor
         compr/lzo-compressor
         compr/snappy-compressor
         compr/lzma2-compressor]]
    (doseq [c compressors
            _ (range 100)]
      (is
        (nil? (enc/catching (compr/decompress c (crypto/rand-bytes 1024))))
        "Decompression never core dumps, even against invalid data"))))
;;;; Benchmarks
(deftest _benchmarks (is (benchmarks/bench {})))