implement java.nio.channels.DatagramChannel

This commit is contained in:
Joel Dice 2012-07-03 11:24:05 -06:00
parent bc1c797911
commit c602f4673b
5 changed files with 339 additions and 6 deletions

View File

@ -257,7 +257,7 @@ setTcpNoDelay(JNIEnv* e, int d, bool on)
}
void
doListen(JNIEnv* e, int s, sockaddr_in* address)
doBind(JNIEnv* e, int s, sockaddr_in* address)
{
int opt = 1;
int r = ::setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
@ -272,8 +272,12 @@ doListen(JNIEnv* e, int s, sockaddr_in* address)
throwIOException(e);
return;
}
}
r = ::listen(s, 100);
void
doListen(JNIEnv* e, int s)
{
int r = ::listen(s, 100);
if (r != 0) {
throwIOException(e);
}
@ -344,9 +348,9 @@ doWrite(int fd, const void* buffer, size_t count)
}
int
makeSocket(JNIEnv* e)
makeSocket(JNIEnv* e, int type = SOCK_STREAM, int protocol = IPPROTO_TCP)
{
int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int s = ::socket(AF_INET, type, protocol);
if (s < 0) {
throwIOException(e);
return s;
@ -378,7 +382,28 @@ Java_java_nio_channels_ServerSocketChannel_natDoListen(JNIEnv *e,
init(e, &address, host, port);
if (e->ExceptionCheck()) return 0;
::doListen(e, s, &address);
::doBind(e, s, &address);
if (e->ExceptionCheck()) return 0;
::doListen(e, s);
return s;
}
extern "C" JNIEXPORT jint JNICALL
Java_java_nio_channels_DatagramChannel_bind(JNIEnv *e,
jclass,
jstring host,
jint port)
{
int s = makeSocket(e, SOCK_DGRAM, IPPROTO_UDP);
if (s < 0) return s;
if (e->ExceptionCheck()) return 0;
sockaddr_in address;
init(e, &address, host, port);
if (e->ExceptionCheck()) return 0;
::doBind(e, s, &address);
return s;
}
@ -391,6 +416,16 @@ Java_java_nio_channels_SocketChannel_configureBlocking(JNIEnv *e,
setBlocking(e, socket, blocking);
}
extern "C" JNIEXPORT void JNICALL
Java_java_nio_channels_DatagramChannel_configureBlocking(JNIEnv* e,
jclass c,
jint socket,
jboolean blocking)
{
return Java_java_nio_channels_SocketChannel_configureBlocking
(e, c, socket, blocking);
}
extern "C" JNIEXPORT void JNICALL
Java_java_nio_channels_SocketChannel_natSetTcpNoDelay(JNIEnv *e,
jclass,
@ -423,6 +458,24 @@ Java_java_nio_channels_SocketChannel_natDoConnect(JNIEnv *e,
return s;
}
extern "C" JNIEXPORT jint JNICALL
Java_java_nio_channels_DatagramChannel_connect(JNIEnv *e,
jclass,
jstring host,
jint port)
{
int s = makeSocket(e, SOCK_DGRAM, IPPROTO_UDP);
if (e->ExceptionCheck()) return 0;
sockaddr_in address;
init(e, &address, host, port);
if (e->ExceptionCheck()) return 0;
::doConnect(e, s, &address);
return s;
}
extern "C" JNIEXPORT void JNICALL
Java_java_nio_channels_SocketChannel_natFinishConnect(JNIEnv *e,
jclass,
@ -475,6 +528,19 @@ Java_java_nio_channels_SocketChannel_natRead(JNIEnv *e,
return r;
}
extern "C" JNIEXPORT jint JNICALL
Java_java_nio_channels_DatagramChannel_read(JNIEnv* e,
jclass c,
jint socket,
jbyteArray buffer,
jint offset,
jint length,
jboolean blocking)
{
return Java_java_nio_channels_SocketChannel_natRead
(e, c, socket, buffer, offset, length, blocking);
}
extern "C" JNIEXPORT jint JNICALL
Java_java_nio_channels_SocketChannel_natWrite(JNIEnv *e,
jclass,
@ -515,6 +581,18 @@ Java_java_nio_channels_SocketChannel_natWrite(JNIEnv *e,
return r;
}
extern "C" JNIEXPORT jint JNICALL
Java_java_nio_channels_DatagramChannel_write(JNIEnv* e,
jclass c,
jint socket,
jbyteArray buffer,
jint offset,
jint length,
jboolean blocking)
{
return Java_java_nio_channels_SocketChannel_natWrite
(e, c, socket, buffer, offset, length, blocking);
}
extern "C" JNIEXPORT void JNICALL
Java_java_nio_channels_SocketChannel_natThrowWriteError(JNIEnv *e,
@ -554,18 +632,29 @@ class Pipe {
address.sin_family = AF_INET;
address.sin_port = 0;
address.sin_addr.s_addr = inet_addr("127.0.0.1"); //INADDR_LOOPBACK;
listener_ = makeSocket(e);
if (e->ExceptionCheck()) return;
setBlocking(e, listener_, false);
::doListen(e, listener_, &address);
::doBind(e, listener_, &address);
if (e->ExceptionCheck()) return;
::doListen(e, listener_);
if (e->ExceptionCheck()) return;
socklen_t length = sizeof(sockaddr_in);
int r = getsockname(listener_, reinterpret_cast<sockaddr*>(&address),
&length);
if (r) {
throwIOException(e);
return;
}
writer_ = makeSocket(e);
if (e->ExceptionCheck()) return;
setBlocking(e, writer_, true);
connected_ = ::doConnect(e, writer_, &address);
}
@ -663,6 +752,7 @@ Java_java_nio_channels_SocketSelector_natInit(JNIEnv* e, jclass)
void *mem = malloc(sizeof(SelectorState));
if (mem) {
SelectorState *s = new (mem) SelectorState(e);
if (e->ExceptionCheck()) return 0;
if (s) {
FD_ZERO(&(s->read));

View File

@ -0,0 +1,13 @@
/* Copyright (c) 2012, Avian Contributors
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
There is NO WARRANTY for this software. See license.txt for
details. */
package java.net;
public interface ProtocolFamily { }

View File

@ -0,0 +1,15 @@
/* Copyright (c) 2012, Avian Contributors
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
There is NO WARRANTY for this software. See license.txt for
details. */
package java.net;
public enum StandardProtocolFamily implements ProtocolFamily {
INET;
}

View File

@ -0,0 +1,127 @@
/* Copyright (c) 2012, Avian Contributors
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
There is NO WARRANTY for this software. See license.txt for
details. */
package java.nio.channels;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.Socket;
import java.net.StandardProtocolFamily;
public class DatagramChannel extends SelectableChannel
implements ReadableByteChannel, WritableByteChannel
{
public static final int InvalidSocket = -1;
private int socket = InvalidSocket;
private boolean blocking = true;
public DatagramChannel configureBlocking(boolean v) throws IOException {
blocking = v;
if (socket != InvalidSocket) {
configureBlocking(socket, v);
}
return this;
}
int socketFD() {
return socket;
}
void handleReadyOps(int ops) {
// ignore
}
public static DatagramChannel open(ProtocolFamily family)
throws IOException
{
if (family.equals(StandardProtocolFamily.INET)) {
Socket.init();
return new DatagramChannel();
} else {
throw new UnsupportedOperationException();
}
}
public DatagramChannel bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress;
try {
inetAddress = (InetSocketAddress) address;
} catch (ClassCastException e) {
throw new UnsupportedAddressTypeException();
}
socket = bind(inetAddress.getHostName(), inetAddress.getPort());
return this;
}
public DatagramChannel connect(SocketAddress address) throws IOException {
InetSocketAddress inetAddress;
try {
inetAddress = (InetSocketAddress) address;
} catch (ClassCastException e) {
throw new UnsupportedAddressTypeException();
}
socket = connect(inetAddress.getHostName(), inetAddress.getPort());
return this;
}
public int write(ByteBuffer b) throws IOException {
if (b.remaining() == 0) return 0;
byte[] array = b.array();
if (array == null) throw new NullPointerException();
int c = write
(socket, array, b.arrayOffset() + b.position(), b.remaining(), blocking);
if (c > 0) {
b.position(b.position() + c);
}
return c;
}
public int read(ByteBuffer b) throws IOException {
if (b.remaining() == 0) return 0;
byte[] array = b.array();
if (array == null) throw new NullPointerException();
int c = read
(socket, array, b.arrayOffset() + b.position(), b.remaining(), blocking);
if (c > 0) {
b.position(b.position() + c);
}
return c;
}
private static native void configureBlocking(int socket, boolean blocking)
throws IOException;
private static native int bind(String hostname, int port)
throws IOException;
private static native int connect(String hostname, int port)
throws IOException;
private static native int write(int socket, byte[] array, int offset,
int length, boolean blocking)
throws IOException;
private static native int read(int socket, byte[] array, int offset,
int length, boolean blocking)
throws IOException;
}

88
test/Datagrams.java Normal file
View File

@ -0,0 +1,88 @@
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
public class Datagrams {
private static void expect(boolean v) {
if (! v) throw new RuntimeException();
}
private static boolean equal(byte[] a, int aOffset, byte[] b, int bOffset,
int length)
{
for (int i = 0; i < length; ++i) {
if (a[aOffset + i] != b[bOffset + i]) return false;
}
return true;
}
public static void main(String[] args) throws Exception {
final String Hostname = "localhost";
final int Port = 22043;
final SocketAddress Address = new InetSocketAddress(Hostname, Port);
final byte[] Message = "hello, world!".getBytes();
DatagramChannel out = DatagramChannel.open(StandardProtocolFamily.INET);
try {
out.configureBlocking(false);
out.connect(Address);
DatagramChannel in = DatagramChannel.open(StandardProtocolFamily.INET);
try {
in.configureBlocking(false);
in.bind(Address);
Selector selector = Selector.open();
try {
SelectionKey outKey = out.register
(selector, SelectionKey.OP_WRITE, null);
SelectionKey inKey = in.register
(selector, SelectionKey.OP_READ, null);
int state = 0;
ByteBuffer inBuffer = ByteBuffer.allocate(Message.length);
loop: while (true) {
selector.select();
switch (state) {
case 0: {
if (outKey.isWritable()) {
out.write(ByteBuffer.wrap(Message));
state = 1;
}
} break;
case 1: {
if (inKey.isReadable()) {
in.read(inBuffer);
if (! inBuffer.hasRemaining()) {
expect(equal(inBuffer.array(),
inBuffer.arrayOffset(),
Message,
0,
Message.length));
break loop;
}
}
} break;
default: throw new RuntimeException();
}
}
} finally {
selector.close();
}
} finally {
in.close();
}
} finally {
out.close();
}
}
}