mirror of
https://github.com/corda/corda.git
synced 2025-03-03 12:57:29 +00:00
fix several win32 bugs in SocketSelector
This commit is contained in:
parent
85012ba5b0
commit
4b92017ea9
@ -15,9 +15,6 @@
|
|||||||
# include <errno.h>
|
# include <errno.h>
|
||||||
# include <netdb.h>
|
# include <netdb.h>
|
||||||
# include <sys/select.h>
|
# include <sys/select.h>
|
||||||
|
|
||||||
# undef JNIEXPORT
|
|
||||||
# define JNIEXPORT __attribute__ ((visibility("default")))
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define java_nio_channels_SelectionKey_OP_READ 1L
|
#define java_nio_channels_SelectionKey_OP_READ 1L
|
||||||
@ -492,19 +489,6 @@ Java_java_nio_channels_SocketSelector_natWakeup(JNIEnv *e, jclass, jlong state)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" JNIEXPORT void JNICALL
|
|
||||||
Java_java_nio_channels_SocketSelector_natClearWoken(JNIEnv *e, jclass, jlong state)
|
|
||||||
{
|
|
||||||
SelectorState* s = reinterpret_cast<SelectorState*>(state);
|
|
||||||
if (s->control.connected() and s->control.reader() >= 0) {
|
|
||||||
char c;
|
|
||||||
int r = ::doRead(s->control.reader(), &c, 1);
|
|
||||||
if (r != 1) {
|
|
||||||
throwIOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
extern "C" JNIEXPORT void JNICALL
|
extern "C" JNIEXPORT void JNICALL
|
||||||
Java_java_nio_channels_SocketSelector_natClose(JNIEnv *, jclass, jlong state)
|
Java_java_nio_channels_SocketSelector_natClose(JNIEnv *, jclass, jlong state)
|
||||||
{
|
{
|
||||||
@ -562,8 +546,68 @@ Java_java_nio_channels_SocketSelector_natDoSocketSelect(JNIEnv *e, jclass,
|
|||||||
FD_SET(static_cast<unsigned>(socket), &(s->read));
|
FD_SET(static_cast<unsigned>(socket), &(s->read));
|
||||||
if (max < socket) max = socket;
|
if (max < socket) max = socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef WIN32
|
||||||
|
if (s->control.listener() >= 0) {
|
||||||
|
int socket = s->control.listener();
|
||||||
|
FD_SET(static_cast<unsigned>(socket), &(s->read));
|
||||||
|
if (max < socket) max = socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (not s->control.connected()) {
|
||||||
|
int socket = s->control.writer();
|
||||||
|
FD_SET(static_cast<unsigned>(socket), &(s->write));
|
||||||
|
FD_SET(static_cast<unsigned>(socket), &(s->except));
|
||||||
|
if (max < socket) max = socket;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
timeval time = { interval / 1000, (interval % 1000) * 1000 };
|
timeval time = { interval / 1000, (interval % 1000) * 1000 };
|
||||||
int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time);
|
int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time);
|
||||||
|
|
||||||
|
#ifdef WIN32
|
||||||
|
if (FD_ISSET(s->control.writer(), &(s->write)) or
|
||||||
|
FD_ISSET(s->control.writer(), &(s->except)))
|
||||||
|
{
|
||||||
|
unsigned socket = s->control.writer();
|
||||||
|
FD_CLR(socket, &(s->write));
|
||||||
|
FD_CLR(socket, &(s->except));
|
||||||
|
|
||||||
|
int error;
|
||||||
|
socklen_t size = sizeof(int);
|
||||||
|
int r = getsockopt(socket, SOL_SOCKET, SO_ERROR,
|
||||||
|
reinterpret_cast<char*>(&error), &size);
|
||||||
|
if (r != 0 or size != sizeof(int) or error != 0) {
|
||||||
|
throwIOException(e);
|
||||||
|
}
|
||||||
|
s->control.setConnected(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (s->control.listener() >= 0 and
|
||||||
|
FD_ISSET(s->control.listener(), &(s->read)))
|
||||||
|
{
|
||||||
|
FD_CLR(static_cast<unsigned>(s->control.listener()), &(s->read));
|
||||||
|
|
||||||
|
s->control.setReader(::doAccept(e, s->control.listener()));
|
||||||
|
s->control.setListener(-1);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (s->control.reader() >= 0 and
|
||||||
|
FD_ISSET(s->control.reader(), &(s->read)))
|
||||||
|
{
|
||||||
|
FD_CLR(static_cast<unsigned>(s->control.reader()), &(s->read));
|
||||||
|
|
||||||
|
char c;
|
||||||
|
int r = 1;
|
||||||
|
while (r == 1) {
|
||||||
|
r = ::doRead(s->control.reader(), &c, 1);
|
||||||
|
}
|
||||||
|
if (r < 0 and not eagain()) {
|
||||||
|
throwIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (r < 0) {
|
if (r < 0) {
|
||||||
if (errno != EINTR) {
|
if (errno != EINTR) {
|
||||||
throwIOException(e);
|
throwIOException(e);
|
||||||
|
@ -4,13 +4,13 @@ import java.io.IOException;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
class SocketSelector extends Selector {
|
class SocketSelector extends Selector {
|
||||||
private static final boolean isWin32;
|
private static final boolean IsWin32;
|
||||||
protected long state;
|
protected long state;
|
||||||
protected final Object lock = new Object();
|
protected final Object lock = new Object();
|
||||||
protected boolean woken = false;
|
protected boolean woken = false;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
isWin32 = false;
|
IsWin32 = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SocketSelector() {
|
public SocketSelector() {
|
||||||
@ -35,7 +35,6 @@ class SocketSelector extends Selector {
|
|||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (woken) {
|
if (woken) {
|
||||||
woken = false;
|
woken = false;
|
||||||
natClearWoken(state);
|
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
@ -44,93 +43,33 @@ class SocketSelector extends Selector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void select(long interval) throws IOException {
|
public synchronized void select(long interval) throws IOException {
|
||||||
if (isWin32) {
|
|
||||||
win32Select(interval);
|
|
||||||
} else {
|
|
||||||
posixSelect(interval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void win32Select(long interval) { }
|
|
||||||
|
|
||||||
private void posixSelect(long interval) throws IOException {
|
|
||||||
selectedKeys.clear();
|
selectedKeys.clear();
|
||||||
|
|
||||||
if (clearWoken()) return;
|
if (clearWoken()) return;
|
||||||
int max=0;
|
int max=0;
|
||||||
for (Iterator<SelectionKey> it = keys.iterator();
|
for (Iterator<SelectionKey> it = keys.iterator();
|
||||||
it.hasNext();) {
|
it.hasNext();)
|
||||||
|
{
|
||||||
SelectionKey key = it.next();
|
SelectionKey key = it.next();
|
||||||
SocketChannel c = (SocketChannel)key.channel();
|
SocketChannel c = (SocketChannel)key.channel();
|
||||||
int socket = c.socketFD();
|
int socket = c.socketFD();
|
||||||
if (! c.isOpen()) {
|
if (! c.isOpen()) {
|
||||||
natSelectClearAll(socket, state);
|
natSelectClearAll(socket, state);
|
||||||
// Equivalent to:
|
|
||||||
//
|
|
||||||
// FD_CLR(socket, &(s->read));
|
|
||||||
// FD_CLR(socket, &(s->write));
|
|
||||||
// FD_CLR(socket, &(s->except));
|
|
||||||
it.remove();
|
it.remove();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
key.readyOps(0);
|
key.readyOps(0);
|
||||||
max = natSelectUpdateInterestSet(socket, key.interestOps(), state, max);
|
max = natSelectUpdateInterestSet(socket, key.interestOps(), state, max);
|
||||||
// Equivalent to:
|
|
||||||
//
|
|
||||||
// if (interest & (SelectionKey::OP_READ | SelectionKey::OP_ACCEPT)) {
|
|
||||||
// FD_SET(socket, &(s->read));
|
|
||||||
// if (max < socket) max = socket;
|
|
||||||
// } else {
|
|
||||||
// FD_CLR(socket, &(s->read));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (interest & SelectionKey::OP_WRITE) {
|
|
||||||
// FD_SET(socket, &(s->write));
|
|
||||||
// if (max < socket) max = socket;
|
|
||||||
// } else {
|
|
||||||
// FD_CLR(socket, &(s->write));
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int r = natDoSocketSelect(state, max, interval);
|
int r = natDoSocketSelect(state, max, interval);
|
||||||
// Equivalent to:
|
|
||||||
//
|
|
||||||
// if (s->control.reader() >= 0) {
|
|
||||||
// unsigned socket = s->control.reader();
|
|
||||||
// FD_SET(socket, &(s->read));
|
|
||||||
// if (max < socket) max = socket;
|
|
||||||
// }
|
|
||||||
// timeval time = { interval / 1000, (interval % 1000) * 1000 };
|
|
||||||
// int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time);
|
|
||||||
// if (r < 0) {
|
|
||||||
// if (errno != EINTR) {
|
|
||||||
// throw new IOException(errorString());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
if (r > 0) {
|
if (r > 0) {
|
||||||
for (SelectionKey key : keys) {
|
for (SelectionKey key : keys) {
|
||||||
SocketChannel c = (SocketChannel)key.channel();
|
SocketChannel c = (SocketChannel)key.channel();
|
||||||
int socket = c.socketFD();
|
int socket = c.socketFD();
|
||||||
int ready = natUpdateReadySet(socket, key.interestOps(), state);
|
int ready = natUpdateReadySet(socket, key.interestOps(), state);
|
||||||
// Equivalent to:
|
|
||||||
//
|
|
||||||
// jint ready = 0;
|
|
||||||
//
|
|
||||||
// if (FD_ISSET(c->socket, &(s->read))) {
|
|
||||||
// if (interest & SelectionKey::OP_READ) {
|
|
||||||
// ready |= SelectionKey::OP_READ;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (interest & SelectionKey::OP_ACCEPT) {
|
|
||||||
// ready |= SelectionKey::OP_ACCEPT;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if ((interest & SelectionKey::OP_WRITE)
|
|
||||||
// and FD_ISSET(c->socket, &(s->write))) {
|
|
||||||
// ready |= SelectionKey::OP_WRITE;
|
|
||||||
// }
|
|
||||||
key.readyOps(ready);
|
key.readyOps(ready);
|
||||||
if (ready != 0) {
|
if (ready != 0) {
|
||||||
selectedKeys.add(key);
|
selectedKeys.add(key);
|
||||||
@ -146,7 +85,6 @@ class SocketSelector extends Selector {
|
|||||||
|
|
||||||
private static native long natInit();
|
private static native long natInit();
|
||||||
private static native void natWakeup(long state);
|
private static native void natWakeup(long state);
|
||||||
private static native void natClearWoken(long state);
|
|
||||||
private static native void natClose(long state);
|
private static native void natClose(long state);
|
||||||
private static native void natSelectClearAll(int socket, long state);
|
private static native void natSelectClearAll(int socket, long state);
|
||||||
private static native int natSelectUpdateInterestSet(int socket,
|
private static native int natSelectUpdateInterestSet(int socket,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user