Implemented a basic NIO socket channel interface. Non-blocking socket channels

and server socket channels are implemented.  This version works but only when
libnative is linked with g++ (because of C++ object creation code that fails
without this linking)
This commit is contained in:
Eric Scharff
2007-10-05 15:32:56 -06:00
parent e32a335079
commit 98269286e5
12 changed files with 1021 additions and 1 deletions

View File

@ -0,0 +1,19 @@
package java.net;
public class InetSocketAddress {
private final String host;
private final int port;
public InetSocketAddress(String host, int port) {
this.host = host;
this.port = port;
}
public String getHostName() {
return host;
}
public int getPort() {
return port;
}
}

View File

@ -60,7 +60,9 @@ public class ByteBuffer {
if (position != 0) {
System.arraycopy(array, arrayOffset+position, array, arrayOffset, remaining());
}
position=0;
position=remaining();
limit(capacity());
return this;
}

View File

@ -0,0 +1,8 @@
package java.nio.channels;
import java.io.IOException;
public interface Channel {
public void close() throws IOException;
public boolean isOpen();
}

View File

@ -0,0 +1,8 @@
package java.nio.channels;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface ReadableByteChannel extends Channel {
public int read(ByteBuffer b) throws IOException;
}

View File

@ -0,0 +1,28 @@
package java.nio.channels;
import java.io.IOException;
import java.nio.ByteBuffer;
public abstract class SelectableChannel implements Channel {
private SelectionKey key;
public abstract int read(ByteBuffer b) throws Exception;
public abstract int write(ByteBuffer b) throws Exception;
public abstract boolean isOpen();
public SelectionKey register(Selector selector, int interestOps,
Object attachment)
{
SelectionKey key = new SelectionKey
(this, selector, interestOps, attachment);
selector.add(key);
return key;
}
public void close() throws IOException {
if (key != null) {
key.selector().remove(key);
key = null;
}
}
}

View File

@ -0,0 +1,68 @@
package java.nio.channels;
public class SelectionKey {
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_ACCEPT = 1 << 4;
// public static final int OP_CONNECT = 1 << 3;
private final SelectableChannel channel;
private final Selector selector;
private int interestOps;
private int readyOps;
private final Object attachment;
public SelectionKey(SelectableChannel channel, Selector selector,
int interestOps, Object attachment)
{
this.channel = channel;
this.selector = selector;
this.interestOps = interestOps;
this.attachment = attachment;
this.readyOps = 0;
}
public int interestOps() {
return interestOps;
}
public void interestOps(int v) {
this.interestOps = v;
}
public int readyOps() {
return readyOps;
}
public void readyOps(int v) {
this.readyOps = v;
}
public boolean isReadable() {
return (readyOps & OP_READ) != 0;
}
public boolean isWritable() {
return (readyOps & OP_WRITE) != 0;
}
public boolean isAcceptable() {
return (readyOps & OP_ACCEPT) != 0;
}
public boolean isValid() {
return channel.isOpen() && selector.isOpen();
}
public SelectableChannel channel() {
return channel;
}
public Selector selector() {
return selector;
}
public Object attachment() {
return attachment;
}
}

View File

@ -0,0 +1,38 @@
package java.nio.channels;
import java.io.IOException;
import java.util.Set;
import java.util.HashSet;
public abstract class Selector {
protected final Set<SelectionKey> keys = new HashSet();
protected final Set<SelectionKey> selectedKeys = new HashSet();
public static Selector open() {
return new SocketSelector();
}
public void add(SelectionKey key) {
keys.add(key);
}
public void remove(SelectionKey key) {
keys.remove(key);
}
public Set<SelectionKey> keys() {
return keys;
}
public Set<SelectionKey> selectedKeys() {
return selectedKeys;
}
public abstract boolean isOpen();
public abstract void wakeup();
public abstract void select(long interval) throws IOException;
public abstract void close();
}

View File

@ -0,0 +1,39 @@
package java.nio.channels;
import java.net.InetSocketAddress;
public class ServerSocketChannel extends SocketChannel {
public static ServerSocketChannel open() {
return new ServerSocketChannel();
}
public SocketChannel accept() throws Exception {
SocketChannel c = new SocketChannel();
c.socket = doAccept();
c.connected = true;
return c;
}
public Handle socket() {
return new Handle();
}
private int doAccept() throws Exception {
return natDoAccept(socket);
}
private int doListen(String host, int port) throws Exception {
return natDoListen(host, port);
}
public class Handle {
public void bind(InetSocketAddress address)
throws Exception
{
socket = doListen(address.getHostName(), address.getPort());
}
}
private static native int natDoAccept(int socket) throws Exception;
private static native int natDoListen(String host, int port) throws Exception;
}

View File

@ -0,0 +1,78 @@
package java.nio.channels;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
public class SocketChannel extends SelectableChannel
implements ReadableByteChannel, WritableByteChannel
{
public static final int InvalidSocket = -1;
protected int socket = InvalidSocket;
protected boolean open = true;
protected boolean connected = false;
public static SocketChannel open() {
return new SocketChannel();
}
public void configureBlocking(boolean v) {
if (v) throw new IllegalArgumentException();
}
public void connect(InetSocketAddress address) throws Exception {
socket = doConnect(address.getHostName(), address.getPort());
}
public void close() throws IOException {
super.close();
if (! open) return;
closeSocket();
open = false;
}
public boolean isOpen() {
return open;
}
private int doConnect(String host, int port) throws Exception {
boolean b[] = new boolean[1];
int s = natDoConnect(host, port, b);
connected = b[0];
return s;
}
public int read(ByteBuffer b) throws IOException {
if (b.remaining() == 0) return 0;
int r = natRead(socket, b.array(), b.arrayOffset() + b.position(), b.remaining());
b.position(b.position() + r);
return r;
}
public int write(ByteBuffer b) throws IOException {
if (! connected) {
natThrowWriteError(socket);
}
int w = natWrite(socket, b.array(), b.arrayOffset() + b.position(), b.remaining());
b.position(b.position() + w);
return w;
}
private void closeSocket() {
natCloseSocket(socket);
}
int socketFD() {
return socket;
}
private static native int natDoConnect(String host, int port, boolean[] connected)
throws Exception;
private static native int natRead(int socket, byte[] buffer, int offset, int length)
throws IOException;
private static native int natWrite(int socket, byte[] buffer, int offset, int length)
throws IOException;
private static native void natThrowWriteError(int socket) throws IOException;
private static native void natCloseSocket(int socket);
}

View File

@ -0,0 +1,159 @@
package java.nio.channels;
import java.io.IOException;
import java.util.Iterator;
class SocketSelector extends Selector {
private static final boolean isWin32;
protected long state;
protected final Object lock = new Object();
protected boolean woken = false;
static {
isWin32 = false;
}
public SocketSelector() {
state = natInit();
}
public boolean isOpen() {
return state != 0;
}
public void wakeup() {
synchronized (lock) {
if (! woken) {
woken = true;
natWakeup(state);
}
}
}
private boolean clearWoken() {
synchronized (lock) {
if (woken) {
woken = false;
natClearWoken(state);
return true;
} else {
return false;
}
}
}
public synchronized void select(long interval) throws IOException {
if (isWin32) {
win32Select(interval);
} else {
posixSelect(interval);
}
}
private void win32Select(long interval) { }
private void posixSelect(long interval) throws IOException {
selectedKeys.clear();
if (clearWoken()) return;
int max=0;
for (Iterator<SelectionKey> it = keys.iterator();
it.hasNext();) {
SelectionKey key = it.next();
SocketChannel c = (SocketChannel)key.channel();
int socket = c.socketFD();
if (! c.isOpen()) {
natSelectClearAll(socket, state);
// Equivalent to:
//
// FD_CLR(socket, &(s->read));
// FD_CLR(socket, &(s->write));
// FD_CLR(socket, &(s->except));
it.remove();
continue;
}
key.readyOps(0);
max = natSelectUpdateInterestSet(socket, key.interestOps(), state, max);
// Equivalent to:
//
// if (interest & (SelectionKey::OP_READ | SelectionKey::OP_ACCEPT)) {
// FD_SET(socket, &(s->read));
// if (max < socket) max = socket;
// } else {
// FD_CLR(socket, &(s->read));
// }
//
// if (interest & SelectionKey::OP_WRITE) {
// FD_SET(socket, &(s->write));
// if (max < socket) max = socket;
// } else {
// FD_CLR(socket, &(s->write));
// }
}
int r = natDoSocketSelect(state, max, interval);
// Equivalent to:
//
// if (s->control.reader() >= 0) {
// unsigned socket = s->control.reader();
// FD_SET(socket, &(s->read));
// if (max < socket) max = socket;
// }
// timeval time = { interval / 1000, (interval % 1000) * 1000 };
// int r = ::select(max + 1, &(s->read), &(s->write), &(s->except), &time);
// if (r < 0) {
// if (errno != EINTR) {
// throw new IOException(errorString());
// }
// }
if (r > 0) {
for (SelectionKey key : keys) {
SocketChannel c = (SocketChannel)key.channel();
int socket = c.socketFD();
int ready = natUpdateReadySet(socket, key.interestOps(), state);
// Equivalent to:
//
// jint ready = 0;
//
// if (FD_ISSET(c->socket, &(s->read))) {
// if (interest & SelectionKey::OP_READ) {
// ready |= SelectionKey::OP_READ;
// }
//
// if (interest & SelectionKey::OP_ACCEPT) {
// ready |= SelectionKey::OP_ACCEPT;
// }
// }
//
// if ((interest & SelectionKey::OP_WRITE)
// and FD_ISSET(c->socket, &(s->write))) {
// ready |= SelectionKey::OP_WRITE;
// }
key.readyOps(ready);
if (ready != 0) {
selectedKeys.add(key);
}
}
}
clearWoken();
}
public void close() {
natClose(state);
}
private static native long natInit();
private static native void natWakeup(long state);
private static native void natClearWoken(long state);
private static native void natClose(long state);
private static native void natSelectClearAll(int socket, long state);
private static native int natSelectUpdateInterestSet(int socket,
int interest,
long state,
int max);
private static native int natDoSocketSelect(long state, int max, long interval)
throws IOException;
private static native int natUpdateReadySet(int socket, int interest, long state);
}

View File

@ -0,0 +1,8 @@
package java.nio.channels;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface WritableByteChannel extends Channel {
public int write(ByteBuffer b) throws IOException;
}