From 71e7a6d79644edb4cb514da0a4a743eea3d8c527 Mon Sep 17 00:00:00 2001 From: Eric Scharff Date: Sun, 7 Oct 2007 17:15:29 -0600 Subject: [PATCH] Simple non-blocking client and server example programs. The client actually simulates blocking IO by implementing a Socket OutputStream, and sends a file to a port. The server listens on a port and dumps the output to test files. Together, these classes can be used to send a file from a client to a server machine over a socket. --- test/SendFile.java | 71 ++++++++++++++++++++++++++++++++++ test/SendServer.java | 90 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 test/SendFile.java create mode 100644 test/SendServer.java diff --git a/test/SendFile.java b/test/SendFile.java new file mode 100644 index 0000000000..ad525a1e05 --- /dev/null +++ b/test/SendFile.java @@ -0,0 +1,71 @@ +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.io.IOException; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.io.OutputStream; +import java.io.FileInputStream; + +public class SendFile { + private static class SocketOutputStream extends OutputStream { + private final SocketChannel channel; + private final Selector selector; + public SocketOutputStream(String host, int port) throws Exception { + channel = SocketChannel.open(); + channel.connect(new InetSocketAddress(host, port)); + channel.configureBlocking(false); + selector = Selector.open(); + channel.register(selector, SelectionKey.OP_WRITE, null); + } + + public void close() throws IOException { + channel.close(); + } + + public void write(int c) { + throw new RuntimeException("Do not use!"); + } + public void write(byte[] buffer, int offset, int length) + throws IOException { + ByteBuffer buf = ByteBuffer.wrap(buffer); + buf.position(offset); + buf.limit(offset+length); + while (buf.hasRemaining()) { + selector.select(10000); + for (SelectionKey key : selector.selectedKeys()) { + if (key.isWritable() && (key.channel() == channel)) { + channel.write(buf); + } + } + } + } + } + + public static void sendFile(String file, String host, int port) + throws Exception { + System.out.println("Sending " + file); + OutputStream os = new SocketOutputStream(host, port); + FileInputStream is = new FileInputStream(file); + byte[] buf = new byte[16384]; + int count=-1; + while ((count = is.read(buf)) >= 0) { + os.write(buf, 0, count); + } + is.close(); + os.close(); + } + + public static void main(String args[]) { + if (args.length != 2) { + System.out.println("Usage: SendFile file host"); + } else { + try { + sendFile(args[0], args[1], 8988); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + } +} + diff --git a/test/SendServer.java b/test/SendServer.java new file mode 100644 index 0000000000..e2877d3b22 --- /dev/null +++ b/test/SendServer.java @@ -0,0 +1,90 @@ +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class SendServer { + private static char cIndex = 'A'; + private static ByteBuffer inBuf = ByteBuffer.allocate(8192); + + private static void dumpByteBuffer(char note, ByteBuffer buf) { + System.out.println(note + ": Buffer position: " + buf.position() + " limit: " + + buf.limit() + " capacity: " + buf.capacity() + " remaining: " + + buf.remaining()); + } + + private static class Connection { + private final char myIndex; + private final java.io.FileOutputStream fos; + + public Connection() throws Exception { + myIndex = cIndex++; + fos = new java.io.FileOutputStream("dump." + myIndex); + } + + public void handleRead(SocketChannel channel) throws Exception { + int count = -1; + while ((count = channel.read(inBuf)) > 0) { + System.out.println(myIndex + ": read " + count); + } + inBuf.flip(); + fos.write(inBuf.array(), inBuf.arrayOffset()+inBuf.position(), inBuf.remaining()); + inBuf.position(inBuf.limit()); + if (count < 0) { + System.out.println(myIndex + ": Closing channel"); + fos.close(); + channel.close(); + } +// dumpByteBuffer(myIndex, inBuf); + inBuf.compact(); + } + } + + public void runMainLoop() throws Exception { + boolean keepRunning = true; + int port = 8988; + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + try { + serverChannel.configureBlocking(false); + serverChannel.socket().bind(new InetSocketAddress("0.0.0.0", port)); + Selector selector = Selector.open(); + serverChannel.register(selector, SelectionKey.OP_ACCEPT, null); + while (keepRunning) { + System.out.println("Running main loop"); + selector.select(10000); + for (SelectionKey key : selector.selectedKeys()) { + if (key.isAcceptable()) { + System.out.println("Accepting new connection"); + SocketChannel c = ((ServerSocketChannel) key.channel()).accept(); + if (c != null) { + c.configureBlocking(false); + c.register(selector, SelectionKey.OP_READ, new Connection()); + } + } else { + SocketChannel c = (SocketChannel) key.channel(); + if (c.isOpen() && key.isReadable()) { + Connection connection = (Connection)key.attachment(); + connection.handleRead(c); + } + } + } + selector.selectedKeys().clear(); + } + } finally { + serverChannel.close(); + } + } + + public static void main(String args[]) { + try { + System.out.println("Starting server"); + if (args.length > 0) { + new SendServer().runMainLoop(); + } + } catch (Exception ex) { + ex.printStackTrace(); + } + } +}