Create extensible service discovery protocol, with initial basic implementation

This commit is contained in:
Jeremy Lakeman 2014-05-09 10:02:44 +09:30
parent 4204973052
commit 672104bdd8
10 changed files with 279 additions and 39 deletions

View File

@ -3235,7 +3235,7 @@ struct cli_schema command_line_options[]={
"Run memory speed test"},
{app_byteorder_test,{"test","byteorder",NULL}, 0,
"Run byte order handling test"},
{app_msp_connection,{"msp", "listen", "[--once]", "[--forward=<local_port>]", "<port>", NULL}, 0,
{app_msp_connection,{"msp", "listen", "[--once]", "[--forward=<local_port>]", "[--service=<service_name>]", "<port>", NULL}, 0,
"Listen for incoming connections"},
{app_msp_connection,{"msp", "connect", "[--once]", "[--forward=<local_port>]", "<sid>", "<port>", NULL}, 0,
"Connect to a remote party"},

View File

@ -117,6 +117,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MDP_PORT_ECHO 7
#define MDP_PORT_TRACE 8
#define MDP_PORT_DNALOOKUP 10
#define MDP_PORT_SERVICE_DISCOVERY 11
#define MDP_PORT_VOMP 12
#define MDP_PORT_RHIZOME_REQUEST 13
#define MDP_PORT_RHIZOME_RESPONSE 14

View File

@ -0,0 +1,54 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
/**
* Created by jeremy on 8/05/14.
*/
public abstract class AbstractMdpProtocol<T> extends ChannelSelector.Handler {
private final ChannelSelector selector;
protected final MdpSocket socket;
protected final AsyncResult<T> results;
public AbstractMdpProtocol(ChannelSelector selector, AsyncResult<T> results) throws IOException {
socket = new MdpSocket();
socket.bind();
this.selector = selector;
this.results = results;
selector.register(this);
}
public void close(){
try {
selector.unregister(this);
} catch (IOException e) {
e.printStackTrace();
}
socket.close();
}
protected abstract void parse(MdpPacket response);
@Override
public void read() {
try {
MdpPacket response = new MdpPacket();
socket.receive(response);
parse(response);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public SelectableChannel getChannel() throws IOException {
return socket.getChannel();
}
@Override
public int getInterest() {
return SelectionKey.OP_READ;
}
}

View File

@ -1,23 +1,14 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
/**
* Created by jeremy on 21/02/14.
*/
public class MdpDnaLookup extends ChannelSelector.Handler{
private final ChannelSelector selector;
private final MdpSocket socket;
private final AsyncResult<ServalDCommand.LookupResult> results;
public class MdpDnaLookup extends AbstractMdpProtocol<ServalDCommand.LookupResult> {
public MdpDnaLookup(ChannelSelector selector, AsyncResult<ServalDCommand.LookupResult> results) throws IOException {
socket = new MdpSocket();
socket.bind();
this.selector = selector;
this.results = results;
selector.register(this);
super(selector, results);
}
public void sendRequest(SubscriberId destination, String did) throws IOException {
@ -33,10 +24,8 @@ public class MdpDnaLookup extends ChannelSelector.Handler{
}
@Override
public void read() {
protected void parse(MdpPacket response) {
try {
MdpPacket response = new MdpPacket();
socket.receive(response);
byte bytes[] = new byte[response.payload.remaining()];
response.payload.get(bytes);
String resultString = new String(bytes);
@ -54,25 +43,5 @@ public class MdpDnaLookup extends ChannelSelector.Handler{
} catch (AbstractId.InvalidHexException e) {
e.printStackTrace();
}
}
public void close(){
try {
selector.unregister(this);
} catch (IOException e) {
e.printStackTrace();
}
socket.close();
}
@Override
public SelectableChannel getChannel() throws IOException {
return socket.getChannel();
}
@Override
public int getInterest() {
return SelectionKey.OP_READ;
}
}

View File

@ -23,6 +23,7 @@ public class MdpPacket {
public static final int MDP_PORT_ECHO = 7;
public static final int MDP_PORT_DNALOOKUP = 10;
public static final int MDP_PORT_SERVICE_DISCOVERY = 11;
public MdpPacket(){
buff = ByteBuffer.allocate(MDP_MTU);

View File

@ -0,0 +1,96 @@
package org.servalproject.servaldna;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Properties;
/**
* Created by jeremy on 8/05/14.
*/
public class MdpServiceLookup extends AbstractMdpProtocol<MdpServiceLookup.ServiceResult> {
public static class ServiceResult extends Properties {
public final SubscriberId subscriberId;
public ServiceResult(SubscriberId subscriberId){
this.subscriberId = subscriberId;
}
public String toString(){
return "ServiceResult{subscriberId="+subscriberId+", "+super.toString()+"}";
}
}
public MdpServiceLookup(ChannelSelector selector, AsyncResult<ServiceResult> results) throws IOException {
super(selector, results);
}
public void sendRequest(SubscriberId destination, String pattern) throws IOException {
MdpPacket request = new MdpPacket();
if (destination.isBroadcast())
request.setFlags(MdpPacket.MDP_FLAG_NO_CRYPT);
request.setRemoteSid(destination);
request.setRemotePort(MdpPacket.MDP_PORT_SERVICE_DISCOVERY);
request.payload.put(pattern.getBytes());
request.payload.put((byte)0);
request.payload.flip();
socket.send(request);
}
public static class BuffStream extends InputStream{
private final ByteBuffer buff;
public BuffStream(ByteBuffer buff){
this.buff = buff;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public int read() throws IOException {
if (!buff.hasRemaining())
return -1;
return buff.get()&0xFF;
}
@Override
public void mark(int readLimit){
buff.mark();
}
@Override
public void reset() throws IOException {
buff.rewind();
}
@Override
public void close() throws IOException {
// noop
}
@Override
public int read(byte[] dst, int dstOffset, int charCount) throws IOException {
if (!buff.hasRemaining())
return -1;
if (charCount > buff.remaining())
charCount = buff.remaining();
buff.get(dst, dstOffset, charCount);
return charCount;
}
}
@Override
protected void parse(MdpPacket response) {
try {
ServiceResult result = new ServiceResult(response.getRemoteSid());
result.load(new BuffStream(response.payload));
results.result(result);
} catch (IOException e) {
e.printStackTrace();
} catch (AbstractId.InvalidBinaryException e) {
e.printStackTrace();
}
}
}

View File

@ -3,6 +3,7 @@ package org.servalproject.test;
import org.servalproject.servaldna.AsyncResult;
import org.servalproject.servaldna.ChannelSelector;
import org.servalproject.servaldna.MdpDnaLookup;
import org.servalproject.servaldna.MdpServiceLookup;
import org.servalproject.servaldna.MdpSocket;
import org.servalproject.servaldna.ResultList;
import org.servalproject.servaldna.ServalDCommand;
@ -38,7 +39,6 @@ public class CommandLine {
System.out.println(s);
if (s.getResult()!=0)
throw new ServalDFailureException("Serval daemon isn't running");
System.out.println(s);
MdpSocket.loopbackMdpPort = s.mdpInetPort;
ChannelSelector selector = new ChannelSelector();
MdpDnaLookup lookup = new MdpDnaLookup(selector, new AsyncResult<ServalDCommand.LookupResult>() {
@ -52,6 +52,24 @@ public class CommandLine {
lookup.close();
}
static void service(String pattern) throws IOException, InterruptedException, ServalDFailureException {
ServalDCommand.Status s = ServalDCommand.serverStatus();
System.out.println(s);
if (s.getResult()!=0)
throw new ServalDFailureException("Serval daemon isn't running");
MdpSocket.loopbackMdpPort = s.mdpInetPort;
ChannelSelector selector = new ChannelSelector();
MdpServiceLookup lookup = new MdpServiceLookup(selector, new AsyncResult<MdpServiceLookup.ServiceResult>() {
@Override
public void result(MdpServiceLookup.ServiceResult nextResult) {
System.out.println(nextResult.toString());
}
});
lookup.sendRequest(SubscriberId.broadcastSid, pattern);
Thread.sleep(3000);
lookup.close();
}
public static void main(String... args){
if (args.length<1)
return;
@ -66,7 +84,9 @@ public class CommandLine {
if (methodName.equals("peers"))
getPeers();
if (methodName.equals("lookup"))
lookup(args.length>=2 ? args[1] : "");
lookup(args.length >= 2 ? args[1] : "");
if (methodName.equals("service"))
service(args.length >= 2 ? args[1] : "");
if (result!=null)
System.out.println(result.toString());

View File

@ -32,6 +32,7 @@ struct mdp_sockaddr {
#define MDP_FLAG_NO_CRYPT (1<<0)
#define MDP_FLAG_NO_SIGN (1<<1)
#define MDP_FLAG_BIND (1<<2)
#define MDP_FLAG_REUSE (1<<5)
#define MDP_FLAG_CLOSE (1<<3)
#define MDP_FLAG_ERROR (1<<4)

View File

@ -50,6 +50,7 @@ struct socket_address ip_addr;
static int try_send(struct connection *conn);
static void msp_poll(struct sched_ent *alarm);
static void service_poll(struct sched_ent *alarm);
static void listen_poll(struct sched_ent *alarm);
static void io_poll(struct sched_ent *alarm);
@ -65,6 +66,18 @@ struct sched_ent mdp_sock={
.stats = &mdp_sock_stats,
};
struct profile_total service_sock_stats={
.name="service_poll"
};
struct sched_ent service_sock={
.poll.revents = 0,
.poll.events = POLLIN,
.poll.fd = -1,
.function = service_poll,
.stats = &service_sock_stats,
};
struct profile_total io_stats={
.name="io_stats"
};
@ -81,6 +94,9 @@ struct sched_ent listen_alarm={
.stats = &listen_stats,
};
const char *service_name=NULL;
mdp_port_t service_port;
static struct connection *alloc_connection(
struct msp_sock *sock,
int fd_in,
@ -294,6 +310,23 @@ static void msp_poll(struct sched_ent *alarm)
}
}
static void service_poll(struct sched_ent *alarm){
if (alarm->poll.revents & POLLIN){
struct mdp_header header;
uint8_t payload[256];
ssize_t len = mdp_recv(alarm->poll.fd, &header, payload, sizeof payload);
if (len==-1)
return;
if (header.flags & (MDP_FLAG_ERROR|MDP_FLAG_BIND))
return;
if (is_sid_t_broadcast(header.local.sid))
header.local.sid = SID_ANY;
len = snprintf((char*)payload, sizeof payload, "%s.msp.port=%d", service_name, service_port);
mdp_send(alarm->poll.fd, &header, payload, len);
}
}
static int try_send(struct connection *conn)
{
if (!conn->in->limit)
@ -422,6 +455,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
once = cli_arg(parsed, "--once", NULL, NULL, NULL) == 0;
if ( cli_arg(parsed, "--forward", &local_port_string, cli_uint, NULL) == -1
|| cli_arg(parsed, "--service", &service_name, NULL, NULL) == -1
|| cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1
|| cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1)
return -1;
@ -429,7 +463,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
struct mdp_sockaddr addr;
bzero(&addr, sizeof addr);
addr.port = atoi(port_string);
service_port = addr.port = atoi(port_string);
saw_error=0;
if (sidhex && *sidhex){
@ -440,6 +474,29 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
int ret=-1;
struct msp_sock *sock = NULL;
if (service_name){
// listen for service discovery messages
service_sock.poll.fd = mdp_socket();
if (service_sock.poll.fd==-1)
goto end;
set_nonblock(service_sock.poll.fd);
watch(&service_sock);
// bind
struct mdp_header header;
bzero(&header, sizeof(header));
header.local.sid = BIND_PRIMARY;
header.local.port = MDP_PORT_SERVICE_DISCOVERY;
header.remote.sid = SID_ANY;
header.remote.port = MDP_LISTEN;
header.ttl = PAYLOAD_TTL_DEFAULT;
header.flags = MDP_FLAG_BIND|MDP_FLAG_REUSE;
if (mdp_send(service_sock.poll.fd, &header, NULL, 0)==-1)
goto end;
}else
service_sock.poll.fd=-1;
mdp_sock.poll.fd = mdp_socket();
if (mdp_sock.poll.fd==-1)
goto end;
@ -514,6 +571,13 @@ end:
if (mdp_sock.poll.fd!=-1){
msp_close_all(mdp_sock.poll.fd);
mdp_close(mdp_sock.poll.fd);
mdp_sock.poll.fd=-1;
}
if (is_watching(&service_sock))
unwatch(&service_sock);
if (service_sock.poll.fd!=-1){
mdp_close(service_sock.poll.fd);
service_sock.poll.fd=-1;
}
if (listen_alarm.poll.fd !=-1 && is_watching(&listen_alarm))
unwatch(&listen_alarm);

View File

@ -118,7 +118,6 @@ setup_DnaLookup() {
set mdp.enable_inet on
}
setup
set_instance +A
foreach_instance +A +B create_single_identity
start_servald_instances +A +B
set_instance +A
@ -135,4 +134,39 @@ teardown_DnaLookup() {
report_all_servald_servers
}
doc_serviceDiscovery="Discover network services by name"
listen_service() {
executeOk_servald --timeout=20 msp listen --service=test_name 512 <<EOF
Hi!
EOF
tfw_cat --stderr
}
setup_serviceDiscovery() {
configure_servald_server() {
add_servald_interface
executeOk_servald config \
set debug.mdprequests on \
set mdp.enable_inet on \
set log.console.level debug
}
setup
set_instance +A
foreach_instance +A +B create_single_identity
start_servald_instances +A +B
set_instance +B
fork %service listen_service
set_instance +A
}
test_serviceDiscovery() {
execute --timeout=10 --core-backtrace java "-Djava.library.path=$LD_LIBRARY_PATH" -classpath "$PWD/classes" org.servalproject.test.CommandLine 'service' 'test_name.*'
assertStdoutGrep "$SIDB"
assertStdoutGrep "test_name.msp.port=512"
tfw_cat --stdout --stderr
}
teardown_serviceDiscovery() {
stop_all_servald_servers
kill_all_servald_processes
assert_no_servald_processes
report_all_servald_servers
}
runTests "$@"