java: updates VectoredByteBuffer

This commit is contained in:
FURUHASHI Sadayuki 2010-11-29 18:37:47 +09:00
parent 0014b7fdb9
commit dfdceb4258

View File

@ -1,5 +1,5 @@
// //
// MessagePack-RPC for Java // MessagePack for Java
// //
// Copyright (C) 2010 FURUHASHI Sadayuki // Copyright (C) 2010 FURUHASHI Sadayuki
// //
@ -17,60 +17,96 @@
// //
package org.msgpack.buffer; package org.msgpack.buffer;
import java.io.*; import java.io.InputStream;
import java.util.*; import java.io.OutputStream;
import java.nio.*; import java.io.IOException;
import java.nio.channels.*; import java.util.List;
import org.msgpack.*; import java.util.ArrayList;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteChannel { public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteChannel {
private List<ByteBuffer> vec = new ArrayList<ByteBuffer>(); private List<ByteBuffer> vec = new ArrayList<ByteBuffer>();
private ByteBuffer internalBuffer; private ByteBuffer internalBuffer;
private ByteBuffer lastInternalBuffer; private ByteBuffer lastInternalBuffer;
private int bufferAllocSize; private int chunkSize;
private int referenceThreshold = 32; private int referenceThreshold;
public VectoredByteBuffer() { public VectoredByteBuffer() {
this(32*1024); this(32*1024);
} }
public VectoredByteBuffer(int size) { public VectoredByteBuffer(int chunkSize) {
this.bufferAllocSize = size; this(chunkSize, 32);
internalBuffer = ByteBuffer.allocateDirect(size); }
public VectoredByteBuffer(int chunkSize, int referenceThreshold) {
this.chunkSize = chunkSize;
this.referenceThreshold = referenceThreshold;
internalBuffer = ByteBuffer.allocateDirect(chunkSize);
} }
public synchronized void close() { public void setChunkSize(int chunkSize) {
vec.clear(); this.chunkSize = chunkSize;
lastInternalBuffer = null;
} }
public int getChunkSize(int chunkSize) {
return this.chunkSize;
}
public void setReferenceThreshold(int referenceThreshold) {
this.referenceThreshold = referenceThreshold;
}
public int getReferenceThreshold(int referenceThreshold) {
return this.referenceThreshold;
}
@Override
public void close() {
reset();
}
@Override
public boolean isOpen() { public boolean isOpen() {
return true; // FIXME? return true; // FIXME?
} }
public void write(int b) throws IOException { public synchronized void reset() {
byte[] ba = new byte[1]; vec.clear();
ba[0] = (byte)b; lastInternalBuffer = null;
write(ba);
} }
public void write(byte[] b) throws IOException {
public void write(byte[] b) {
write(b, 0, b.length); write(b, 0, b.length);
} }
public void write(byte[] b, int off, int len) throws IOException { public void write(byte[] b, int off, int len) {
if(len > referenceThreshold) { if(off < 0 || len < 0 || b.length < off+len) {
throw new IndexOutOfBoundsException();
}
if(referenceThreshold >= 0 && len > referenceThreshold) {
writeReference(b, off, len); writeReference(b, off, len);
} else { } else {
writeCopy(b, off, len); writeCopy(b, off, len);
} }
} }
public int write(ByteBuffer src) throws IOException { public void write(int b) {
byte[] ba = new byte[1];
ba[0] = (byte)b;
write(ba);
}
@Override
public int write(ByteBuffer src) {
int slen = src.remaining(); int slen = src.remaining();
if(slen > referenceThreshold) { if(referenceThreshold >= 0 && slen > referenceThreshold) {
writeCopy(src); writeCopy(src);
} else { } else {
writeReference(src); writeReference(src);
@ -78,11 +114,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
return slen; return slen;
} }
public synchronized long write(ByteBuffer[] srcs) throws IOException { @Override
public synchronized long write(ByteBuffer[] srcs) {
return write(srcs, 0, srcs.length); return write(srcs, 0, srcs.length);
} }
public synchronized long write(ByteBuffer[] srcs, int offset, int length) throws IOException { @Override
public synchronized long write(ByteBuffer[] srcs, int offset, int length) {
if(offset < 0 || length < 0 || srcs.length < offset+length) {
throw new IndexOutOfBoundsException();
}
long total = 0; long total = 0;
for(int i=offset; offset < length; offset++) { for(int i=offset; offset < length; offset++) {
ByteBuffer src = srcs[i]; ByteBuffer src = srcs[i];
@ -91,14 +132,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
return total; return total;
} }
private synchronized void writeCopy(byte[] b, int off, int len) throws IOException { private synchronized void writeCopy(byte[] b, int off, int len) {
int ipos = internalBuffer.position(); int ipos = internalBuffer.position();
if(internalBuffer.capacity() - ipos < len) { if(internalBuffer.capacity() - ipos < len) {
// allocate new buffer // allocate new buffer
int nsize = bufferAllocSize > len ? bufferAllocSize : len; int nsize = chunkSize > len ? chunkSize : len;
internalBuffer = ByteBuffer.allocateDirect(nsize); internalBuffer = ByteBuffer.allocateDirect(nsize);
ipos = 0; ipos = 0;
} else if(internalBuffer == lastInternalBuffer) { } else if(internalBuffer == lastInternalBuffer) {
// optimization: concatenates to the last buffer instead
// of adding new reference
ByteBuffer dup = vec.get(vec.size()-1); ByteBuffer dup = vec.get(vec.size()-1);
int dpos = dup.position(); int dpos = dup.position();
internalBuffer.put(b, off, len); internalBuffer.put(b, off, len);
@ -117,15 +160,17 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
lastInternalBuffer = internalBuffer; lastInternalBuffer = internalBuffer;
} }
private synchronized void writeCopy(ByteBuffer src) throws IOException { private synchronized void writeCopy(ByteBuffer src) {
int slen = src.remaining(); int slen = src.remaining();
int ipos = internalBuffer.position(); int ipos = internalBuffer.position();
if(internalBuffer.capacity() - ipos < slen) { if(internalBuffer.capacity() - ipos < slen) {
// allocate new buffer // allocate new buffer
int nsize = bufferAllocSize > slen ? bufferAllocSize : slen; int nsize = chunkSize > slen ? chunkSize : slen;
internalBuffer = ByteBuffer.allocateDirect(nsize); internalBuffer = ByteBuffer.allocateDirect(nsize);
ipos = 0; ipos = 0;
} else if(internalBuffer == lastInternalBuffer) { } else if(internalBuffer == lastInternalBuffer) {
// optimization: concatenates to the last buffer instead
// of adding new reference
ByteBuffer dup = vec.get(vec.size()-1); ByteBuffer dup = vec.get(vec.size()-1);
int dpos = dup.position(); int dpos = dup.position();
internalBuffer.put(src); internalBuffer.put(src);
@ -144,18 +189,19 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
lastInternalBuffer = internalBuffer; lastInternalBuffer = internalBuffer;
} }
private synchronized void writeReference(byte[] b, int off, int len) throws IOException { private synchronized void writeReference(byte[] b, int off, int len) {
ByteBuffer buf = ByteBuffer.wrap(b, off, len); ByteBuffer buf = ByteBuffer.wrap(b, off, len);
vec.add(buf); vec.add(buf);
lastInternalBuffer = null; lastInternalBuffer = null;
} }
private synchronized void writeReference(ByteBuffer src) throws IOException { private synchronized void writeReference(ByteBuffer src) {
ByteBuffer buf = src.duplicate(); ByteBuffer buf = src.duplicate();
vec.add(buf); vec.add(buf);
lastInternalBuffer = null; lastInternalBuffer = null;
} }
public synchronized void writeTo(java.io.OutputStream out) throws IOException { public synchronized void writeTo(java.io.OutputStream out) throws IOException {
byte[] tmpbuf = null; byte[] tmpbuf = null;
for(int i=0; i < vec.size(); i++) { for(int i=0; i < vec.size(); i++) {
@ -206,25 +252,14 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
return total; return total;
} }
public boolean markSupported () {
return false;
}
public synchronized int read() {
byte[] ba = new byte[1];
if(read(ba) >= 1) {
return ba[0];
} else {
return -1;
}
}
public synchronized int read(byte[] b) { public synchronized int read(byte[] b) {
return read(b, 0, b.length); return read(b, 0, b.length);
} }
public synchronized int read(byte[] b, int off, int len) { public synchronized int read(byte[] b, int off, int len) {
// FIXME check arguments if(off < 0 || len < 0 || b.length < off+len) {
throw new IndexOutOfBoundsException();
}
int start = len; int start = len;
while(!vec.isEmpty()) { while(!vec.isEmpty()) {
ByteBuffer r = vec.get(0); ByteBuffer r = vec.get(0);
@ -242,6 +277,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
return start - len; return start - len;
} }
public synchronized int read() {
byte[] ba = new byte[1];
if(read(ba) >= 1) {
return ba[0];
} else {
return -1;
}
}
@Override
public synchronized int read(ByteBuffer dst) { public synchronized int read(ByteBuffer dst) {
int len = dst.remaining(); int len = dst.remaining();
int start = len; int start = len;
@ -266,11 +311,16 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
return start - len; return start - len;
} }
@Override
public synchronized long read(ByteBuffer[] dsts) { public synchronized long read(ByteBuffer[] dsts) {
return read(dsts, 0, dsts.length); return read(dsts, 0, dsts.length);
} }
@Override
public synchronized long read(ByteBuffer[] dsts, int offset, int length) { public synchronized long read(ByteBuffer[] dsts, int offset, int length) {
if(offset < 0 || length < 0 || dsts.length < offset+length) {
throw new IndexOutOfBoundsException();
}
long total = 0; long total = 0;
for(int i=offset; i < length; i++) { for(int i=offset; i < length; i++) {
ByteBuffer dst = dsts[i]; ByteBuffer dst = dsts[i];
@ -298,6 +348,9 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
} }
public synchronized long skip(long len) { public synchronized long skip(long len) {
if(len <= 0) {
return 0;
}
long start = len; long start = len;
while(!vec.isEmpty()) { while(!vec.isEmpty()) {
ByteBuffer r = vec.get(0); ByteBuffer r = vec.get(0);
@ -315,57 +368,98 @@ public class VectoredByteBuffer implements GatheringByteChannel, ScatteringByteC
} }
private final static class OutputStream extends java.io.OutputStream { public final static class OutputStream extends java.io.OutputStream {
private VectoredByteBuffer vbb; private VectoredByteBuffer vbb;
OutputStream(VectoredByteBuffer vbb) { OutputStream(VectoredByteBuffer vbb) {
this.vbb = vbb; this.vbb = vbb;
} }
public void write(byte[] b) throws IOException { @Override
public void write(byte[] b) {
vbb.write(b); vbb.write(b);
} }
public void write(byte[] b, int off, int len) throws IOException { @Override
public void write(byte[] b, int off, int len) {
vbb.write(b, off, len); vbb.write(b, off, len);
} }
public void write(int b) throws IOException { @Override
public void write(int b) {
vbb.write(b); vbb.write(b);
} }
public int write(ByteBuffer src) {
return vbb.write(src);
}
public long write(ByteBuffer[] srcs) {
return vbb.write(srcs);
}
public long write(ByteBuffer[] srcs, int offset, int length) {
return vbb.write(srcs, offset, length);
}
public void writeTo(OutputStream out) throws IOException {
vbb.writeTo(out);
}
public byte[] toByteArray() {
return vbb.toByteArray();
}
} }
public final static class InputStream extends java.io.InputStream {
private final static class InputStream extends java.io.InputStream {
private VectoredByteBuffer vbb; private VectoredByteBuffer vbb;
InputStream(VectoredByteBuffer vbb) { InputStream(VectoredByteBuffer vbb) {
this.vbb = vbb; this.vbb = vbb;
} }
public int available() throws IOException { @Override
public int available() {
return vbb.available(); return vbb.available();
} }
public int read(byte[] b) throws IOException { @Override
public int read(byte[] b) {
return vbb.read(b); return vbb.read(b);
} }
public int read(byte[] b, int off, int len) throws IOException { @Override
public int read(byte[] b, int off, int len) {
return vbb.read(b, off, len); return vbb.read(b, off, len);
} }
public int read() throws IOException { @Override
public int read() {
return vbb.read(); return vbb.read();
} }
public int read(ByteBuffer dst) {
return vbb.read(dst);
}
public long read(ByteBuffer[] dsts, int offset, int length) {
return vbb.read(dsts, offset, length);
}
public long read(GatheringByteChannel to) throws IOException {
return vbb.read(to);
}
public long skip(long len) {
return vbb.skip(len);
}
} }
public OutputStream outputStream() {
public java.io.OutputStream outputStream() {
return new OutputStream(this); return new OutputStream(this);
} }
public java.io.InputStream inputStream() { public InputStream inputStream() {
return new InputStream(this); return new InputStream(this);
} }
} }