From 20494a90d5cd42bfb2f8418cbb605fd1e6fda390 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Fri, 21 Feb 2014 16:39:47 +1030 Subject: [PATCH] Add Java MDP Client API - support mdp clients over loopback UDP Note this is using an environment variable to specify the port number In future we expect to support environments where this port is already bound - monitor mdp sockets in a single & separate thread --- .../servaldna/ChannelSelector.java | 130 ++++++++++++++++++ .../servalproject/servaldna/MdpDnaLookup.java | 77 +++++++++++ .../servalproject/servaldna/MdpPacket.java | 129 +++++++++++++++++ .../servalproject/servaldna/MdpSocket.java | 102 ++++++++++++++ .../servaldna/ServalDCommand.java | 11 ++ java/org/servalproject/test/CommandLine.java | 27 ++++ overlay_mdp.c | 42 +++++- tests/jni | 26 ++++ 8 files changed, 543 insertions(+), 1 deletion(-) create mode 100644 java/org/servalproject/servaldna/ChannelSelector.java create mode 100644 java/org/servalproject/servaldna/MdpDnaLookup.java create mode 100644 java/org/servalproject/servaldna/MdpPacket.java create mode 100644 java/org/servalproject/servaldna/MdpSocket.java diff --git a/java/org/servalproject/servaldna/ChannelSelector.java b/java/org/servalproject/servaldna/ChannelSelector.java new file mode 100644 index 00000000..6fca7a09 --- /dev/null +++ b/java/org/servalproject/servaldna/ChannelSelector.java @@ -0,0 +1,130 @@ +package org.servalproject.servaldna; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; + +/** + * Created by jeremy on 20/02/14. + */ +public class ChannelSelector { + + public static abstract class Handler{ + public abstract SelectableChannel getChannel(); + public abstract int getInterest(); + public void read(){}; + public void write(){}; + public void accept(){}; + public void connect(){}; + } + + private Runnable selectorThread = new Runnable(){ + @Override + public void run() { + try { + while(true){ + if (registerHandler!=null || unregisterHandler!=null){ + synchronized (ChannelSelector.this){ + try { + if (registerHandler!=null){ + SelectableChannel channel = registerHandler.getChannel(); + channel.configureBlocking(false); + channel.register(selector, registerHandler.getInterest(), registerHandler); + } + if (unregisterHandler!=null){ + SelectableChannel channel = unregisterHandler.getChannel(); + channel.keyFor(selector).cancel(); + // force the cancelled key to be removed now + selector.selectNow(); + } + }catch (IOException e){ + e.printStackTrace(); + registerException = e; + }catch (Throwable e){ + e.printStackTrace(); + registerException = new IOException(e.getMessage()); + registerException.initCause(e); + } + unregisterHandler = null; + registerHandler=null; + ChannelSelector.this.notify(); + } + } + if (selector.keys().isEmpty()) + break; + + if (selector.selectedKeys().isEmpty()) + selector.select(); + + Iterator keys = selector.selectedKeys().iterator(); + while(keys.hasNext()){ + SelectionKey key = keys.next(); + Handler h = (Handler)key.attachment(); + if (key.isValid()){ + if (key.isReadable()) + h.read(); + if (key.isWritable()) + h.write(); + if (key.isAcceptable()) + h.accept(); + if (key.isConnectable()) + h.connect(); + } + keys.remove(); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + running = false; + } + }; + private boolean running = false; + private final Selector selector; + + private Handler registerHandler; + private Handler unregisterHandler; + private IOException registerException; + + public ChannelSelector() throws IOException { + selector = Selector.open(); + } + + public synchronized void unregister(Handler handler) throws IOException { + // since we have to worry about thread synchronization, + // pass the channel over to the selectorThread and only register it when we aren't blocking. + if (running){ + unregisterHandler = handler; + selector.wakeup(); + try { + this.wait(); + } catch (InterruptedException e) { + } + } + IOException e = registerException; + registerException=null; + if (e!=null) + throw e; + } + + public synchronized void register(Handler handler) throws IOException { + // since we have to worry about thread synchronization, + // pass the channel over to the selectorThread and only register it when we aren't blocking. + registerHandler = handler; + if (!running){ + running=true; + new Thread(selectorThread).start(); + } + selector.wakeup(); + try { + this.wait(); + } catch (InterruptedException e) { + } + IOException e = registerException; + registerException=null; + if (e!=null) + throw e; + } +} diff --git a/java/org/servalproject/servaldna/MdpDnaLookup.java b/java/org/servalproject/servaldna/MdpDnaLookup.java new file mode 100644 index 00000000..65208b8a --- /dev/null +++ b/java/org/servalproject/servaldna/MdpDnaLookup.java @@ -0,0 +1,77 @@ +package org.servalproject.servaldna; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; + +/** + * Created by jeremy on 21/02/14. + */ +public class MdpDnaLookup extends ChannelSelector.Handler{ + private final ChannelSelector selector; + private final MdpSocket socket; + private final AsyncResult results; + + public MdpDnaLookup(ChannelSelector selector, AsyncResult results) throws IOException { + socket = new MdpSocket(); + this.selector = selector; + this.results = results; + selector.register(this); + } + + public void sendRequest(SubscriberId destination, String did) throws IOException { + MdpPacket request = new MdpPacket(); + if (destination.isBroadcast()) + request.setFlags(MdpPacket.MDP_FLAG_NO_CRYPT); + request.setRemoteSid(destination); + request.setRemotePort(MdpPacket.MDP_PORT_DNALOOKUP); + request.payload.put(did.getBytes()); + request.payload.put((byte)0); + request.payload.flip(); + socket.send(request); + } + + @Override + public void read() { + try { + MdpPacket response = new MdpPacket(); + socket.receive(response); + byte bytes[] = new byte[response.payload.remaining()]; + response.payload.get(bytes); + String resultString = new String(bytes); + String fields[] = resultString.split("\\|"); + if (fields.length < 4) + throw new IOException("Expected 4 result fields"); + ServalDCommand.LookupResult result = new ServalDCommand.LookupResult(); + result.subscriberId = new SubscriberId(fields[0]); + result.did = fields[1]; + result.name = fields[2]; + result.uri = fields[3]; + results.result(result); + } catch (IOException e) { + e.printStackTrace(); + } catch (AbstractId.InvalidHexException e) { + e.printStackTrace(); + } + + } + + public void close(){ + try { + selector.unregister(this); + } catch (IOException e) { + e.printStackTrace(); + } + socket.close(); + } + + @Override + public SelectableChannel getChannel() { + return socket.getChannel(); + } + + @Override + public int getInterest() { + return SelectionKey.OP_READ; + } +} diff --git a/java/org/servalproject/servaldna/MdpPacket.java b/java/org/servalproject/servaldna/MdpPacket.java new file mode 100644 index 00000000..87147eb9 --- /dev/null +++ b/java/org/servalproject/servaldna/MdpPacket.java @@ -0,0 +1,129 @@ +package org.servalproject.servaldna; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.DatagramChannel; + +/** + * Created by jeremy on 17/02/14. + */ +public class MdpPacket { + private ByteBuffer buff; + public ByteBuffer payload; + + private static final int MDP_MTU = 1200; + private static final int HEADER_LEN = 32+4+32+4+1+1+1; + + public static final byte MDP_FLAG_NO_CRYPT = (1<<0); + public static final byte MDP_FLAG_NO_SIGN = (1<<1); + public static final byte MDP_FLAG_BIND = (1<<2); + public static final byte MDP_FLAG_CLOSE = (1<<3); + public static final byte MDP_FLAG_ERROR = (1<<4); + + public static final int MDP_PORT_ECHO = 7; + public static final int MDP_PORT_DNALOOKUP = 10; + + public MdpPacket(){ + buff = ByteBuffer.allocate(MDP_MTU); + buff.order(ByteOrder.nativeOrder()); + buff.position(HEADER_LEN); + payload = buff.slice(); + } + + public MdpPacket prepareReply(){ + MdpPacket reply = new MdpPacket(); + buff.position(0); + buff.limit(HEADER_LEN); + reply.buff.position(0); + reply.buff.put(buff); + return reply; + } + + public SubscriberId getLocalSid() throws AbstractId.InvalidBinaryException { + buff.position(0); + return new SubscriberId(buff); + } + + public void setLocalSid(SubscriberId local_sid){ + buff.position(0); + local_sid.toByteBuffer(buff); + } + + public int getLocalPort(){ + buff.position(32); + return buff.getInt(); + } + + public void setLocalPort(int local_port){ + buff.position(32); + buff.putInt(local_port); + } + + public SubscriberId getRemoteSid() throws AbstractId.InvalidBinaryException { + buff.position(32+4); + return new SubscriberId(buff); + } + + public void setRemoteSid(SubscriberId local_sid){ + buff.position(32+4); + local_sid.toByteBuffer(buff); + } + + public int getRemotePort(){ + buff.position(32+4+32); + return buff.getInt(); + } + + public void setRemotePort(int remote_port){ + buff.position(32+4+32); + buff.putInt(remote_port); + } + + public byte getFlags(){ + buff.position(32+4+32+4); + return buff.get(); + } + + public void setFlags(byte flags){ + buff.position(32+4+32+4); + buff.put(flags); + } + + public byte getQOS(){ + buff.position(32+4+32+4+1); + return buff.get(); + } + + public void setQOS(byte qos){ + buff.position(32+4+32+4+1); + buff.put(qos); + } + + public byte getTTL(){ + buff.position(32+4+32+4+1+1); + return buff.get(); + } + + public void setTTL(byte ttl){ + buff.position(32+4+32+4+1+1); + buff.put(ttl); + } + + public void send(DatagramChannel channel) throws IOException { + buff.clear(); + buff.limit(HEADER_LEN+payload.limit()); + channel.write(buff); + } + + public void receive(DatagramChannel channel) throws IOException { + buff.clear(); + channel.read(buff); + buff.flip(); + if (buff.remaining() < HEADER_LEN) + throw new MdpSocket.MdpError("Received packet is too short"); + payload.position(0); + payload.limit(buff.limit() - HEADER_LEN); + } + +} diff --git a/java/org/servalproject/servaldna/MdpSocket.java b/java/org/servalproject/servaldna/MdpSocket.java new file mode 100644 index 00000000..582fd76f --- /dev/null +++ b/java/org/servalproject/servaldna/MdpSocket.java @@ -0,0 +1,102 @@ +package org.servalproject.servaldna; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; + +/** + * Created by jeremy on 17/02/14. + */ +public class MdpSocket{ + private DatagramChannel channel; + private SubscriberId sid; + private int port; + + private static final InetAddress loopback; + public static int loopbackMdpPort =0; + static { + InetAddress local=null; + try { + local = Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + loopback = local; + } + + public MdpSocket() throws IOException { + this(SubscriberId.ANY, 0); + } + public MdpSocket(int port) throws IOException { + this(SubscriberId.ANY, port); + } + public MdpSocket(SubscriberId sid, int port) throws IOException { + if (loopbackMdpPort==0) + throw new IOException("Loopback MDP port has not been set"); + channel = DatagramChannel.open(); + channel.connect(new InetSocketAddress(loopback, loopbackMdpPort)); + MdpPacket packet = new MdpPacket(); + packet.setLocalSid(sid); + packet.setLocalPort(port); + packet.setFlags(MdpPacket.MDP_FLAG_BIND); + packet.payload.flip(); + packet.send(channel); + receive(packet); + try { + this.sid = packet.getLocalSid(); + } catch (AbstractId.InvalidBinaryException e) { + throw new MdpError(e); + } + this.port = packet.getLocalPort(); + } + + public SelectableChannel getChannel(){ + return channel; + } + + public void send(MdpPacket packet) throws IOException { + packet.setLocalSid(this.sid); + packet.setLocalPort(this.port); + packet.send(channel); + } + + public void receive(MdpPacket packet) throws IOException { + packet.receive(channel); + if ((packet.getFlags() & MdpPacket.MDP_FLAG_ERROR)!=0) + throw new MdpError("Unspecified error reported by server"); + } + + public void close() { + try { + MdpPacket p = new MdpPacket(); + p.payload.flip(); + p.setFlags(MdpPacket.MDP_FLAG_CLOSE); + send(p); + } catch (IOException e) { + e.printStackTrace(); + } + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static class MdpError extends IOException{ + public MdpError(String msg){ + super(msg); + } + public MdpError(String msg, Throwable cause){ + super(msg); + this.initCause(cause); + } + public MdpError(Throwable cause){ + super(); + this.initCause(cause); + } + } +} diff --git a/java/org/servalproject/servaldna/ServalDCommand.java b/java/org/servalproject/servaldna/ServalDCommand.java index 315336e1..7f113f19 100644 --- a/java/org/servalproject/servaldna/ServalDCommand.java +++ b/java/org/servalproject/servaldna/ServalDCommand.java @@ -233,6 +233,7 @@ public class ServalDCommand } public static class LookupResult extends JniResult { + public SubscriberId subscriberId; public String did; public String name; public String uri; @@ -246,6 +247,16 @@ public class ServalDCommand if (this.columnName.equals("uri")) this.uri = value; } + + @Override + public String toString() { + return "LookupResult{" + + "subscriberId=" + subscriberId + + ", did='" + did + '\'' + + ", name='" + name + '\'' + + ", uri='" + uri + '\'' + + '}'; + } } public static int dnaLookup(AsyncResult results, String did, int timeout) throws ServalDFailureException { diff --git a/java/org/servalproject/test/CommandLine.java b/java/org/servalproject/test/CommandLine.java index 635a7cd0..fefb53f9 100644 --- a/java/org/servalproject/test/CommandLine.java +++ b/java/org/servalproject/test/CommandLine.java @@ -1,9 +1,16 @@ package org.servalproject.test; +import org.servalproject.servaldna.AsyncResult; +import org.servalproject.servaldna.ChannelSelector; +import org.servalproject.servaldna.MdpDnaLookup; +import org.servalproject.servaldna.MdpSocket; import org.servalproject.servaldna.ResultList; import org.servalproject.servaldna.ServalDCommand; import org.servalproject.servaldna.ServalDFailureException; +import org.servalproject.servaldna.SubscriberId; +import java.io.IOException; +import java.util.Date; import java.util.LinkedList; import java.util.List; @@ -12,6 +19,10 @@ import java.util.List; */ public class CommandLine { + static void log(String msg){ + System.out.println(new Date().toString()+" "+msg); + } + static void getPeers() throws ServalDFailureException { List peers = new LinkedList(); ServalDCommand.idPeers(new ResultList(peers)); @@ -22,6 +33,20 @@ public class CommandLine { } } + static void lookup(String did) throws IOException, InterruptedException, ServalDFailureException { + MdpSocket.loopbackMdpPort = Integer.parseInt(System.getenv("SERVAL_MDP_INET_PORT")); + ChannelSelector selector = new ChannelSelector(); + MdpDnaLookup lookup = new MdpDnaLookup(selector, new AsyncResult() { + @Override + public void result(ServalDCommand.LookupResult nextResult) { + System.out.println(nextResult.toString()); + } + }); + lookup.sendRequest(SubscriberId.broadcastSid, did); + Thread.sleep(3000); + lookup.close(); + } + public static void main(String... args){ if (args.length<1) return; @@ -35,6 +60,8 @@ public class CommandLine { result=ServalDCommand.serverStop(); if (methodName.equals("peers")) getPeers(); + if (methodName.equals("lookup")) + lookup(args.length>=2 ? args[1] : ""); if (result!=null) System.out.println(result.toString()); diff --git a/overlay_mdp.c b/overlay_mdp.c index c26d99b3..094bf749 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -81,6 +81,11 @@ static struct sched_ent mdp_sock2 = { .stats = &mdp_stats2, .poll={.fd = -1}, }; +static struct sched_ent mdp_sock2_inet = { + .function = mdp_poll2, + .stats = &mdp_stats2, + .poll={.fd = -1}, +}; static int overlay_saw_mdp_frame( struct internal_mdp_header *header, @@ -171,6 +176,28 @@ int overlay_mdp_setup_sockets() mdp_sock2.poll.events = POLLIN; watch(&mdp_sock2); } + + if (mdp_sock2_inet.poll.fd == -1) { + const char *port_str = getenv("SERVAL_MDP_INET_PORT"); + if (port_str){ + int fd = esocket(PF_INET, SOCK_DGRAM, 0); + if (fd>=0){ + struct socket_address addr; + addr.addrlen = sizeof(addr.inet); + addr.inet.sin_family = AF_INET; + addr.inet.sin_port = htons(atoi(port_str)); + addr.inet.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (socket_bind(fd, &addr)==0){ + mdp_sock2_inet.poll.fd = fd; + mdp_sock2_inet.poll.events = POLLIN; + watch(&mdp_sock2_inet); + INFOF("Socket mdp.2.inet: fd=%d %s", fd, alloca_socket_address(&addr)); + }else{ + close(fd); + } + } + } + } return 0; } @@ -1460,8 +1487,21 @@ static int mdp_send2(const struct socket_address *client, const struct mdp_heade .msg_iovlen=2, }; - if (sendmsg(mdp_sock2.poll.fd, &hdr, 0)<0) + int fd=-1; + switch(client->addr.sa_family){ + case AF_UNIX: + fd = mdp_sock2.poll.fd; + break; + case AF_INET: + fd = mdp_sock2_inet.poll.fd; + break; + } + if (fd==-1) + return WHYF("Unhandled client family %d", client->addr.sa_family); + + if (sendmsg(fd, &hdr, 0)<0) return WHY_perror("sendmsg"); + return 0; } diff --git a/tests/jni b/tests/jni index 88edd45f..4a2bd9b9 100755 --- a/tests/jni +++ b/tests/jni @@ -109,4 +109,30 @@ teardown_PeerList() { report_all_servald_servers } +doc_DnaLookup="DNA Lookup via JNI MDP API" +setup_DnaLookup() { + configure_servald_server() { + add_servald_interface + executeOk_servald config set debug.mdprequests on + export SERVAL_MDP_INET_PORT="411$instance_number" + } + setup + set_instance +A + foreach_instance +A +B create_single_identity + start_servald_instances +A +B + set_instance +A +} +test_DnaLookup() { + export SERVAL_MDP_INET_PORT="411$instance_number" + execute --timeout=10 --core-backtrace java "-Djava.library.path=$LD_LIBRARY_PATH" -classpath "$PWD/classes" org.servalproject.test.CommandLine 'lookup' + assertStdoutGrep "$SIDB" + tfw_cat --stdout --stderr +} +teardown_DnaLookup() { + stop_all_servald_servers + kill_all_servald_processes + assert_no_servald_processes + report_all_servald_servers +} + runTests "$@"