mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
fix several blocking SocketChannel bugs
In java-nio.cpp, we can't use GetPrimitiveArrayCritical when reading from or writing to blocking sockets since it may block the rest of the VM indefinitely. In SelectableChannel.java, we can't use a null test on SelectableChannel.key to determine whether the channel is open since it might never be registered with a Selector. According to the Sun documentation, a SelectableChannel is open as soon as it's created, so that's what we now implement.
This commit is contained in:
@ -428,13 +428,32 @@ Java_java_nio_channels_SocketChannel_natRead(JNIEnv *e,
|
|||||||
jint socket,
|
jint socket,
|
||||||
jbyteArray buffer,
|
jbyteArray buffer,
|
||||||
jint offset,
|
jint offset,
|
||||||
jint length)
|
jint length,
|
||||||
|
jboolean blocking)
|
||||||
{
|
{
|
||||||
|
int r;
|
||||||
|
if (blocking) {
|
||||||
|
uint8_t* buf = static_cast<uint8_t*>(allocate(e, length));
|
||||||
|
if (buf) {
|
||||||
|
r = ::doRead(socket, buf, length);
|
||||||
|
if (r > 0) {
|
||||||
|
e->SetByteArrayRegion
|
||||||
|
(buffer, offset, r, reinterpret_cast<jbyte*>(buf));
|
||||||
|
}
|
||||||
|
free(buf);
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
jboolean isCopy;
|
jboolean isCopy;
|
||||||
uint8_t *buf = static_cast<uint8_t*>
|
uint8_t* buf = static_cast<uint8_t*>
|
||||||
(e->GetPrimitiveArrayCritical(buffer, &isCopy));
|
(e->GetPrimitiveArrayCritical(buffer, &isCopy));
|
||||||
int r = ::doRead(socket, buf + offset, length);
|
|
||||||
|
r = ::doRead(socket, buf + offset, length);
|
||||||
|
|
||||||
e->ReleasePrimitiveArrayCritical(buffer, buf, 0);
|
e->ReleasePrimitiveArrayCritical(buffer, buf, 0);
|
||||||
|
}
|
||||||
|
|
||||||
if (r < 0) {
|
if (r < 0) {
|
||||||
if (eagain()) {
|
if (eagain()) {
|
||||||
return 0;
|
return 0;
|
||||||
@ -453,13 +472,30 @@ Java_java_nio_channels_SocketChannel_natWrite(JNIEnv *e,
|
|||||||
jint socket,
|
jint socket,
|
||||||
jbyteArray buffer,
|
jbyteArray buffer,
|
||||||
jint offset,
|
jint offset,
|
||||||
jint length)
|
jint length,
|
||||||
|
jboolean blocking)
|
||||||
{
|
{
|
||||||
|
int r;
|
||||||
|
if (blocking) {
|
||||||
|
uint8_t* buf = static_cast<uint8_t*>(allocate(e, length));
|
||||||
|
if (buf) {
|
||||||
|
e->GetByteArrayRegion
|
||||||
|
(buffer, offset, length, reinterpret_cast<jbyte*>(buf));
|
||||||
|
r = ::doWrite(socket, buf, length);
|
||||||
|
free(buf);
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
jboolean isCopy;
|
jboolean isCopy;
|
||||||
uint8_t *buf = static_cast<uint8_t*>
|
uint8_t* buf = static_cast<uint8_t*>
|
||||||
(e->GetPrimitiveArrayCritical(buffer, &isCopy));
|
(e->GetPrimitiveArrayCritical(buffer, &isCopy));
|
||||||
int r = ::doWrite(socket, buf + offset, length);
|
|
||||||
|
r = ::doWrite(socket, buf + offset, length);
|
||||||
|
|
||||||
e->ReleasePrimitiveArrayCritical(buffer, buf, 0);
|
e->ReleasePrimitiveArrayCritical(buffer, buf, 0);
|
||||||
|
}
|
||||||
|
|
||||||
if (r < 0) {
|
if (r < 0) {
|
||||||
if (eagain()) {
|
if (eagain()) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -15,6 +15,7 @@ import java.nio.ByteBuffer;
|
|||||||
|
|
||||||
public abstract class SelectableChannel implements Channel {
|
public abstract class SelectableChannel implements Channel {
|
||||||
private SelectionKey key;
|
private SelectionKey key;
|
||||||
|
private boolean open = true;
|
||||||
|
|
||||||
abstract int socketFD();
|
abstract int socketFD();
|
||||||
|
|
||||||
@ -30,12 +31,11 @@ public abstract class SelectableChannel implements Channel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean isOpen() {
|
public boolean isOpen() {
|
||||||
return key != null;
|
return open;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (key != null) {
|
open = false;
|
||||||
key = null;
|
key = null;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ public class SocketChannel extends SelectableChannel
|
|||||||
byte[] array = b.array();
|
byte[] array = b.array();
|
||||||
if (array == null) throw new NullPointerException();
|
if (array == null) throw new NullPointerException();
|
||||||
|
|
||||||
int r = natRead(socket, array, b.arrayOffset() + b.position(), b.remaining());
|
int r = natRead(socket, array, b.arrayOffset() + b.position(), b.remaining(), blocking);
|
||||||
if (r > 0) {
|
if (r > 0) {
|
||||||
b.position(b.position() + r);
|
b.position(b.position() + r);
|
||||||
}
|
}
|
||||||
@ -108,7 +108,7 @@ public class SocketChannel extends SelectableChannel
|
|||||||
byte[] array = b.array();
|
byte[] array = b.array();
|
||||||
if (array == null) throw new NullPointerException();
|
if (array == null) throw new NullPointerException();
|
||||||
|
|
||||||
int w = natWrite(socket, array, b.arrayOffset() + b.position(), b.remaining());
|
int w = natWrite(socket, array, b.arrayOffset() + b.position(), b.remaining(), blocking);
|
||||||
if (w > 0) {
|
if (w > 0) {
|
||||||
b.position(b.position() + w);
|
b.position(b.position() + w);
|
||||||
}
|
}
|
||||||
@ -139,9 +139,9 @@ public class SocketChannel extends SelectableChannel
|
|||||||
throws IOException;
|
throws IOException;
|
||||||
private static native boolean natFinishConnect(int socket)
|
private static native boolean natFinishConnect(int socket)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
private static native int natRead(int socket, byte[] buffer, int offset, int length)
|
private static native int natRead(int socket, byte[] buffer, int offset, int length, boolean blocking)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
private static native int natWrite(int socket, byte[] buffer, int offset, int length)
|
private static native int natWrite(int socket, byte[] buffer, int offset, int length, boolean blocking)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
private static native void natThrowWriteError(int socket) throws IOException;
|
private static native void natThrowWriteError(int socket) throws IOException;
|
||||||
private static native void natCloseSocket(int socket);
|
private static native void natCloseSocket(int socket);
|
||||||
|
Reference in New Issue
Block a user