From 67633d5fd650197a719d6ad6abdbad7dd014c67d Mon Sep 17 00:00:00 2001 From: Max Penet Date: Sun, 16 Jun 2013 22:27:22 +0200 Subject: [PATCH] try non sync version of byte-array iostreams --- .../taoensso/FastByteArrayInputStream.java | 234 ++++++++++++++++ .../taoensso/FastByteArrayOutputStream.java | 252 ++++++++++++++++++ project.clj | 4 +- src/taoensso/nippy.clj | 10 +- 4 files changed, 494 insertions(+), 6 deletions(-) create mode 100644 java-src/com/taoensso/FastByteArrayInputStream.java create mode 100644 java-src/com/taoensso/FastByteArrayOutputStream.java diff --git a/java-src/com/taoensso/FastByteArrayInputStream.java b/java-src/com/taoensso/FastByteArrayInputStream.java new file mode 100644 index 0000000..40a6d3d --- /dev/null +++ b/java-src/com/taoensso/FastByteArrayInputStream.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taoensso; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/* + * This file has been modified from Apache Harmony's ByteArrayInputStream + * implementation. The synchronized methods of the original have been + * replaced by non-synchronized methods. This makes this certain operations + * FASTer, but also *not thread-safe*. + * + * This file remains formatted the same as the Apache Harmony original to + * make patching easier if any bug fixes are made to the Harmony version. + */ + +/** + * A specialized {@link InputStream } for reading the contents of a byte array. + * + * @see ByteArrayInputStream + */ +public class FastByteArrayInputStream extends InputStream { + /** + * The {@code byte} array containing the bytes to stream over. + */ + protected byte[] buf; + + /** + * The current position within the byte array. + */ + protected int pos; + + /** + * The current mark position. Initially set to 0 or the offset + * parameter within the constructor. + */ + protected int mark; + + /** + * The total number of bytes initially available in the byte array + * {@code buf}. + */ + protected int count; + + /** + * Constructs a new {@code ByteArrayInputStream} on the byte array + * {@code buf}. + * + * @param buf + * the byte array to stream over. + */ + public FastByteArrayInputStream(byte buf[]) { + this.mark = 0; + this.buf = buf; + this.count = buf.length; + } + + /** + * Constructs a new {@code ByteArrayInputStream} on the byte array + * {@code buf} with the initial position set to {@code offset} and the + * number of bytes available set to {@code offset} + {@code length}. + * + * @param buf + * the byte array to stream over. + * @param offset + * the initial position in {@code buf} to start streaming from. + * @param length + * the number of bytes available for streaming. + */ + public FastByteArrayInputStream(byte buf[], int offset, int length) { + this.buf = buf; + pos = offset; + mark = offset; + count = offset + length > buf.length ? buf.length : offset + length; + } + + /** + * Returns the number of bytes that are available before this stream will + * block. This method returns the number of bytes yet to be read from the + * source byte array. + * + * @return the number of bytes available before blocking. + */ + @Override + public int available() { + return count - pos; + } + + /** + * Closes this stream and frees resources associated with this stream. + * + * @throws IOException + * if an I/O error occurs while closing this stream. + */ + @Override + public void close() throws IOException { + // Do nothing on close, this matches JDK behaviour. + } + + /** + * Sets a mark position in this ByteArrayInputStream. The parameter + * {@code readlimit} is ignored. Sending {@code reset()} will reposition the + * stream back to the marked position. + * + * @param readlimit + * ignored. + * @see #markSupported() + * @see #reset() + */ + @Override + public void mark(int readlimit) { + mark = pos; + } + + /** + * Indicates whether this stream supports the {@code mark()} and + * {@code reset()} methods. Returns {@code true} since this class supports + * these methods. + * + * @return always {@code true}. + * @see #mark(int) + * @see #reset() + */ + @Override + public boolean markSupported() { + return true; + } + + /** + * Reads a single byte from the source byte array and returns it as an + * integer in the range from 0 to 255. Returns -1 if the end of the source + * array has been reached. + * + * @return the byte read or -1 if the end of this stream has been reached. + */ + @Override + public int read() { + return pos < count ? buf[pos++] & 0xFF : -1; + } + + /** + * Reads at most {@code len} bytes from this stream and stores + * them in byte array {@code b} starting at {@code offset}. This + * implementation reads bytes from the source byte array. + * + * @param b + * the byte array in which to store the bytes read. + * @param offset + * the initial position in {@code b} to store the bytes read from + * this stream. + * @param length + * the maximum number of bytes to store in {@code b}. + * @return the number of bytes actually read or -1 if no bytes were read and + * the end of the stream was encountered. + * @throws IndexOutOfBoundsException + * if {@code offset < 0} or {@code length < 0}, or if + * {@code offset + length} is greater than the size of + * {@code b}. + * @throws NullPointerException + * if {@code b} is {@code null}. + */ + @Override + public int read(byte b[], int offset, int length) { + if (b == null) { + throw new NullPointerException(); + } + // avoid int overflow + if (offset < 0 || offset > b.length || length < 0 + || length > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + // Are there any bytes available? + if (this.pos >= this.count) { + return -1; + } + if (length == 0) { + return 0; + } + + int copylen = this.count - pos < length ? this.count - pos : length; + System.arraycopy(buf, pos, b, offset, copylen); + pos += copylen; + return copylen; + } + + /** + * Resets this stream to the last marked location. This implementation + * resets the position to either the marked position, the start position + * supplied in the constructor or 0 if neither has been provided. + * + * @see #mark(int) + */ + @Override + public void reset() { + pos = mark; + } + + /** + * Skips {@code count} number of bytes in this InputStream. Subsequent + * {@code read()}s will not return these bytes unless {@code reset()} is + * used. This implementation skips {@code count} number of bytes in the + * target stream. It does nothing and returns 0 if {@code n} is negative. + * + * @param n + * the number of bytes to skip. + * @return the number of bytes actually skipped. + */ + @Override + public long skip(long n) { + if (n <= 0) { + return 0; + } + int temp = pos; + pos = this.count - pos < n ? this.count : (int) (pos + n); + return pos - temp; + } +} diff --git a/java-src/com/taoensso/FastByteArrayOutputStream.java b/java-src/com/taoensso/FastByteArrayOutputStream.java new file mode 100644 index 0000000..f47c17f --- /dev/null +++ b/java-src/com/taoensso/FastByteArrayOutputStream.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.taoensso; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; + +/* + * This file has been modified from Apache Harmony's ByteArrayOutputStream + * implementation. The synchronized methods of the original have been + * replaced by non-synchronized methods. This makes certain operations + * much FASTer, but also *not thread-safe*. + * + * This file remains formatted the same as the Apache Harmony original to + * make patching easier if any bug fixes are made to the Harmony version. + */ + +/** + * A specialized {@link OutputStream} for class for writing content to an + * (internal) byte array. As bytes are written to this stream, the byte array + * may be expanded to hold more bytes. When the writing is considered to be + * finished, a copy of the byte array can be requested from the class. + * + * @see ByteArrayOutputStream + */ +public class FastByteArrayOutputStream extends OutputStream { + /** + * The byte array containing the bytes written. + */ + protected byte[] buf; + + /** + * The number of bytes written. + */ + protected int count; + + /** + * Constructs a new ByteArrayOutputStream with a default size of 32 bytes. + * If more than 32 bytes are written to this instance, the underlying byte + * array will expand. + */ + public FastByteArrayOutputStream() { + buf = new byte[32]; + } + + /** + * Constructs a new {@code ByteArrayOutputStream} with a default size of + * {@code size} bytes. If more than {@code size} bytes are written to this + * instance, the underlying byte array will expand. + * + * @param size + * initial size for the underlying byte array, must be + * non-negative. + * @throws IllegalArgumentException + * if {@code size} < 0. + */ + public FastByteArrayOutputStream(int size) { + if (size >= 0) { + buf = new byte[size]; + } else { + throw new IllegalArgumentException(); + } + } + + /** + * Closes this stream. This releases system resources used for this stream. + * + * @throws IOException + * if an error occurs while attempting to close this stream. + */ + @Override + public void close() throws IOException { + /** + * Although the spec claims "A closed stream cannot perform output + * operations and cannot be reopened.", this implementation must do + * nothing. + */ + super.close(); + } + + private void expand(int i) { + /* Can the buffer handle @i more bytes, if not expand it */ + if (count + i <= buf.length) { + return; + } + + byte[] newbuf = new byte[(count + i) * 2]; + System.arraycopy(buf, 0, newbuf, 0, count); + buf = newbuf; + } + + /** + * Resets this stream to the beginning of the underlying byte array. All + * subsequent writes will overwrite any bytes previously stored in this + * stream. + */ + public void reset() { + count = 0; + } + + /** + * Returns the total number of bytes written to this stream so far. + * + * @return the number of bytes written to this stream. + */ + public int size() { + return count; + } + + /** + * Returns the contents of this ByteArrayOutputStream as a byte array. Any + * changes made to the receiver after returning will not be reflected in the + * byte array returned to the caller. + * + * @return this stream's current contents as a byte array. + */ + public byte[] toByteArray() { + byte[] newArray = new byte[count]; + System.arraycopy(buf, 0, newArray, 0, count); + return newArray; + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string. Any + * changes made to the receiver after returning will not be reflected in the + * string returned to the caller. + * + * @return this stream's current contents as a string. + */ + + @Override + public String toString() { + return new String(buf, 0, count); + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string. Each byte + * {@code b} in this stream is converted to a character {@code c} using the + * following function: + * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is + * deprecated and either {@link #toString()} or {@link #toString(String)} + * should be used. + * + * @param hibyte + * the high byte of each resulting Unicode character. + * @return this stream's current contents as a string with the high byte set + * to {@code hibyte}. + * @deprecated Use {@link #toString()}. + */ + @Deprecated + public String toString(int hibyte) { + char[] newBuf = new char[size()]; + for (int i = 0; i < newBuf.length; i++) { + newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff)); + } + return new String(newBuf); + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string converted + * according to the encoding declared in {@code enc}. + * + * @param enc + * a string representing the encoding to use when translating + * this stream to a string. + * @return this stream's current contents as an encoded string. + * @throws UnsupportedEncodingException + * if the provided encoding is not supported. + */ + public String toString(String enc) throws UnsupportedEncodingException { + return new String(buf, 0, count, enc); + } + + /** + * Writes {@code count} bytes from the byte array {@code buffer} starting at + * offset {@code index} to this stream. + * + * @param buffer + * the buffer to be written. + * @param offset + * the initial position in {@code buffer} to retrieve bytes. + * @param len + * the number of bytes of {@code buffer} to write. + * @throws NullPointerException + * if {@code buffer} is {@code null}. + * @throws IndexOutOfBoundsException + * if {@code offset < 0} or {@code len < 0}, or if + * {@code offset + len} is greater than the length of + * {@code buffer}. + */ + @Override + public void write(byte[] buffer, int offset, int len) { + // avoid int overflow + if (offset < 0 || offset > buffer.length || len < 0 + || len > buffer.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + + /* Expand if necessary */ + expand(len); + System.arraycopy(buffer, offset, buf, this.count, len); + this.count += len; + } + + /** + * Writes the specified byte {@code oneByte} to the OutputStream. Only the + * low order byte of {@code oneByte} is written. + * + * @param oneByte + * the byte to be written. + */ + @Override + public void write(int oneByte) { + if (count == buf.length) { + expand(1); + } + buf[count++] = (byte) oneByte; + } + + /** + * Takes the contents of this stream and writes it to the output stream + * {@code out}. + * + * @param out + * an OutputStream on which to write the contents of this stream. + * @throws IOException + * if an error occurs while writing to {@code out}. + */ + public void writeTo(OutputStream out) throws IOException { + out.write(buf, 0, count); + } +} diff --git a/project.clj b/project.clj index 8299258..9a48647 100644 --- a/project.clj +++ b/project.clj @@ -20,4 +20,6 @@ [lein-autoexpect "0.2.5"] [codox "0.6.4"]] :min-lein-version "2.0.0" - :warn-on-reflection true) + :warn-on-reflection true + :java-source-paths ["java-src"] + :javac-options ["-target" "1.6" "-source" "1.6"]) diff --git a/src/taoensso/nippy.clj b/src/taoensso/nippy.clj index f802ce4..15d8620 100644 --- a/src/taoensso/nippy.clj +++ b/src/taoensso/nippy.clj @@ -6,8 +6,8 @@ (utils :as utils) (compression :as compression :refer (snappy-compressor)) (encryption :as encryption :refer (aes128-encryptor))]) - (:import [java.io DataInputStream DataOutputStream ByteArrayOutputStream - ByteArrayInputStream] + (:import [java.io DataInputStream DataOutputStream] + [com.taoensso FastByteArrayOutputStream FastByteArrayInputStream] [clojure.lang Keyword BigInt Ratio PersistentQueue PersistentTreeMap PersistentTreeSet IPersistentList IPersistentVector IPersistentMap IPersistentSet IPersistentCollection])) @@ -171,7 +171,7 @@ compressor snappy-compressor encryptor aes128-encryptor}}]] (when legacy-mode (assert-legacy-args compressor password)) - (let [ba (ByteArrayOutputStream.) + (let [ba (FastByteArrayOutputStream.) stream (DataOutputStream. ba)] (binding [*print-dup* print-dup?] (freeze-to-stream x stream)) (let [ba (.toByteArray ba) @@ -282,7 +282,7 @@ (let [ba data-ba ba (if password (encryption/decrypt encryptor password ba) ba) ba (if compressor (compression/decompress compressor ba) ba) - stream (DataInputStream. (ByteArrayInputStream. ba))] + stream (DataInputStream. (FastByteArrayInputStream. ba))] (binding [*read-eval* read-eval?] (thaw-from-stream stream))) (catch Exception e (cond @@ -383,4 +383,4 @@ :or {compressed? true}}] (thaw ba {:legacy-opts {:compressed? compressed?} :read-eval? read-eval? - :password nil})) \ No newline at end of file + :password nil}))