Add Java MDP Client API

- support mdp clients over loopback UDP
  Note this is using an environment variable to specify the port number
  In future we expect to support environments where this port is already bound
- monitor mdp sockets in a single & separate thread
This commit is contained in:
Jeremy Lakeman 2014-02-21 16:39:47 +10:30
parent 693d1e9b60
commit 20494a90d5
8 changed files with 543 additions and 1 deletions

View File

@ -0,0 +1,130 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
/**
* Created by jeremy on 20/02/14.
*/
public class ChannelSelector {
public static abstract class Handler{
public abstract SelectableChannel getChannel();
public abstract int getInterest();
public void read(){};
public void write(){};
public void accept(){};
public void connect(){};
}
private Runnable selectorThread = new Runnable(){
@Override
public void run() {
try {
while(true){
if (registerHandler!=null || unregisterHandler!=null){
synchronized (ChannelSelector.this){
try {
if (registerHandler!=null){
SelectableChannel channel = registerHandler.getChannel();
channel.configureBlocking(false);
channel.register(selector, registerHandler.getInterest(), registerHandler);
}
if (unregisterHandler!=null){
SelectableChannel channel = unregisterHandler.getChannel();
channel.keyFor(selector).cancel();
// force the cancelled key to be removed now
selector.selectNow();
}
}catch (IOException e){
e.printStackTrace();
registerException = e;
}catch (Throwable e){
e.printStackTrace();
registerException = new IOException(e.getMessage());
registerException.initCause(e);
}
unregisterHandler = null;
registerHandler=null;
ChannelSelector.this.notify();
}
}
if (selector.keys().isEmpty())
break;
if (selector.selectedKeys().isEmpty())
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
Handler h = (Handler)key.attachment();
if (key.isValid()){
if (key.isReadable())
h.read();
if (key.isWritable())
h.write();
if (key.isAcceptable())
h.accept();
if (key.isConnectable())
h.connect();
}
keys.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
running = false;
}
};
private boolean running = false;
private final Selector selector;
private Handler registerHandler;
private Handler unregisterHandler;
private IOException registerException;
public ChannelSelector() throws IOException {
selector = Selector.open();
}
public synchronized void unregister(Handler handler) throws IOException {
// since we have to worry about thread synchronization,
// pass the channel over to the selectorThread and only register it when we aren't blocking.
if (running){
unregisterHandler = handler;
selector.wakeup();
try {
this.wait();
} catch (InterruptedException e) {
}
}
IOException e = registerException;
registerException=null;
if (e!=null)
throw e;
}
public synchronized void register(Handler handler) throws IOException {
// since we have to worry about thread synchronization,
// pass the channel over to the selectorThread and only register it when we aren't blocking.
registerHandler = handler;
if (!running){
running=true;
new Thread(selectorThread).start();
}
selector.wakeup();
try {
this.wait();
} catch (InterruptedException e) {
}
IOException e = registerException;
registerException=null;
if (e!=null)
throw e;
}
}

View File

@ -0,0 +1,77 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
/**
* Created by jeremy on 21/02/14.
*/
public class MdpDnaLookup extends ChannelSelector.Handler{
private final ChannelSelector selector;
private final MdpSocket socket;
private final AsyncResult<ServalDCommand.LookupResult> results;
public MdpDnaLookup(ChannelSelector selector, AsyncResult<ServalDCommand.LookupResult> results) throws IOException {
socket = new MdpSocket();
this.selector = selector;
this.results = results;
selector.register(this);
}
public void sendRequest(SubscriberId destination, String did) throws IOException {
MdpPacket request = new MdpPacket();
if (destination.isBroadcast())
request.setFlags(MdpPacket.MDP_FLAG_NO_CRYPT);
request.setRemoteSid(destination);
request.setRemotePort(MdpPacket.MDP_PORT_DNALOOKUP);
request.payload.put(did.getBytes());
request.payload.put((byte)0);
request.payload.flip();
socket.send(request);
}
@Override
public void read() {
try {
MdpPacket response = new MdpPacket();
socket.receive(response);
byte bytes[] = new byte[response.payload.remaining()];
response.payload.get(bytes);
String resultString = new String(bytes);
String fields[] = resultString.split("\\|");
if (fields.length < 4)
throw new IOException("Expected 4 result fields");
ServalDCommand.LookupResult result = new ServalDCommand.LookupResult();
result.subscriberId = new SubscriberId(fields[0]);
result.did = fields[1];
result.name = fields[2];
result.uri = fields[3];
results.result(result);
} catch (IOException e) {
e.printStackTrace();
} catch (AbstractId.InvalidHexException e) {
e.printStackTrace();
}
}
public void close(){
try {
selector.unregister(this);
} catch (IOException e) {
e.printStackTrace();
}
socket.close();
}
@Override
public SelectableChannel getChannel() {
return socket.getChannel();
}
@Override
public int getInterest() {
return SelectionKey.OP_READ;
}
}

View File

@ -0,0 +1,129 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.DatagramChannel;
/**
* Created by jeremy on 17/02/14.
*/
public class MdpPacket {
private ByteBuffer buff;
public ByteBuffer payload;
private static final int MDP_MTU = 1200;
private static final int HEADER_LEN = 32+4+32+4+1+1+1;
public static final byte MDP_FLAG_NO_CRYPT = (1<<0);
public static final byte MDP_FLAG_NO_SIGN = (1<<1);
public static final byte MDP_FLAG_BIND = (1<<2);
public static final byte MDP_FLAG_CLOSE = (1<<3);
public static final byte MDP_FLAG_ERROR = (1<<4);
public static final int MDP_PORT_ECHO = 7;
public static final int MDP_PORT_DNALOOKUP = 10;
public MdpPacket(){
buff = ByteBuffer.allocate(MDP_MTU);
buff.order(ByteOrder.nativeOrder());
buff.position(HEADER_LEN);
payload = buff.slice();
}
public MdpPacket prepareReply(){
MdpPacket reply = new MdpPacket();
buff.position(0);
buff.limit(HEADER_LEN);
reply.buff.position(0);
reply.buff.put(buff);
return reply;
}
public SubscriberId getLocalSid() throws AbstractId.InvalidBinaryException {
buff.position(0);
return new SubscriberId(buff);
}
public void setLocalSid(SubscriberId local_sid){
buff.position(0);
local_sid.toByteBuffer(buff);
}
public int getLocalPort(){
buff.position(32);
return buff.getInt();
}
public void setLocalPort(int local_port){
buff.position(32);
buff.putInt(local_port);
}
public SubscriberId getRemoteSid() throws AbstractId.InvalidBinaryException {
buff.position(32+4);
return new SubscriberId(buff);
}
public void setRemoteSid(SubscriberId local_sid){
buff.position(32+4);
local_sid.toByteBuffer(buff);
}
public int getRemotePort(){
buff.position(32+4+32);
return buff.getInt();
}
public void setRemotePort(int remote_port){
buff.position(32+4+32);
buff.putInt(remote_port);
}
public byte getFlags(){
buff.position(32+4+32+4);
return buff.get();
}
public void setFlags(byte flags){
buff.position(32+4+32+4);
buff.put(flags);
}
public byte getQOS(){
buff.position(32+4+32+4+1);
return buff.get();
}
public void setQOS(byte qos){
buff.position(32+4+32+4+1);
buff.put(qos);
}
public byte getTTL(){
buff.position(32+4+32+4+1+1);
return buff.get();
}
public void setTTL(byte ttl){
buff.position(32+4+32+4+1+1);
buff.put(ttl);
}
public void send(DatagramChannel channel) throws IOException {
buff.clear();
buff.limit(HEADER_LEN+payload.limit());
channel.write(buff);
}
public void receive(DatagramChannel channel) throws IOException {
buff.clear();
channel.read(buff);
buff.flip();
if (buff.remaining() < HEADER_LEN)
throw new MdpSocket.MdpError("Received packet is too short");
payload.position(0);
payload.limit(buff.limit() - HEADER_LEN);
}
}

View File

@ -0,0 +1,102 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
/**
* Created by jeremy on 17/02/14.
*/
public class MdpSocket{
private DatagramChannel channel;
private SubscriberId sid;
private int port;
private static final InetAddress loopback;
public static int loopbackMdpPort =0;
static {
InetAddress local=null;
try {
local = Inet4Address.getByAddress(new byte[]{127, 0, 0, 1});
} catch (UnknownHostException e) {
e.printStackTrace();
}
loopback = local;
}
public MdpSocket() throws IOException {
this(SubscriberId.ANY, 0);
}
public MdpSocket(int port) throws IOException {
this(SubscriberId.ANY, port);
}
public MdpSocket(SubscriberId sid, int port) throws IOException {
if (loopbackMdpPort==0)
throw new IOException("Loopback MDP port has not been set");
channel = DatagramChannel.open();
channel.connect(new InetSocketAddress(loopback, loopbackMdpPort));
MdpPacket packet = new MdpPacket();
packet.setLocalSid(sid);
packet.setLocalPort(port);
packet.setFlags(MdpPacket.MDP_FLAG_BIND);
packet.payload.flip();
packet.send(channel);
receive(packet);
try {
this.sid = packet.getLocalSid();
} catch (AbstractId.InvalidBinaryException e) {
throw new MdpError(e);
}
this.port = packet.getLocalPort();
}
public SelectableChannel getChannel(){
return channel;
}
public void send(MdpPacket packet) throws IOException {
packet.setLocalSid(this.sid);
packet.setLocalPort(this.port);
packet.send(channel);
}
public void receive(MdpPacket packet) throws IOException {
packet.receive(channel);
if ((packet.getFlags() & MdpPacket.MDP_FLAG_ERROR)!=0)
throw new MdpError("Unspecified error reported by server");
}
public void close() {
try {
MdpPacket p = new MdpPacket();
p.payload.flip();
p.setFlags(MdpPacket.MDP_FLAG_CLOSE);
send(p);
} catch (IOException e) {
e.printStackTrace();
}
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static class MdpError extends IOException{
public MdpError(String msg){
super(msg);
}
public MdpError(String msg, Throwable cause){
super(msg);
this.initCause(cause);
}
public MdpError(Throwable cause){
super();
this.initCause(cause);
}
}
}

View File

@ -233,6 +233,7 @@ public class ServalDCommand
}
public static class LookupResult extends JniResult {
public SubscriberId subscriberId;
public String did;
public String name;
public String uri;
@ -246,6 +247,16 @@ public class ServalDCommand
if (this.columnName.equals("uri"))
this.uri = value;
}
@Override
public String toString() {
return "LookupResult{" +
"subscriberId=" + subscriberId +
", did='" + did + '\'' +
", name='" + name + '\'' +
", uri='" + uri + '\'' +
'}';
}
}
public static int dnaLookup(AsyncResult<LookupResult> results, String did, int timeout) throws ServalDFailureException {

View File

@ -1,9 +1,16 @@
package org.servalproject.test;
import org.servalproject.servaldna.AsyncResult;
import org.servalproject.servaldna.ChannelSelector;
import org.servalproject.servaldna.MdpDnaLookup;
import org.servalproject.servaldna.MdpSocket;
import org.servalproject.servaldna.ResultList;
import org.servalproject.servaldna.ServalDCommand;
import org.servalproject.servaldna.ServalDFailureException;
import org.servalproject.servaldna.SubscriberId;
import java.io.IOException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
@ -12,6 +19,10 @@ import java.util.List;
*/
public class CommandLine {
static void log(String msg){
System.out.println(new Date().toString()+" "+msg);
}
static void getPeers() throws ServalDFailureException {
List<ServalDCommand.IdentityResult> peers = new LinkedList<ServalDCommand.IdentityResult>();
ServalDCommand.idPeers(new ResultList<ServalDCommand.IdentityResult>(peers));
@ -22,6 +33,20 @@ public class CommandLine {
}
}
static void lookup(String did) throws IOException, InterruptedException, ServalDFailureException {
MdpSocket.loopbackMdpPort = Integer.parseInt(System.getenv("SERVAL_MDP_INET_PORT"));
ChannelSelector selector = new ChannelSelector();
MdpDnaLookup lookup = new MdpDnaLookup(selector, new AsyncResult<ServalDCommand.LookupResult>() {
@Override
public void result(ServalDCommand.LookupResult nextResult) {
System.out.println(nextResult.toString());
}
});
lookup.sendRequest(SubscriberId.broadcastSid, did);
Thread.sleep(3000);
lookup.close();
}
public static void main(String... args){
if (args.length<1)
return;
@ -35,6 +60,8 @@ public class CommandLine {
result=ServalDCommand.serverStop();
if (methodName.equals("peers"))
getPeers();
if (methodName.equals("lookup"))
lookup(args.length>=2 ? args[1] : "");
if (result!=null)
System.out.println(result.toString());

View File

@ -81,6 +81,11 @@ static struct sched_ent mdp_sock2 = {
.stats = &mdp_stats2,
.poll={.fd = -1},
};
static struct sched_ent mdp_sock2_inet = {
.function = mdp_poll2,
.stats = &mdp_stats2,
.poll={.fd = -1},
};
static int overlay_saw_mdp_frame(
struct internal_mdp_header *header,
@ -171,6 +176,28 @@ int overlay_mdp_setup_sockets()
mdp_sock2.poll.events = POLLIN;
watch(&mdp_sock2);
}
if (mdp_sock2_inet.poll.fd == -1) {
const char *port_str = getenv("SERVAL_MDP_INET_PORT");
if (port_str){
int fd = esocket(PF_INET, SOCK_DGRAM, 0);
if (fd>=0){
struct socket_address addr;
addr.addrlen = sizeof(addr.inet);
addr.inet.sin_family = AF_INET;
addr.inet.sin_port = htons(atoi(port_str));
addr.inet.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
if (socket_bind(fd, &addr)==0){
mdp_sock2_inet.poll.fd = fd;
mdp_sock2_inet.poll.events = POLLIN;
watch(&mdp_sock2_inet);
INFOF("Socket mdp.2.inet: fd=%d %s", fd, alloca_socket_address(&addr));
}else{
close(fd);
}
}
}
}
return 0;
}
@ -1460,8 +1487,21 @@ static int mdp_send2(const struct socket_address *client, const struct mdp_heade
.msg_iovlen=2,
};
if (sendmsg(mdp_sock2.poll.fd, &hdr, 0)<0)
int fd=-1;
switch(client->addr.sa_family){
case AF_UNIX:
fd = mdp_sock2.poll.fd;
break;
case AF_INET:
fd = mdp_sock2_inet.poll.fd;
break;
}
if (fd==-1)
return WHYF("Unhandled client family %d", client->addr.sa_family);
if (sendmsg(fd, &hdr, 0)<0)
return WHY_perror("sendmsg");
return 0;
}

View File

@ -109,4 +109,30 @@ teardown_PeerList() {
report_all_servald_servers
}
doc_DnaLookup="DNA Lookup via JNI MDP API"
setup_DnaLookup() {
configure_servald_server() {
add_servald_interface
executeOk_servald config set debug.mdprequests on
export SERVAL_MDP_INET_PORT="411$instance_number"
}
setup
set_instance +A
foreach_instance +A +B create_single_identity
start_servald_instances +A +B
set_instance +A
}
test_DnaLookup() {
export SERVAL_MDP_INET_PORT="411$instance_number"
execute --timeout=10 --core-backtrace java "-Djava.library.path=$LD_LIBRARY_PATH" -classpath "$PWD/classes" org.servalproject.test.CommandLine 'lookup'
assertStdoutGrep "$SIDB"
tfw_cat --stdout --stderr
}
teardown_DnaLookup() {
stop_all_servald_servers
kill_all_servald_processes
assert_no_servald_processes
report_all_servald_servers
}
runTests "$@"