Merge branch 'dev'

This commit is contained in:
Peter Taoussanis 2013-06-17 15:13:27 +07:00
commit a5c986672d
11 changed files with 537 additions and 42 deletions

View file

@ -2,7 +2,7 @@ Current [semantic](http://semver.org/) version:
```clojure
[com.taoensso/nippy "1.2.1"] ; Stable
[com.taoensso/nippy "2.0.0-alpha7"] ; Development (notes below)
[com.taoensso/nippy "2.0.0-alpha8"] ; Development (notes below)
```
2.x adds pluggable compression, crypto support (also pluggable), an improved API (including much better error messages), and hugely improved performance. It **is backwards compatible**, but please note that the old `freeze-to-bytes`/`thaw-from-bytes` API has been **deprecated** in favor of `freeze`/`thaw`. **PLEASE REPORT ANY PROBLEMS!**

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 22 KiB

View file

@ -1,4 +1,4 @@
(defproject com.taoensso/nippy "2.0.0-alpha7"
(defproject com.taoensso/nippy "2.0.0-alpha8"
:description "Clojure serialization library"
:url "https://github.com/ptaoussanis/nippy"
:license {:name "Eclipse Public License"
@ -20,4 +20,6 @@
[lein-autoexpect "0.2.5"]
[codox "0.6.4"]]
:min-lein-version "2.0.0"
:warn-on-reflection true)
:warn-on-reflection true
:source-paths ["src/clj"]
:java-source-paths ["src/java"])

View file

@ -6,8 +6,8 @@
(utils :as utils)
(compression :as compression :refer (snappy-compressor))
(encryption :as encryption :refer (aes128-encryptor))])
(:import [java.io DataInputStream DataOutputStream ByteArrayOutputStream
ByteArrayInputStream]
(:import [java.io DataInputStream DataOutputStream]
[com.taoensso FastByteArrayOutputStream FastByteArrayInputStream]
[clojure.lang Keyword BigInt Ratio PersistentQueue PersistentTreeMap
PersistentTreeSet IPersistentList IPersistentVector IPersistentMap
IPersistentSet IPersistentCollection]))
@ -171,7 +171,7 @@
compressor snappy-compressor
encryptor aes128-encryptor}}]]
(when legacy-mode (assert-legacy-args compressor password))
(let [ba (ByteArrayOutputStream.)
(let [ba (FastByteArrayOutputStream.)
stream (DataOutputStream. ba)]
(binding [*print-dup* print-dup?] (freeze-to-stream x stream))
(let [ba (.toByteArray ba)
@ -282,7 +282,7 @@
(let [ba data-ba
ba (if password (encryption/decrypt encryptor password ba) ba)
ba (if compressor (compression/decompress compressor ba) ba)
stream (DataInputStream. (ByteArrayInputStream. ba))]
stream (DataInputStream. (FastByteArrayInputStream. ba))]
(binding [*read-eval* read-eval?] (thaw-from-stream stream)))
(catch Exception e
(cond

View file

@ -6,7 +6,7 @@
;; Remove stuff from stress-data that breaks reader
(def data (dissoc nippy/stress-data :queue :queue-empty :bytes))
(defmacro bench [& body] `(utils/bench 10000 (do ~@body) :warmup-laps 2000))
(defmacro bench* [& body] `(utils/bench 10000 (do ~@body) :warmup-laps 2000))
(defn freeze-reader [x] (binding [*print-dup* false] (pr-str x)))
(defn thaw-reader [x] (binding [*read-eval* false] (read-string x)))
@ -17,52 +17,59 @@
#(freeze % {:password [:cached "p"]})))
(def roundtrip-fast (comp thaw #(freeze % {:compressor nil})))
(defn autobench []
(println "Benchmarking roundtrips")
(println "-----------------------")
(let [results {:defaults (bench (roundtrip-defaults data))
:encrypted (bench (roundtrip-encrypted data))
:fast (bench (roundtrip-fast data))}]
(println results)
results))
(defn bench [{:keys [reader? laps] :or {reader? true laps 1}}]
(println)
(println "Benching (this can take some time)")
(println "----------------------------------")
(dotimes [l laps]
(println)
(println (str "Lap " (inc l) "/" laps "..."))
(comment
(do ; Roundtrip times
(println "Benching (this can take some time)...")
(println "-------------------------------------")
(println
{:reader
{:round (bench (roundtrip-reader data))
:freeze (bench (freeze-reader data))
:thaw (let [frozen (freeze-reader data)] (bench (thaw-reader frozen)))
:data-size (count (.getBytes ^String (freeze-reader data) "UTF-8"))}})
(when reader?
(println
{:reader
{:round (bench* (roundtrip-reader data))
:freeze (bench* (freeze-reader data))
:thaw (let [frozen (freeze-reader data)] (bench* (thaw-reader frozen)))
:data-size (count (.getBytes ^String (freeze-reader data) "UTF-8"))}}))
(println
{:defaults
{:round (bench (roundtrip-defaults data))
:freeze (bench (freeze data))
:thaw (let [frozen (freeze data)] (bench (thaw frozen)))
{:round (bench* (roundtrip-defaults data))
:freeze (bench* (freeze data))
:thaw (let [frozen (freeze data)] (bench* (thaw frozen)))
:data-size (count (freeze data))}})
(println
{:encrypted
{:round (bench (roundtrip-encrypted data))
:freeze (bench (freeze data {:password [:cached "p"]}))
{:round (bench* (roundtrip-encrypted data))
:freeze (bench* (freeze data {:password [:cached "p"]}))
:thaw (let [frozen (freeze data {:password [:cached "p"]})]
(bench (thaw frozen {:password [:cached "p"]})))
(bench* (thaw frozen {:password [:cached "p"]})))
:data-size (count (freeze data {:password [:cached "p"]}))}})
(println
{:fast
{:round (bench (roundtrip-fast data))
:freeze (bench (freeze data {:compressor nil}))
{:round (bench* (roundtrip-fast data))
:freeze (bench* (freeze data {:compressor nil}))
:thaw (let [frozen (freeze data {:compressor nil})]
(bench (thaw frozen)))
:data-size (count (freeze data {:compressor nil}))}})
(bench* (thaw frozen)))
:data-size (count (freeze data {:compressor nil}))}}))
(println "Done! (Time for cake?)"))
(println)
(println "Done! (Time for cake?)")
true)
(comment
;; (bench {:reader? true :laps 2})
;; (bench {:reader? false :laps 1})
;; (bench {:reader? false :laps 2})
;;; 17 June 2013: Clojure 1.5.1, Nippy 2.0.0-alpha6 w/fast io-streams
;; {:reader {:round 49819, :freeze 23601, :thaw 26247, :data-size 22966}}
;; {:defaults {:round 5670, :freeze 3536, :thaw 1919, :data-size 12396}}
;; {:encrypted {:round 9038, :freeze 5111, :thaw 3582, :data-size 12420}}
;; {:fast {:round 5182, :freeze 3177, :thaw 1820, :data-size 13342}}
;;; 16 June 2013: Clojure 1.5.1, Nippy 2.0.0-alpha6
;; {:reader {:freeze 23601, :thaw 26247, :round 49819, :data-size 22966}}
@ -96,7 +103,7 @@
;; {:reader {:freeze 28505, :thaw 36451, :round 59545},
;; :nippy {:freeze 3751, :thaw 4184, :round 7769}}
(println (bench (roundtrip data))) ; Snappy implementations
(println (bench* (roundtrip data))) ; Snappy implementations
;; {:no-snappy [6163 6064 6042 6176] :JNI [6489 6446 6542 6412]
;; :native-array-copy [6569 6419 6414 6590]}
)

View file

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taoensso;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
/*
* This file has been modified from Apache Harmony's ByteArrayInputStream
* implementation. The synchronized methods of the original have been
* replaced by non-synchronized methods. This makes certain operations
* FASTer, but also *not thread-safe*.
*
* This file remains formatted the same as the Apache Harmony original to
* make patching easier if any bug fixes are made to the Harmony version.
*/
/**
 * A specialized {@link InputStream} for reading the contents of a byte array.
 * <p>
 * None of the methods of this class are synchronized, so an instance must not
 * be used from several threads without external synchronization.
 *
 * @see ByteArrayInputStream
 */
public class FastByteArrayInputStream extends InputStream {
    /**
     * The {@code byte} array containing the bytes to stream over. The array is
     * used directly; it is not copied.
     */
    protected byte[] buf;

    /**
     * The current position within the byte array.
     */
    protected int pos;

    /**
     * The current mark position. Initially set to 0 or the <code>offset</code>
     * parameter within the constructor.
     */
    protected int mark;

    /**
     * The index one past the last byte of {@code buf} that may be read.
     */
    protected int count;

    /**
     * Constructs a new {@code FastByteArrayInputStream} on the byte array
     * {@code buf}.
     *
     * @param buf
     *            the byte array to stream over.
     */
    public FastByteArrayInputStream(byte buf[]) {
        this.mark = 0;
        this.buf = buf;
        this.count = buf.length;
    }

    /**
     * Constructs a new {@code FastByteArrayInputStream} on the byte array
     * {@code buf} with the initial position set to {@code offset} and the
     * end of the readable region set to {@code offset} + {@code length},
     * capped at {@code buf.length}.
     *
     * @param buf
     *            the byte array to stream over.
     * @param offset
     *            the initial position in {@code buf} to start streaming from.
     * @param length
     *            the number of bytes available for streaming.
     */
    public FastByteArrayInputStream(byte buf[], int offset, int length) {
        this.buf = buf;
        pos = offset;
        mark = offset;
        // Sum in long: an int sum wraps negative for large lengths, which
        // would make the stream look empty instead of capping at buf.length.
        long end = (long) offset + length;
        count = end > buf.length ? buf.length : (int) end;
    }

    /**
     * Returns the number of bytes that are available before this stream will
     * block. This method returns the number of bytes yet to be read from the
     * source byte array.
     *
     * @return the number of bytes available before blocking.
     */
    @Override
    public int available() {
        return count - pos;
    }

    /**
     * Closes this stream. This implementation does nothing.
     *
     * @throws IOException
     *             never thrown by this implementation; declared for
     *             compatibility with {@link InputStream#close()}.
     */
    @Override
    public void close() throws IOException {
        // Do nothing on close, this matches JDK behaviour.
    }

    /**
     * Sets a mark position in this stream. The parameter {@code readlimit} is
     * ignored. Sending {@code reset()} will reposition the stream back to the
     * marked position.
     *
     * @param readlimit
     *            ignored.
     * @see #markSupported()
     * @see #reset()
     */
    @Override
    public void mark(int readlimit) {
        mark = pos;
    }

    /**
     * Indicates whether this stream supports the {@code mark()} and
     * {@code reset()} methods. Returns {@code true} since this class supports
     * these methods.
     *
     * @return always {@code true}.
     * @see #mark(int)
     * @see #reset()
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Reads a single byte from the source byte array and returns it as an
     * integer in the range from 0 to 255. Returns -1 if the end of the source
     * array has been reached.
     *
     * @return the byte read or -1 if the end of this stream has been reached.
     */
    @Override
    public int read() {
        // Mask so that negative byte values are returned as 128..255.
        return pos < count ? buf[pos++] & 0xFF : -1;
    }

    /**
     * Reads at most {@code length} bytes from this stream and stores them in
     * byte array {@code b} starting at {@code offset}. This implementation
     * reads bytes from the source byte array.
     *
     * @param b
     *            the byte array in which to store the bytes read.
     * @param offset
     *            the initial position in {@code b} to store the bytes read from
     *            this stream.
     * @param length
     *            the maximum number of bytes to store in {@code b}.
     * @return the number of bytes actually read or -1 if no bytes were read and
     *         the end of the stream was encountered.
     * @throws IndexOutOfBoundsException
     *             if {@code offset < 0} or {@code length < 0}, or if
     *             {@code offset + length} is greater than the size of
     *             {@code b}.
     * @throws NullPointerException
     *             if {@code b} is {@code null}.
     */
    @Override
    public int read(byte b[], int offset, int length) {
        if (b == null) {
            throw new NullPointerException();
        }
        // Compare against b.length - offset rather than offset + length to
        // avoid int overflow.
        if (offset < 0 || offset > b.length || length < 0
                || length > b.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        // End of stream is reported before the zero-length shortcut.
        if (this.pos >= this.count) {
            return -1;
        }
        if (length == 0) {
            return 0;
        }

        int copylen = this.count - pos < length ? this.count - pos : length;
        System.arraycopy(buf, pos, b, offset, copylen);
        pos += copylen;
        return copylen;
    }

    /**
     * Resets this stream to the last marked location. This implementation
     * resets the position to either the marked position, the start position
     * supplied in the constructor or 0 if neither has been provided.
     *
     * @see #mark(int)
     */
    @Override
    public void reset() {
        pos = mark;
    }

    /**
     * Skips up to {@code n} bytes in this stream. Subsequent {@code read()}s
     * will not return these bytes unless {@code reset()} is used. Fewer than
     * {@code n} bytes are skipped if the end of the stream is reached first.
     * It does nothing and returns 0 if {@code n} is zero or negative.
     *
     * @param n
     *            the number of bytes to skip.
     * @return the number of bytes actually skipped.
     */
    @Override
    public long skip(long n) {
        if (n <= 0) {
            return 0;
        }
        int temp = pos;
        // The cast is safe: it is only taken when pos + n <= count.
        pos = this.count - pos < n ? this.count : (int) (pos + n);
        return pos - temp;
    }
}

View file

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.taoensso;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
/*
* This file has been modified from Apache Harmony's ByteArrayOutputStream
* implementation. The synchronized methods of the original have been
* replaced by non-synchronized methods. This makes certain operations
* much FASTer, but also *not thread-safe*.
*
* This file remains formatted the same as the Apache Harmony original to
* make patching easier if any bug fixes are made to the Harmony version.
*/
/**
 * A specialized {@link OutputStream} for writing content to an (internal) byte
 * array. As bytes are written to this stream, the byte array may be expanded
 * to hold more bytes. When the writing is considered to be finished, a copy of
 * the byte array can be requested from the class.
 * <p>
 * None of the methods of this class are synchronized, so an instance must not
 * be used from several threads without external synchronization.
 *
 * @see ByteArrayOutputStream
 */
public class FastByteArrayOutputStream extends OutputStream {
    /**
     * Upper bound used when growing the buffer. Kept slightly below
     * {@code Integer.MAX_VALUE} (the same margin the JDK collections use)
     * because an array of exactly {@code Integer.MAX_VALUE} elements may not
     * be allocatable.
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * The byte array containing the bytes written.
     */
    protected byte[] buf;

    /**
     * The number of bytes written.
     */
    protected int count;

    /**
     * Constructs a new {@code FastByteArrayOutputStream} with a default size
     * of 32 bytes. If more than 32 bytes are written to this instance, the
     * underlying byte array will expand.
     */
    public FastByteArrayOutputStream() {
        buf = new byte[32];
    }

    /**
     * Constructs a new {@code FastByteArrayOutputStream} with a default size of
     * {@code size} bytes. If more than {@code size} bytes are written to this
     * instance, the underlying byte array will expand.
     *
     * @param size
     *            initial size for the underlying byte array, must be
     *            non-negative.
     * @throws IllegalArgumentException
     *             if {@code size} < 0.
     */
    public FastByteArrayOutputStream(int size) {
        if (size >= 0) {
            buf = new byte[size];
        } else {
            throw new IllegalArgumentException();
        }
    }

    /**
     * Closes this stream. The buffered content is left untouched.
     *
     * @throws IOException
     *             if an error occurs while attempting to close this stream.
     */
    @Override
    public void close() throws IOException {
        /*
         * Although the spec claims "A closed stream cannot perform output
         * operations and cannot be reopened.", this implementation must do
         * nothing.
         */
        super.close();
    }

    /**
     * Ensures the buffer can hold {@code i} more bytes, growing it if needed.
     * The buffer grows to twice the required size while that is representable,
     * otherwise to the largest supported array size.
     *
     * @param i
     *            the number of additional bytes about to be written.
     * @throws OutOfMemoryError
     *             if the required capacity exceeds {@code Integer.MAX_VALUE}.
     */
    private void expand(int i) {
        int required = count + i;
        // A negative sum means count + i wrapped around: the content cannot
        // fit in a Java array, so fail explicitly instead of skipping growth.
        if (required < 0) {
            throw new OutOfMemoryError(
                    "Required buffer capacity exceeds Integer.MAX_VALUE");
        }
        /* Can the buffer handle @i more bytes, if not expand it */
        if (required <= buf.length) {
            return;
        }

        // Doubling is only safe while required * 2 stays within int range.
        int newCapacity = required > MAX_ARRAY_SIZE / 2
                ? Math.max(required, MAX_ARRAY_SIZE)
                : required * 2;
        byte[] newbuf = new byte[newCapacity];
        System.arraycopy(buf, 0, newbuf, 0, count);
        buf = newbuf;
    }

    /**
     * Resets this stream to the beginning of the underlying byte array. All
     * subsequent writes will overwrite any bytes previously stored in this
     * stream.
     */
    public void reset() {
        count = 0;
    }

    /**
     * Returns the total number of bytes written to this stream so far.
     *
     * @return the number of bytes written to this stream.
     */
    public int size() {
        return count;
    }

    /**
     * Returns the contents of this stream as a byte array. Any changes made to
     * the receiver after returning will not be reflected in the byte array
     * returned to the caller.
     *
     * @return this stream's current contents as a byte array.
     */
    public byte[] toByteArray() {
        byte[] newArray = new byte[count];
        System.arraycopy(buf, 0, newArray, 0, count);
        return newArray;
    }

    /**
     * Returns the contents of this stream as a string, decoded with the
     * platform's default charset. Any changes made to the receiver after
     * returning will not be reflected in the string returned to the caller.
     *
     * @return this stream's current contents as a string.
     */
    @Override
    public String toString() {
        return new String(buf, 0, count);
    }

    /**
     * Returns the contents of this stream as a string. Each byte {@code b} in
     * this stream is converted to a character {@code c} using the following
     * function:
     * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is
     * deprecated and either {@link #toString()} or {@link #toString(String)}
     * should be used.
     *
     * @param hibyte
     *            the high byte of each resulting Unicode character.
     * @return this stream's current contents as a string with the high byte set
     *         to {@code hibyte}.
     * @deprecated Use {@link #toString()}.
     */
    @Deprecated
    public String toString(int hibyte) {
        char[] newBuf = new char[size()];
        for (int i = 0; i < newBuf.length; i++) {
            newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
        }
        return new String(newBuf);
    }

    /**
     * Returns the contents of this stream as a string converted according to
     * the encoding declared in {@code enc}.
     *
     * @param enc
     *            a string representing the encoding to use when translating
     *            this stream to a string.
     * @return this stream's current contents as an encoded string.
     * @throws UnsupportedEncodingException
     *             if the provided encoding is not supported.
     */
    public String toString(String enc) throws UnsupportedEncodingException {
        return new String(buf, 0, count, enc);
    }

    /**
     * Writes {@code len} bytes from the byte array {@code buffer} starting at
     * offset {@code offset} to this stream.
     *
     * @param buffer
     *            the buffer to be written.
     * @param offset
     *            the initial position in {@code buffer} to retrieve bytes.
     * @param len
     *            the number of bytes of {@code buffer} to write.
     * @throws NullPointerException
     *             if {@code buffer} is {@code null}.
     * @throws IndexOutOfBoundsException
     *             if {@code offset < 0} or {@code len < 0}, or if
     *             {@code offset + len} is greater than the length of
     *             {@code buffer}.
     */
    @Override
    public void write(byte[] buffer, int offset, int len) {
        // Compare against buffer.length - offset rather than offset + len to
        // avoid int overflow.
        if (offset < 0 || offset > buffer.length || len < 0
                || len > buffer.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return;
        }

        /* Expand if necessary */
        expand(len);
        System.arraycopy(buffer, offset, buf, this.count, len);
        this.count += len;
    }

    /**
     * Writes the specified byte {@code oneByte} to the OutputStream. Only the
     * low order byte of {@code oneByte} is written.
     *
     * @param oneByte
     *            the byte to be written.
     */
    @Override
    public void write(int oneByte) {
        // Only take the (slower) expand path when the buffer is exactly full.
        if (count == buf.length) {
            expand(1);
        }
        buf[count++] = (byte) oneByte;
    }

    /**
     * Takes the contents of this stream and writes it to the output stream
     * {@code out}.
     *
     * @param out
     *            an OutputStream on which to write the contents of this stream.
     * @throws IOException
     *             if an error occurs while writing to {@code out}.
     */
    public void writeTo(OutputStream out) throws IOException {
        out.write(buf, 0, count);
    }
}

View file

@ -27,4 +27,4 @@
(thaw (org.iq80.snappy.Snappy/uncompress iq80-ba 0 (alength iq80-ba)))
(thaw (org.iq80.snappy.Snappy/uncompress xerial-ba 0 (alength xerial-ba))))))
(expect (benchmarks/autobench)) ; Also tests :cached passwords
(expect (benchmarks/bench {:reader? false})) ; Also tests :cached passwords