diff --git a/java/src/main/java/org/msgpack/buffer/VectoredByteBuffer.java b/java/src/main/java/org/msgpack/buffer/VectoredByteBuffer.java index 62b765ed..bb182036 100644 --- a/java/src/main/java/org/msgpack/buffer/VectoredByteBuffer.java +++ b/java/src/main/java/org/msgpack/buffer/VectoredByteBuffer.java @@ -1,5 +1,5 @@ // -// MessagePack-RPC for Java +// MessagePack for Java // // Copyright (C) 2010 FURUHASHI Sadayuki // @@ -17,60 +17,96 @@ // package org.msgpack.buffer; -import java.io.*; -import java.util.*; -import java.nio.*; -import java.nio.channels.*; -import org.msgpack.*; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteChannel { private List vec = new ArrayList(); private ByteBuffer internalBuffer; private ByteBuffer lastInternalBuffer; - private int bufferAllocSize; - private int referenceThreshold = 32; + private int chunkSize; + private int referenceThreshold; public VectoredByteBuffer() { this(32*1024); } - public VectoredByteBuffer(int size) { - this.bufferAllocSize = size; - internalBuffer = ByteBuffer.allocateDirect(size); + public VectoredByteBuffer(int chunkSize) { + this(chunkSize, 32); + } + + public VectoredByteBuffer(int chunkSize, int referenceThreshold) { + this.chunkSize = chunkSize; + this.referenceThreshold = referenceThreshold; + internalBuffer = ByteBuffer.allocateDirect(chunkSize); } - public synchronized void close() { - vec.clear(); - lastInternalBuffer = null; + public void setChunkSize(int chunkSize) { + this.chunkSize = chunkSize; } + public int getChunkSize(int chunkSize) { + return this.chunkSize; + } + + public void setReferenceThreshold(int referenceThreshold) { + this.referenceThreshold = referenceThreshold; + } + + public int getReferenceThreshold(int referenceThreshold) { + return this.referenceThreshold; + } + + + @Override + public void close() { + reset(); + } + + @Override public boolean isOpen() { return true; // FIXME? } - public void write(int b) throws IOException { - byte[] ba = new byte[1]; - ba[0] = (byte)b; - write(ba); + public synchronized void reset() { + vec.clear(); + lastInternalBuffer = null; } - public void write(byte[] b) throws IOException { + + public void write(byte[] b) { write(b, 0, b.length); } - public void write(byte[] b, int off, int len) throws IOException { - if(len > referenceThreshold) { + public void write(byte[] b, int off, int len) { + if(off < 0 || len < 0 || b.length < off+len) { + throw new IndexOutOfBoundsException(); + } + if(referenceThreshold >= 0 && len > referenceThreshold) { writeReference(b, off, len); } else { writeCopy(b, off, len); } } - public int write(ByteBuffer src) throws IOException { + public void write(int b) { + byte[] ba = new byte[1]; + ba[0] = (byte)b; + write(ba); + } + + @Override + public int write(ByteBuffer src) { int slen = src.remaining(); - if(slen > referenceThreshold) { + if(referenceThreshold >= 0 && slen > referenceThreshold) { writeCopy(src); } else { writeReference(src); @@ -78,11 +114,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC return slen; } - public synchronized long write(ByteBuffer[] srcs) throws IOException { + @Override + public synchronized long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); } - public synchronized long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + @Override + public synchronized long write(ByteBuffer[] srcs, int offset, int length) { + if(offset < 0 || length < 0 || srcs.length < offset+length) { + throw new IndexOutOfBoundsException(); + } long total = 0; for(int i=offset; offset < length; offset++) { ByteBuffer src = srcs[i]; @@ -91,14 +132,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC return total; } - private synchronized void writeCopy(byte[] b, int off, int len) throws IOException { + private synchronized void writeCopy(byte[] b, int off, int len) { int ipos = internalBuffer.position(); if(internalBuffer.capacity() - ipos < len) { // allocate new buffer - int nsize = bufferAllocSize > len ? bufferAllocSize : len; + int nsize = chunkSize > len ? chunkSize : len; internalBuffer = ByteBuffer.allocateDirect(nsize); ipos = 0; } else if(internalBuffer == lastInternalBuffer) { + // optimization: concatenates to the last buffer instead + // of adding new reference ByteBuffer dup = vec.get(vec.size()-1); int dpos = dup.position(); internalBuffer.put(b, off, len); @@ -117,15 +160,17 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC lastInternalBuffer = internalBuffer; } - private synchronized void writeCopy(ByteBuffer src) throws IOException { + private synchronized void writeCopy(ByteBuffer src) { int slen = src.remaining(); int ipos = internalBuffer.position(); if(internalBuffer.capacity() - ipos < slen) { // allocate new buffer - int nsize = bufferAllocSize > slen ? bufferAllocSize : slen; + int nsize = chunkSize > slen ? chunkSize : slen; internalBuffer = ByteBuffer.allocateDirect(nsize); ipos = 0; } else if(internalBuffer == lastInternalBuffer) { + // optimization: concatenates to the last buffer instead + // of adding new reference ByteBuffer dup = vec.get(vec.size()-1); int dpos = dup.position(); internalBuffer.put(src); @@ -144,18 +189,19 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC lastInternalBuffer = internalBuffer; } - private synchronized void writeReference(byte[] b, int off, int len) throws IOException { + private synchronized void writeReference(byte[] b, int off, int len) { ByteBuffer buf = ByteBuffer.wrap(b, off, len); vec.add(buf); lastInternalBuffer = null; } - private synchronized void writeReference(ByteBuffer src) throws IOException { + private synchronized void writeReference(ByteBuffer src) { ByteBuffer buf = src.duplicate(); vec.add(buf); lastInternalBuffer = null; } + public synchronized void writeTo(java.io.OutputStream out) throws IOException { byte[] tmpbuf = null; for(int i=0; i < vec.size(); i++) { @@ -206,25 +252,14 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC return total; } - public boolean markSupported () { - return false; - } - - public synchronized int read() { - byte[] ba = new byte[1]; - if(read(ba) >= 1) { - return ba[0]; - } else { - return -1; - } - } - public synchronized int read(byte[] b) { return read(b, 0, b.length); } public synchronized int read(byte[] b, int off, int len) { - // FIXME check arguments + if(off < 0 || len < 0 || b.length < off+len) { + throw new IndexOutOfBoundsException(); + } int start = len; while(!vec.isEmpty()) { ByteBuffer r = vec.get(0); @@ -242,6 +277,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC return start - len; } + public synchronized int read() { + byte[] ba = new byte[1]; + if(read(ba) >= 1) { + return ba[0]; + } else { + return -1; + } + } + + @Override public synchronized int read(ByteBuffer dst) { int len = dst.remaining(); int start = len; @@ -266,11 +311,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC return start - len; } + @Override public synchronized long read(ByteBuffer[] dsts) { return read(dsts, 0, dsts.length); } + @Override public synchronized long read(ByteBuffer[] dsts, int offset, int length) { + if(offset < 0 || length < 0 || dsts.length < offset+length) { + throw new IndexOutOfBoundsException(); + } long total = 0; for(int i=offset; i < length; i++) { ByteBuffer dst = dsts[i]; @@ -298,6 +348,9 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC } public synchronized long skip(long len) { + if(len <= 0) { + return 0; + } long start = len; while(!vec.isEmpty()) { ByteBuffer r = vec.get(0); @@ -315,57 +368,98 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC } - private final static class OutputStream extends java.io.OutputStream { + public final static class OutputStream extends java.io.OutputStream { private VectoredByteBuffer vbb; OutputStream(VectoredByteBuffer vbb) { this.vbb = vbb; } - public void write(byte[] b) throws IOException { + @Override + public void write(byte[] b) { vbb.write(b); } - public void write(byte[] b, int off, int len) throws IOException { + @Override + public void write(byte[] b, int off, int len) { vbb.write(b, off, len); } - public void write(int b) throws IOException { + @Override + public void write(int b) { vbb.write(b); } + + public int write(ByteBuffer src) { + return vbb.write(src); + } + + public long write(ByteBuffer[] srcs) { + return vbb.write(srcs); + } + + public long write(ByteBuffer[] srcs, int offset, int length) { + return vbb.write(srcs, offset, length); + } + + public void writeTo(OutputStream out) throws IOException { + vbb.writeTo(out); + } + + public byte[] toByteArray() { + return vbb.toByteArray(); + } } - - private final static class InputStream extends java.io.InputStream { + public final static class InputStream extends java.io.InputStream { private VectoredByteBuffer vbb; InputStream(VectoredByteBuffer vbb) { this.vbb = vbb; } - public int available() throws IOException { + @Override + public int available() { return vbb.available(); } - public int read(byte[] b) throws IOException { + @Override + public int read(byte[] b) { return vbb.read(b); } - public int read(byte[] b, int off, int len) throws IOException { + @Override + public int read(byte[] b, int off, int len) { return vbb.read(b, off, len); } - public int read() throws IOException { + @Override + public int read() { return vbb.read(); } + + public int read(ByteBuffer dst) { + return vbb.read(dst); + } + + public long read(ByteBuffer[] dsts, int offset, int length) { + return vbb.read(dsts, offset, length); + } + + public long read(GatheringByteChannel to) throws IOException { + return vbb.read(to); + } + + public long skip(long len) { + return vbb.skip(len); + } } - - public java.io.OutputStream outputStream() { + public OutputStream outputStream() { return new OutputStream(this); } - public java.io.InputStream inputStream() { + public InputStream inputStream() { return new InputStream(this); } }