mirror of
https://github.com/corda/corda.git
synced 2025-01-07 13:38:47 +00:00
implement blocking mode for SocketChannel and ServerSocketChannel
This commit is contained in:
parent
c2e9b3ed76
commit
80d4385cb8
@ -154,17 +154,19 @@ eagain()
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
makeNonblocking(JNIEnv* e, int d)
|
setBlocking(JNIEnv* e, int d, bool blocking)
|
||||||
{
|
{
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
u_long a = 1;
|
u_long a = (blocking ? 0 : 1);
|
||||||
int r = ioctlsocket(d, FIONBIO, &a);
|
int r = ioctlsocket(d, FIONBIO, &a);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
throwIOException(e);
|
throwIOException(e);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
int r = fcntl(d, F_SETFL, fcntl(d, F_GETFL) | O_NONBLOCK);
|
int r = fcntl(d, F_SETFL, (blocking
|
||||||
|
? (fcntl(d, F_GETFL) & (~O_NONBLOCK))
|
||||||
|
: (fcntl(d, F_GETFL) | O_NONBLOCK)));
|
||||||
if (r < 0) {
|
if (r < 0) {
|
||||||
throwIOException(e);
|
throwIOException(e);
|
||||||
return false;
|
return false;
|
||||||
@ -231,7 +233,6 @@ doAccept(JNIEnv* e, int s)
|
|||||||
socklen_t length = sizeof(address);
|
socklen_t length = sizeof(address);
|
||||||
int r = ::accept(s, &address, &length);
|
int r = ::accept(s, &address, &length);
|
||||||
if (r >= 0) {
|
if (r >= 0) {
|
||||||
makeNonblocking(e, r);
|
|
||||||
return r;
|
return r;
|
||||||
} else {
|
} else {
|
||||||
throwIOException(e);
|
throwIOException(e);
|
||||||
@ -260,7 +261,7 @@ doWrite(int fd, const void* buffer, size_t count)
|
|||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
makeSocket(JNIEnv* e, bool blocking = false)
|
makeSocket(JNIEnv* e)
|
||||||
{
|
{
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
static bool wsaInitialized = false;
|
static bool wsaInitialized = false;
|
||||||
@ -279,8 +280,6 @@ makeSocket(JNIEnv* e, bool blocking = false)
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (not blocking) makeNonblocking(e, s);
|
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,6 +310,15 @@ Java_java_nio_channels_ServerSocketChannel_natDoListen(JNIEnv *e,
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" JNIEXPORT void JNICALL
|
||||||
|
Java_java_nio_channels_SocketChannel_configureBlocking(JNIEnv *e,
|
||||||
|
jclass,
|
||||||
|
jint socket,
|
||||||
|
jboolean blocking)
|
||||||
|
{
|
||||||
|
setBlocking(e, socket, blocking);
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" JNIEXPORT void JNICALL
|
extern "C" JNIEXPORT void JNICALL
|
||||||
Java_java_nio_channels_SocketChannel_natSetTcpNoDelay(JNIEnv *e,
|
Java_java_nio_channels_SocketChannel_natSetTcpNoDelay(JNIEnv *e,
|
||||||
jclass,
|
jclass,
|
||||||
@ -325,11 +333,14 @@ Java_java_nio_channels_SocketChannel_natDoConnect(JNIEnv *e,
|
|||||||
jclass,
|
jclass,
|
||||||
jstring host,
|
jstring host,
|
||||||
jint port,
|
jint port,
|
||||||
|
jboolean blocking,
|
||||||
jbooleanArray retVal)
|
jbooleanArray retVal)
|
||||||
{
|
{
|
||||||
int s = makeSocket(e);
|
int s = makeSocket(e);
|
||||||
if (e->ExceptionOccurred()) return 0;
|
if (e->ExceptionOccurred()) return 0;
|
||||||
|
|
||||||
|
setBlocking(e, s, blocking);
|
||||||
|
|
||||||
sockaddr_in address;
|
sockaddr_in address;
|
||||||
init(e, &address, host, port);
|
init(e, &address, host, port);
|
||||||
if (e->ExceptionOccurred()) return 0;
|
if (e->ExceptionOccurred()) return 0;
|
||||||
@ -425,7 +436,8 @@ class Pipe {
|
|||||||
address.sin_family = AF_INET;
|
address.sin_family = AF_INET;
|
||||||
address.sin_port = 0;
|
address.sin_port = 0;
|
||||||
address.sin_addr.s_addr = inet_addr("127.0.0.1"); //INADDR_LOOPBACK;
|
address.sin_addr.s_addr = inet_addr("127.0.0.1"); //INADDR_LOOPBACK;
|
||||||
listener_ = makeSocket(e, false);
|
listener_ = makeSocket(e);
|
||||||
|
setBlocking(e, listener_, false);
|
||||||
::doListen(e, listener_, &address);
|
::doListen(e, listener_, &address);
|
||||||
|
|
||||||
socklen_t length = sizeof(sockaddr_in);
|
socklen_t length = sizeof(sockaddr_in);
|
||||||
@ -435,7 +447,8 @@ class Pipe {
|
|||||||
throwIOException(e);
|
throwIOException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
writer_ = makeSocket(e, true);
|
writer_ = makeSocket(e);
|
||||||
|
setBlocking(e, writer_, true);
|
||||||
connected_ = ::doConnect(e, writer_, &address);
|
connected_ = ::doConnect(e, writer_, &address);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -485,8 +498,8 @@ class Pipe {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (makeNonblocking(e, pipe[0])) {
|
if (setBlocking(e, pipe[0], false)) {
|
||||||
makeNonblocking(e, pipe[1]);
|
setBlocking(e, pipe[1], false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ public class ServerSocketChannel extends SelectableChannel {
|
|||||||
return new ServerSocketChannel();
|
return new ServerSocketChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SelectableChannel configureBlocking(boolean v) {
|
public SelectableChannel configureBlocking(boolean v) throws IOException {
|
||||||
return channel.configureBlocking(v);
|
return channel.configureBlocking(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,6 +61,7 @@ public class ServerSocketChannel extends SelectableChannel {
|
|||||||
throw new IllegalArgumentException();
|
throw new IllegalArgumentException();
|
||||||
}
|
}
|
||||||
channel.socket = doListen(a.getHostName(), a.getPort());
|
channel.socket = doListen(a.getHostName(), a.getPort());
|
||||||
|
channel.configureBlocking(channel.isBlocking());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,21 +24,29 @@ public class SocketChannel extends SelectableChannel
|
|||||||
|
|
||||||
int socket = InvalidSocket;
|
int socket = InvalidSocket;
|
||||||
boolean connected = false;
|
boolean connected = false;
|
||||||
|
boolean blocking = true;
|
||||||
|
|
||||||
public static SocketChannel open() {
|
public static SocketChannel open() {
|
||||||
return new SocketChannel();
|
return new SocketChannel();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SelectableChannel configureBlocking(boolean v) {
|
public SelectableChannel configureBlocking(boolean v) throws IOException {
|
||||||
if (v) throw new IllegalArgumentException();
|
blocking = v;
|
||||||
|
if (socket != InvalidSocket) {
|
||||||
|
configureBlocking(socket, v);
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isBlocking() {
|
||||||
|
return blocking;
|
||||||
|
}
|
||||||
|
|
||||||
public Socket socket() {
|
public Socket socket() {
|
||||||
return new Handle();
|
return new Handle();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean connect(SocketAddress address) throws Exception {
|
public boolean connect(SocketAddress address) throws IOException {
|
||||||
InetSocketAddress a;
|
InetSocketAddress a;
|
||||||
try {
|
try {
|
||||||
a = (InetSocketAddress) address;
|
a = (InetSocketAddress) address;
|
||||||
@ -46,6 +54,7 @@ public class SocketChannel extends SelectableChannel
|
|||||||
throw new UnsupportedAddressTypeException();
|
throw new UnsupportedAddressTypeException();
|
||||||
}
|
}
|
||||||
socket = doConnect(a.getHostName(), a.getPort());
|
socket = doConnect(a.getHostName(), a.getPort());
|
||||||
|
configureBlocking(blocking);
|
||||||
return connected;
|
return connected;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,11 +65,11 @@ public class SocketChannel extends SelectableChannel
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int doConnect(String host, int port) throws Exception {
|
private int doConnect(String host, int port) throws IOException {
|
||||||
if (host == null) throw new NullPointerException();
|
if (host == null) throw new NullPointerException();
|
||||||
|
|
||||||
boolean b[] = new boolean[1];
|
boolean b[] = new boolean[1];
|
||||||
int s = natDoConnect(host, port, b);
|
int s = natDoConnect(host, port, blocking, b);
|
||||||
connected = b[0];
|
connected = b[0];
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -109,11 +118,14 @@ public class SocketChannel extends SelectableChannel
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static native void configureBlocking(int socket, boolean blocking)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
private static native void natSetTcpNoDelay(int socket, boolean on)
|
private static native void natSetTcpNoDelay(int socket, boolean on)
|
||||||
throws SocketException;
|
throws SocketException;
|
||||||
|
|
||||||
private static native int natDoConnect(String host, int port, boolean[] connected)
|
private static native int natDoConnect(String host, int port, boolean blocking, boolean[] connected)
|
||||||
throws Exception;
|
throws IOException;
|
||||||
private static native int natRead(int socket, byte[] buffer, int offset, int length)
|
private static native int natRead(int socket, byte[] buffer, int offset, int length)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
private static native int natWrite(int socket, byte[] buffer, int offset, int length)
|
private static native int natWrite(int socket, byte[] buffer, int offset, int length)
|
||||||
|
Loading…
Reference in New Issue
Block a user