From 98269286e5eaee7dc60ff60469a8cdf8eb6a29d8 Mon Sep 17 00:00:00 2001 From: Eric Scharff Date: Fri, 5 Oct 2007 15:32:56 -0600 Subject: [PATCH] Implemented a basic NIO socket channel interface. Non-blocking socket channels and server socket channels are implemented. This version works but only when libnative is linked with g++ (because of C++ object creation code that fails without this linking) --- classpath/java-nio.cpp | 565 ++++++++++++++++++ classpath/java/net/InetSocketAddress.java | 19 + classpath/java/nio/ByteBuffer.java | 4 +- classpath/java/nio/channels/Channel.java | 8 + .../nio/channels/ReadableByteChannel.java | 8 + .../java/nio/channels/SelectableChannel.java | 28 + classpath/java/nio/channels/SelectionKey.java | 68 +++ classpath/java/nio/channels/Selector.java | 38 ++ .../nio/channels/ServerSocketChannel.java | 39 ++ .../java/nio/channels/SocketChannel.java | 78 +++ .../java/nio/channels/SocketSelector.java | 159 +++++ .../nio/channels/WritableByteChannel.java | 8 + 12 files changed, 1021 insertions(+), 1 deletion(-) create mode 100644 classpath/java-nio.cpp create mode 100644 classpath/java/net/InetSocketAddress.java create mode 100644 classpath/java/nio/channels/Channel.java create mode 100644 classpath/java/nio/channels/ReadableByteChannel.java create mode 100644 classpath/java/nio/channels/SelectableChannel.java create mode 100644 classpath/java/nio/channels/SelectionKey.java create mode 100644 classpath/java/nio/channels/Selector.java create mode 100644 classpath/java/nio/channels/ServerSocketChannel.java create mode 100644 classpath/java/nio/channels/SocketChannel.java create mode 100644 classpath/java/nio/channels/SocketSelector.java create mode 100644 classpath/java/nio/channels/WritableByteChannel.java diff --git a/classpath/java-nio.cpp b/classpath/java-nio.cpp new file mode 100644 index 0000000000..0957bde259 --- /dev/null +++ b/classpath/java-nio.cpp @@ -0,0 +1,565 @@ +#include +#include +#include +#include +#include + +#ifdef WIN32 +# include +#else +# include +# include +# include +# include +#endif + +#include "jni.h" +#include "jni-util.h" + +#undef JNIEXPORT +#define JNIEXPORT __attribute__ ((visibility("default"))) + +#define java_nio_channels_SelectionKey_OP_READ 1L +#define java_nio_channels_SelectionKey_OP_WRITE 4L +#define java_nio_channels_SelectionKey_OP_ACCEPT 16L + +namespace { + +inline const char* +errorString(int e) +{ + return strerror(e); +} + +inline const char* +errorString() +{ +#ifdef WIN32 + const unsigned size = 64; + char buffer[size]; + snprintf(buffer, size, "wsa code: %d", WSAGetLastError()); + return JvNewStringLatin1(buffer); +#else + return errorString(errno); +#endif +} + +void +throwIOException(JNIEnv* e) +{ + throwNew(e, "java/io/IOException", errorString()); +} + +void +init(JNIEnv* e, sockaddr_in* address, jstring hostString, jint port) +{ + const char* chars = e->GetStringUTFChars(hostString, 0); + if (chars) { + hostent* host = gethostbyname(chars); + e->ReleaseStringUTFChars(hostString, chars); + if (host == 0) { + // herror("init: gethostbyname"); + throwIOException(e); + return; + } + memset(address, 0, sizeof(sockaddr_in)); + address->sin_family = AF_INET; + address->sin_port = htons(port); + address->sin_addr = *reinterpret_cast(host->h_addr_list[0]); + } +} + +inline bool +einProgress() +{ +#ifdef WIN32 + return WSAGetLastError() == WSAEINPROGRESS + or WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EINPROGRESS; +#endif +} + +inline bool +eagain() +{ +#ifdef WIN32 + return WSAGetLastError() == WSAEINPROGRESS + or WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EAGAIN; +#endif +} + +bool +makeNonblocking(JNIEnv* e, int d) +{ +#ifdef WIN32 + u_long a = 1; + int r = ioctlsocket(d, FIONBIO, &a); + if (r != 0) throw new IOException(errorString()); +#else + int r = fcntl(d, F_SETFL, fcntl(d, F_GETFL) | O_NONBLOCK); + if (r < 0) { + throwIOException(e); + return false; + } + return true; +#endif +} + +void +doListen(JNIEnv* e, int s, sockaddr_in* address) +{ + int opt = 1; + int r = ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast(&opt), sizeof(int)); + if (r != 0) { + throwIOException(e); + return; + } + + r = ::bind(s, reinterpret_cast(address), sizeof(sockaddr_in)); + if (r != 0) { + throwIOException(e); + return; + } + + r = ::listen(s, 100); + if (r != 0) { + throwIOException(e); + } +} + +bool +doConnect(JNIEnv* e, int s, sockaddr_in* address) +{ + int r = ::connect(s, reinterpret_cast(address), + sizeof(sockaddr_in)); + if (r == 0) { + return true; + } else if (not einProgress()) { + throwIOException(e); + return false; + } else { + return false; + } +} + +int +doAccept(JNIEnv* e, int s) +{ + sockaddr address; + socklen_t length = sizeof(address); + int r = ::accept(s, &address, &length); + if (r >= 0) { +// System::out->print(JvNewStringLatin1("doAccept: socket: ")); +// System::out->println(String::valueOf((jint) r)); + + makeNonblocking(e, r); + return r; + } else { + throwIOException(e); + } + return -1; +} + +int +doRead(int fd, void* buffer, size_t count) +{ +#ifdef WIN32 + return recv(fd, static_cast(buffer), count, 0); +#else + return read(fd, buffer, count); +#endif +} + +int +doWrite(int fd, const void* buffer, size_t count) +{ +#ifdef WIN32 + return send(fd, static_cast(buffer), count, 0); +#else + return write(fd, buffer, count); +#endif +} + +int +makeSocket(JNIEnv* e, bool blocking = false) +{ +#ifdef WIN32 + static bool wsaInitialized = false; + if (not wsaInitialized) { + WSADATA data; + int r = WSAStartup(MAKEWORD(2, 2), &data); + if (r or LOBYTE(data.wVersion) != 2 or HIBYTE(data.wVersion) != 2) { + throw new IOException(JvNewStringLatin1("WSAStartup failed")); + } + } +#endif + + int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s < 0) { throwIOException(e); return s; } + +// System::out->print(JvNewStringLatin1("makeSocket: socket: ")); +// System::out->println(String::valueOf((jint) s)); + + if (not blocking) makeNonblocking(e, s); + + return s; +} + +} // namespace + + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_ServerSocketChannel_natDoAccept(JNIEnv *e, jclass, jint socket) +{ + return ::doAccept(e, socket); +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_ServerSocketChannel_natDoListen(JNIEnv *e, + jclass, + jstring host, + jint port) +{ + int s = makeSocket(e); + if (s < 0) return s; + + sockaddr_in address; + init(e, &address, host, port); + + ::doListen(e, s, &address); + return s; +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketChannel_natDoConnect(JNIEnv *e, + jclass, + jstring host, + jint port, + jbooleanArray retVal) +{ + int s = makeSocket(e); + + sockaddr_in address; + init(e, &address, host, port); + + jboolean connected = ::doConnect(e, s, &address); + e->SetBooleanArrayRegion(retVal, 0, 1, &connected); + + return s; +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketChannel_natRead(JNIEnv *e, + jclass, + jint socket, + jbyteArray buffer, + jint offset, + jint length) +{ + jboolean isCopy; + uint8_t *buf = + reinterpret_cast(e->GetPrimitiveArrayCritical(buffer, &isCopy)); + int r = ::doRead(socket, buf + offset, length); + e->ReleasePrimitiveArrayCritical(buffer, buf, 0); + if (r < 0) { + if (eagain()) { + return 0; + } else { + throwIOException(e); + } + } + return r; +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketChannel_natWrite(JNIEnv *e, + jclass, + jint socket, + jbyteArray buffer, + jint offset, + jint length) +{ + jboolean isCopy; + uint8_t *buf = + reinterpret_cast(e->GetPrimitiveArrayCritical(buffer, &isCopy)); + int r = ::doWrite(socket, buf + offset, length); + e->ReleasePrimitiveArrayCritical(buffer, buf, 0); + if (r < 0) { + if (eagain()) { + return 0; + } else { + throwIOException(e); + } + } + return r; +} + + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketChannel_natThrowWriteError(JNIEnv *e, + jclass, + jint socket) +{ + int error; + socklen_t size = sizeof(int); + int r = getsockopt(socket, SOL_SOCKET, SO_ERROR, + reinterpret_cast(&error), &size); + if (r != 0 or size != sizeof(int) or error != 0) { + throwNew(e, "java/io/IOException", errorString(error)); + } +} + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketChannel_natCloseSocket(JNIEnv *, + jclass, + jint socket) +{ + close(socket); +} + +namespace { + +class Pipe { + public: +#ifdef WIN32 + // The Windows socket API only accepts socket file descriptors, not + // pipe descriptors or others. Thus, to implement + // Selector.wakeup(), we make a socket connection via the loopback + // interface and use it as a pipe. + Pipe(): connected_(false), listener_(-1), reader_(-1), writer_(-1) { + sockaddr_in address; + address.sin_family = AF_INET; + address.sin_port = 0; + address.sin_addr.s_addr = inet_addr("127.0.0.1"); //INADDR_LOOPBACK; + listener_ = makeSocket(); + ::doListen(listener_, &address); + + socklen_t length = sizeof(sockaddr_in); + int r = getsockname(listener_, reinterpret_cast(&address), + &length); + if (r) { + throw new IOException(errorString()); + } + + writer_ = makeSocket(true); + connected_ = ::doConnect(writer_, &address); + } + + ~Pipe() { + if (listener_ >= 0) ::close(listener_); + if (reader_ >= 0) ::close(reader_); + if (writer_ >= 0) ::close(writer_); + } + + bool connected() { + return connected_; + } + + void setConnected(bool v) { + connected_ = v; + } + + int listener() { + return listener_; + } + + void setListener(int v) { + listener_ = v; + } + + int reader() { + return reader_; + } + + void setReader(int v) { + reader_ = v; + } + + int writer() { + return writer_; + } + + private: + bool connected_; + int listener_; + int reader_; + int writer_; +#else + Pipe(JNIEnv* e) { + if (::pipe(pipe) != 0) { + throwIOException(e); + return; + } + + if (makeNonblocking(e, pipe[0])) { + makeNonblocking(e, pipe[1]); + } + } + + ~Pipe() { + ::close(pipe[0]); + ::close(pipe[1]); + } + + bool connected() { + return true; + } + + int reader() { + return pipe[0]; + } + + int writer() { + return pipe[1]; + } + + private: + int pipe[2]; +#endif +}; + +struct SelectorState { + fd_set read; + fd_set write; + fd_set except; + Pipe control; + SelectorState(JNIEnv* e) : control(e) { } +}; + +} // namespace + +extern "C" JNIEXPORT jlong JNICALL +Java_java_nio_channels_SocketSelector_natInit(JNIEnv* e, jclass) +{ + SelectorState* s = new SelectorState(e); + if (s) { + FD_ZERO(&(s->read)); + FD_ZERO(&(s->write)); + FD_ZERO(&(s->except)); + } + return reinterpret_cast(s); +} + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketSelector_natWakeup(JNIEnv *e, jclass, jlong state) +{ + SelectorState* s = reinterpret_cast(state); + if (s->control.connected()) { + const char c = 1; + int r = ::doWrite(s->control.writer(), &c, 1); + if (r != 1) { + throwIOException(e); + } + } +} + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketSelector_natClearWoken(JNIEnv *e, jclass, jlong state) +{ + SelectorState* s = reinterpret_cast(state); + if (s->control.connected() and s->control.reader() >= 0) { + char c; + int r = ::doRead(s->control.reader(), &c, 1); + if (r != 1) { + throwIOException(e); + } + } +} + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketSelector_natClose(JNIEnv *, jclass, jlong state) +{ + SelectorState* s = reinterpret_cast(state); + delete s; +} + +extern "C" JNIEXPORT void JNICALL +Java_java_nio_channels_SocketSelector_natSelectClearAll(JNIEnv *, jclass, + jint socket, + jlong state) +{ + SelectorState* s = reinterpret_cast(state); + FD_CLR(socket, &(s->read)); + FD_CLR(socket, &(s->write)); + FD_CLR(socket, &(s->except)); +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketSelector_natSelectUpdateInterestSet(JNIEnv *, + jclass, + jint socket, + jint interest, + jlong state, + jint max) +{ + SelectorState* s = reinterpret_cast(state); + if (interest & (java_nio_channels_SelectionKey_OP_READ | + java_nio_channels_SelectionKey_OP_ACCEPT)) { + FD_SET(socket, &(s->read)); + if (max < socket) max = socket; + } else { + FD_CLR(socket, &(s->read)); + } + + if (interest & java_nio_channels_SelectionKey_OP_WRITE) { + FD_SET(socket, &(s->write)); + if (max < socket) max = socket; + } else { + FD_CLR(socket, &(s->write)); + } + return max; +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketSelector_natDoSocketSelect(JNIEnv *e, jclass, + jlong state, + jint max, + jlong interval) +{ + SelectorState* s = reinterpret_cast(state); + if (s->control.reader() >= 0) { + int socket = s->control.reader(); + FD_SET(socket, &(s->read)); + if (max < socket) max = socket; + } + timeval time = { interval / 1000, (interval % 1000) * 1000 }; + int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time); + if (r < 0) { + if (errno != EINTR) { + throwIOException(e); + } + } + return r; +} + +extern "C" JNIEXPORT jint JNICALL +Java_java_nio_channels_SocketSelector_natUpdateReadySet(JNIEnv *, jclass, + jint socket, + jint interest, + jlong state) +{ + SelectorState* s = reinterpret_cast(state); + jint ready = 0; + + if (FD_ISSET(socket, &(s->read))) { + if (interest & java_nio_channels_SelectionKey_OP_READ) { + ready |= java_nio_channels_SelectionKey_OP_READ; + } + + if (interest & java_nio_channels_SelectionKey_OP_ACCEPT) { + ready |= java_nio_channels_SelectionKey_OP_ACCEPT; + } + } + + if ((interest & java_nio_channels_SelectionKey_OP_WRITE) + and FD_ISSET(socket, &(s->write))) { + ready |= java_nio_channels_SelectionKey_OP_WRITE; + } + return ready; +} + + diff --git a/classpath/java/net/InetSocketAddress.java b/classpath/java/net/InetSocketAddress.java new file mode 100644 index 0000000000..0d13bb680e --- /dev/null +++ b/classpath/java/net/InetSocketAddress.java @@ -0,0 +1,19 @@ +package java.net; + +public class InetSocketAddress { + private final String host; + private final int port; + + public InetSocketAddress(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHostName() { + return host; + } + + public int getPort() { + return port; + } +} diff --git a/classpath/java/nio/ByteBuffer.java b/classpath/java/nio/ByteBuffer.java index dc32fac794..08ba722bd5 100644 --- a/classpath/java/nio/ByteBuffer.java +++ b/classpath/java/nio/ByteBuffer.java @@ -60,7 +60,9 @@ public class ByteBuffer { if (position != 0) { System.arraycopy(array, arrayOffset+position, array, arrayOffset, remaining()); } - position=0; + position=remaining(); + limit(capacity()); + return this; } diff --git a/classpath/java/nio/channels/Channel.java b/classpath/java/nio/channels/Channel.java new file mode 100644 index 0000000000..ce21549fdb --- /dev/null +++ b/classpath/java/nio/channels/Channel.java @@ -0,0 +1,8 @@ +package java.nio.channels; + +import java.io.IOException; + +public interface Channel { + public void close() throws IOException; + public boolean isOpen(); +} diff --git a/classpath/java/nio/channels/ReadableByteChannel.java b/classpath/java/nio/channels/ReadableByteChannel.java new file mode 100644 index 0000000000..cd19ae8186 --- /dev/null +++ b/classpath/java/nio/channels/ReadableByteChannel.java @@ -0,0 +1,8 @@ +package java.nio.channels; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface ReadableByteChannel extends Channel { + public int read(ByteBuffer b) throws IOException; +} diff --git a/classpath/java/nio/channels/SelectableChannel.java b/classpath/java/nio/channels/SelectableChannel.java new file mode 100644 index 0000000000..072d0a5bd6 --- /dev/null +++ b/classpath/java/nio/channels/SelectableChannel.java @@ -0,0 +1,28 @@ +package java.nio.channels; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public abstract class SelectableChannel implements Channel { + private SelectionKey key; + + public abstract int read(ByteBuffer b) throws Exception; + public abstract int write(ByteBuffer b) throws Exception; + public abstract boolean isOpen(); + + public SelectionKey register(Selector selector, int interestOps, + Object attachment) + { + SelectionKey key = new SelectionKey + (this, selector, interestOps, attachment); + selector.add(key); + return key; + } + + public void close() throws IOException { + if (key != null) { + key.selector().remove(key); + key = null; + } + } +} diff --git a/classpath/java/nio/channels/SelectionKey.java b/classpath/java/nio/channels/SelectionKey.java new file mode 100644 index 0000000000..58b433b1ae --- /dev/null +++ b/classpath/java/nio/channels/SelectionKey.java @@ -0,0 +1,68 @@ +package java.nio.channels; + +public class SelectionKey { + public static final int OP_READ = 1 << 0; + public static final int OP_WRITE = 1 << 2; + public static final int OP_ACCEPT = 1 << 4; +// public static final int OP_CONNECT = 1 << 3; + + private final SelectableChannel channel; + private final Selector selector; + private int interestOps; + private int readyOps; + private final Object attachment; + + public SelectionKey(SelectableChannel channel, Selector selector, + int interestOps, Object attachment) + { + this.channel = channel; + this.selector = selector; + this.interestOps = interestOps; + this.attachment = attachment; + this.readyOps = 0; + } + + public int interestOps() { + return interestOps; + } + + public void interestOps(int v) { + this.interestOps = v; + } + + public int readyOps() { + return readyOps; + } + + public void readyOps(int v) { + this.readyOps = v; + } + + public boolean isReadable() { + return (readyOps & OP_READ) != 0; + } + + public boolean isWritable() { + return (readyOps & OP_WRITE) != 0; + } + + public boolean isAcceptable() { + return (readyOps & OP_ACCEPT) != 0; + } + + public boolean isValid() { + return channel.isOpen() && selector.isOpen(); + } + + public SelectableChannel channel() { + return channel; + } + + public Selector selector() { + return selector; + } + + public Object attachment() { + return attachment; + } +} diff --git a/classpath/java/nio/channels/Selector.java b/classpath/java/nio/channels/Selector.java new file mode 100644 index 0000000000..f60d01c28a --- /dev/null +++ b/classpath/java/nio/channels/Selector.java @@ -0,0 +1,38 @@ +package java.nio.channels; + +import java.io.IOException; +import java.util.Set; +import java.util.HashSet; + +public abstract class Selector { + protected final Set keys = new HashSet(); + protected final Set selectedKeys = new HashSet(); + + public static Selector open() { + return new SocketSelector(); + } + + public void add(SelectionKey key) { + keys.add(key); + } + + public void remove(SelectionKey key) { + keys.remove(key); + } + + public Set keys() { + return keys; + } + + public Set selectedKeys() { + return selectedKeys; + } + + public abstract boolean isOpen(); + + public abstract void wakeup(); + + public abstract void select(long interval) throws IOException; + + public abstract void close(); +} diff --git a/classpath/java/nio/channels/ServerSocketChannel.java b/classpath/java/nio/channels/ServerSocketChannel.java new file mode 100644 index 0000000000..0123066e66 --- /dev/null +++ b/classpath/java/nio/channels/ServerSocketChannel.java @@ -0,0 +1,39 @@ +package java.nio.channels; + +import java.net.InetSocketAddress; + +public class ServerSocketChannel extends SocketChannel { + public static ServerSocketChannel open() { + return new ServerSocketChannel(); + } + + public SocketChannel accept() throws Exception { + SocketChannel c = new SocketChannel(); + c.socket = doAccept(); + c.connected = true; + return c; + } + + public Handle socket() { + return new Handle(); + } + + private int doAccept() throws Exception { + return natDoAccept(socket); + } + + private int doListen(String host, int port) throws Exception { + return natDoListen(host, port); + } + + public class Handle { + public void bind(InetSocketAddress address) + throws Exception + { + socket = doListen(address.getHostName(), address.getPort()); + } + } + + private static native int natDoAccept(int socket) throws Exception; + private static native int natDoListen(String host, int port) throws Exception; +} diff --git a/classpath/java/nio/channels/SocketChannel.java b/classpath/java/nio/channels/SocketChannel.java new file mode 100644 index 0000000000..5c17f57b6c --- /dev/null +++ b/classpath/java/nio/channels/SocketChannel.java @@ -0,0 +1,78 @@ +package java.nio.channels; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class SocketChannel extends SelectableChannel + implements ReadableByteChannel, WritableByteChannel +{ + public static final int InvalidSocket = -1; + + protected int socket = InvalidSocket; + protected boolean open = true; + protected boolean connected = false; + + public static SocketChannel open() { + return new SocketChannel(); + } + + public void configureBlocking(boolean v) { + if (v) throw new IllegalArgumentException(); + } + + public void connect(InetSocketAddress address) throws Exception { + socket = doConnect(address.getHostName(), address.getPort()); + } + + public void close() throws IOException { + super.close(); + if (! open) return; + closeSocket(); + open = false; + } + + public boolean isOpen() { + return open; + } + + private int doConnect(String host, int port) throws Exception { + boolean b[] = new boolean[1]; + int s = natDoConnect(host, port, b); + connected = b[0]; + return s; + } + + public int read(ByteBuffer b) throws IOException { + if (b.remaining() == 0) return 0; + int r = natRead(socket, b.array(), b.arrayOffset() + b.position(), b.remaining()); + b.position(b.position() + r); + return r; + } + + public int write(ByteBuffer b) throws IOException { + if (! connected) { + natThrowWriteError(socket); + } + int w = natWrite(socket, b.array(), b.arrayOffset() + b.position(), b.remaining()); + b.position(b.position() + w); + return w; + } + + private void closeSocket() { + natCloseSocket(socket); + } + + int socketFD() { + return socket; + } + + private static native int natDoConnect(String host, int port, boolean[] connected) + throws Exception; + private static native int natRead(int socket, byte[] buffer, int offset, int length) + throws IOException; + private static native int natWrite(int socket, byte[] buffer, int offset, int length) + throws IOException; + private static native void natThrowWriteError(int socket) throws IOException; + private static native void natCloseSocket(int socket); +} diff --git a/classpath/java/nio/channels/SocketSelector.java b/classpath/java/nio/channels/SocketSelector.java new file mode 100644 index 0000000000..c0863488c0 --- /dev/null +++ b/classpath/java/nio/channels/SocketSelector.java @@ -0,0 +1,159 @@ +package java.nio.channels; + +import java.io.IOException; +import java.util.Iterator; + +class SocketSelector extends Selector { + private static final boolean isWin32; + protected long state; + protected final Object lock = new Object(); + protected boolean woken = false; + + static { + isWin32 = false; + } + + public SocketSelector() { + state = natInit(); + } + + public boolean isOpen() { + return state != 0; + } + + public void wakeup() { + synchronized (lock) { + if (! woken) { + woken = true; + + natWakeup(state); + } + } + } + + private boolean clearWoken() { + synchronized (lock) { + if (woken) { + woken = false; + natClearWoken(state); + return true; + } else { + return false; + } + } + } + + public synchronized void select(long interval) throws IOException { + if (isWin32) { + win32Select(interval); + } else { + posixSelect(interval); + } + } + + private void win32Select(long interval) { } + + private void posixSelect(long interval) throws IOException { + selectedKeys.clear(); + + if (clearWoken()) return; + int max=0; + for (Iterator it = keys.iterator(); + it.hasNext();) { + SelectionKey key = it.next(); + SocketChannel c = (SocketChannel)key.channel(); + int socket = c.socketFD(); + if (! c.isOpen()) { + natSelectClearAll(socket, state); + // Equivalent to: + // + // FD_CLR(socket, &(s->read)); + // FD_CLR(socket, &(s->write)); + // FD_CLR(socket, &(s->except)); + it.remove(); + continue; + } + + key.readyOps(0); + max = natSelectUpdateInterestSet(socket, key.interestOps(), state, max); + // Equivalent to: + // + // if (interest & (SelectionKey::OP_READ | SelectionKey::OP_ACCEPT)) { + // FD_SET(socket, &(s->read)); + // if (max < socket) max = socket; + // } else { + // FD_CLR(socket, &(s->read)); + // } + // + // if (interest & SelectionKey::OP_WRITE) { + // FD_SET(socket, &(s->write)); + // if (max < socket) max = socket; + // } else { + // FD_CLR(socket, &(s->write)); + // } + } + + int r = natDoSocketSelect(state, max, interval); + // Equivalent to: + // + // if (s->control.reader() >= 0) { + // unsigned socket = s->control.reader(); + // FD_SET(socket, &(s->read)); + // if (max < socket) max = socket; + // } + // timeval time = { interval / 1000, (interval % 1000) * 1000 }; + // int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time); + // if (r < 0) { + // if (errno != EINTR) { + // throw new IOException(errorString()); + // } + // } + if (r > 0) { + for (SelectionKey key : keys) { + SocketChannel c = (SocketChannel)key.channel(); + int socket = c.socketFD(); + int ready = natUpdateReadySet(socket, key.interestOps(), state); + // Equivalent to: + // + // jint ready = 0; + // + // if (FD_ISSET(c->socket, &(s->read))) { + // if (interest & SelectionKey::OP_READ) { + // ready |= SelectionKey::OP_READ; + // } + // + // if (interest & SelectionKey::OP_ACCEPT) { + // ready |= SelectionKey::OP_ACCEPT; + // } + // } + // + // if ((interest & SelectionKey::OP_WRITE) + // and FD_ISSET(c->socket, &(s->write))) { + // ready |= SelectionKey::OP_WRITE; + // } + key.readyOps(ready); + if (ready != 0) { + selectedKeys.add(key); + } + } + } + clearWoken(); + } + + public void close() { + natClose(state); + } + + private static native long natInit(); + private static native void natWakeup(long state); + private static native void natClearWoken(long state); + private static native void natClose(long state); + private static native void natSelectClearAll(int socket, long state); + private static native int natSelectUpdateInterestSet(int socket, + int interest, + long state, + int max); + private static native int natDoSocketSelect(long state, int max, long interval) + throws IOException; + private static native int natUpdateReadySet(int socket, int interest, long state); +} diff --git a/classpath/java/nio/channels/WritableByteChannel.java b/classpath/java/nio/channels/WritableByteChannel.java new file mode 100644 index 0000000000..d604ef781b --- /dev/null +++ b/classpath/java/nio/channels/WritableByteChannel.java @@ -0,0 +1,8 @@ +package java.nio.channels; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface WritableByteChannel extends Channel { + public int write(ByteBuffer b) throws IOException; +}