Add unicast probe packets

Probe directory service instead of assuming that it is always reachable
Fix directory service and routing tests
This commit is contained in:
Jeremy Lakeman 2012-12-04 14:47:57 +10:30
parent 2b21a691ca
commit 5c7eb4a594
15 changed files with 346 additions and 170 deletions

View File

@ -123,6 +123,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MAX_SIGNATURES 16 #define MAX_SIGNATURES 16
#define MDP_PORT_KEYMAPREQUEST 1 #define MDP_PORT_KEYMAPREQUEST 1
#define MDP_PORT_PROBE 6
#define MDP_PORT_ECHO 7 #define MDP_PORT_ECHO 7
#define MDP_PORT_DNALOOKUP 10 #define MDP_PORT_DNALOOKUP 10
#define MDP_PORT_VOMP 12 #define MDP_PORT_VOMP 12

View File

@ -60,9 +60,11 @@ int overlay_mdp_send(overlay_mdp_frame *mdp,int flags,int timeout_ms)
} }
} }
int port=mdp->out.dst.port; int port=0;
if ((mdp->packetTypeAndFlags&MDP_TYPE_MASK) == MDP_TX)
port = mdp->out.dst.port;
time_ms_t started = gettime_ms(); time_ms_t started = gettime_ms();
while(timeout_ms>=0 && overlay_mdp_client_poll(timeout_ms)>0){ while(timeout_ms>=0 && overlay_mdp_client_poll(timeout_ms)>0){
int ttl=-1; int ttl=-1;
if (!overlay_mdp_recv(mdp, port, &ttl)) { if (!overlay_mdp_recv(mdp, port, &ttl)) {
@ -167,10 +169,10 @@ int overlay_mdp_client_poll(time_ms_t timeout_ms)
if (timeout_ms>=0) { if (timeout_ms>=0) {
tv.tv_sec=timeout_ms/1000; tv.tv_sec=timeout_ms/1000;
tv.tv_usec=(timeout_ms%1000)*1000; tv.tv_usec=(timeout_ms%1000)*1000;
ret=select(1,&r,NULL,&r,&tv); ret=select(mdp_client_socket+1,&r,NULL,&r,&tv);
} }
else else
ret=select(1,&r,NULL,&r,NULL); ret=select(mdp_client_socket+1,&r,NULL,&r,NULL);
return ret; return ret;
} }
@ -209,8 +211,10 @@ int overlay_mdp_recv(overlay_mdp_frame *mdp, int port, int *ttl)
} }
// silently drop incoming packets for the wrong port number // silently drop incoming packets for the wrong port number
if (port>0 && port != mdp->in.dst.port) if (port>0 && port != mdp->in.dst.port){
WARNF("Ignoring packet for port %d",mdp->in.dst.port);
return -1; return -1;
}
int expected_len = overlay_mdp_relevant_bytes(mdp); int expected_len = overlay_mdp_relevant_bytes(mdp);

View File

@ -194,8 +194,6 @@ int subscriber_is_reachable(struct subscriber *subscriber){
int set_reachable(struct subscriber *subscriber, int reachable){ int set_reachable(struct subscriber *subscriber, int reachable){
if (subscriber->reachable==reachable) if (subscriber->reachable==reachable)
return 0; return 0;
int old_value=subscriber->reachable;
subscriber->reachable=reachable; subscriber->reachable=reachable;
// These log messages are for use in tests. Changing them may break test scripts. // These log messages are for use in tests. Changing them may break test scripts.
@ -208,6 +206,8 @@ int set_reachable(struct subscriber *subscriber, int reachable){
break; break;
case REACHABLE_INDIRECT: case REACHABLE_INDIRECT:
DEBUGF("REACHABLE INDIRECTLY sid=%s", alloca_tohex_sid(subscriber->sid)); DEBUGF("REACHABLE INDIRECTLY sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("(via %s, %d)",subscriber->next_hop?alloca_tohex_sid(subscriber->next_hop->sid):"NOONE!"
,subscriber->next_hop?subscriber->next_hop->reachable:0);
break; break;
case REACHABLE_UNICAST: case REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid)); DEBUGF("REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
@ -215,6 +215,12 @@ int set_reachable(struct subscriber *subscriber, int reachable){
case REACHABLE_BROADCAST: case REACHABLE_BROADCAST:
DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid)); DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break; break;
case REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
} }
} }
@ -223,9 +229,7 @@ int set_reachable(struct subscriber *subscriber, int reachable){
keyring_send_sas_request(subscriber); keyring_send_sas_request(subscriber);
// Hacky layering violation... send our identity to a directory service // Hacky layering violation... send our identity to a directory service
if (subscriber==directory_service && if (subscriber==directory_service)
(!(old_value&REACHABLE)) &&
reachable&REACHABLE)
directory_registration(); directory_registration();
return 0; return 0;
@ -248,37 +252,41 @@ int reachable_unicast(struct subscriber *subscriber, overlay_interface *interfac
return 0; return 0;
} }
// load a unicast address from configuration, replace with database?? // load a unicast address from configuration
int load_subscriber_address(struct subscriber *subscriber){ int load_subscriber_address(struct subscriber *subscriber){
if (subscriber_is_reachable(subscriber)&REACHABLE)
return 0;
char buff[80]; char buff[80];
const char *sid_hex = alloca_tohex_sid(subscriber->sid); const char *sid_hex = alloca_tohex_sid(subscriber->sid);
overlay_interface *interface=NULL;
snprintf(buff, sizeof(buff), "%s.interface", sid_hex);
const char *interface_name = confValueGet(buff, NULL);
// no unicast configuration? just return.
if (!interface_name)
return 1;
snprintf(buff, sizeof(buff), "%s.address", sid_hex); snprintf(buff, sizeof(buff), "%s.address", sid_hex);
const char *address = confValueGet(buff, NULL); const char *address = confValueGet(buff, NULL);
// no address configuration? just return.
if (!address) if (!address)
return 1; return 1;
snprintf(buff, sizeof(buff), "%s.port", sid_hex); snprintf(buff, sizeof(buff), "%s.interface", sid_hex);
int port = confValueGetInt64Range(buff, PORT_DNA, 1, 65535); const char *interface_name = confValueGet(buff, NULL);
if (interface_name){
overlay_interface *interface = overlay_interface_find_name(interface_name); interface = overlay_interface_find_name(interface_name);
if (!interface){ // explicity defined interface isn't up? just return.
WARNF("Interface %s is not UP", interface_name); if (!interface)
return -1; return 1;
} }
struct in_addr addr; struct sockaddr_in addr;
if (!inet_aton(address, &addr)){ addr.sin_family=AF_INET;
if (!inet_aton(address, &addr.sin_addr)){
return WHYF("%s doesn't look like an IP address", address); return WHYF("%s doesn't look like an IP address", address);
} }
return reachable_unicast(subscriber, interface, addr, port); snprintf(buff, sizeof(buff), "%s.port", sid_hex);
addr.sin_port = confValueGetInt64Range(buff, PORT_DNA, 1, 65535);
return overlay_send_probe(subscriber, addr, interface);
} }
// generate a new random broadcast address // generate a new random broadcast address
@ -456,30 +464,36 @@ int overlay_address_parse(struct decode_context *context, struct overlay_buffer
// once we've finished parsing a packet, complete and send a please explain if required. // once we've finished parsing a packet, complete and send a please explain if required.
int send_please_explain(struct decode_context *context, struct subscriber *source, struct subscriber *destination){ int send_please_explain(struct decode_context *context, struct subscriber *source, struct subscriber *destination){
IN(); IN();
if (!context->please_explain) struct overlay_frame *frame=context->please_explain;
if (!frame)
RETURN(0); RETURN(0);
frame->type = OF_TYPE_PLEASEEXPLAIN;
context->please_explain->type = OF_TYPE_PLEASEEXPLAIN;
if (source) if (source)
context->please_explain->source = source; frame->source = source;
else else
context->please_explain->source = my_subscriber; frame->source = my_subscriber;
context->please_explain->source->send_full=1; frame->source->send_full=1;
frame->destination = destination;
if (destination && (destination->reachable & REACHABLE)){ if (destination && (destination->reachable & REACHABLE)){
context->please_explain->destination = destination; frame->ttl=64;
context->please_explain->ttl=64;
}else{ }else{
context->please_explain->ttl=1;// how will this work with olsr?? frame->ttl=1;// how will this work with olsr??
overlay_broadcast_generate_address(&context->please_explain->broadcast_id); overlay_broadcast_generate_address(&frame->broadcast_id);
if (context->interface){
frame->destination_resolved=1;
frame->next_hop = destination;
frame->recvaddr=context->addr;
frame->interface=context->interface;
}
} }
context->please_explain->queue=OQ_MESH_MANAGEMENT; frame->queue=OQ_MESH_MANAGEMENT;
if (!overlay_payload_enqueue(context->please_explain)) if (!overlay_payload_enqueue(frame))
RETURN(0); RETURN(0);
op_free(context->please_explain); op_free(frame);
RETURN(-1); RETURN(-1);
} }

View File

@ -54,7 +54,8 @@ struct subscriber{
// should we send the full address once? // should we send the full address once?
int send_full; int send_full;
// sequence number for this unicast or broadcast destination
int sequence;
// overlay routing information // overlay routing information
struct overlay_node *node; struct overlay_node *node;
@ -84,6 +85,8 @@ struct broadcast{
}; };
struct decode_context{ struct decode_context{
struct overlay_interface *interface;
struct sockaddr_in addr;
int invalid_addresses; int invalid_addresses;
struct overlay_frame *please_explain; struct overlay_frame *please_explain;
struct subscriber *sender; struct subscriber *sender;

View File

@ -67,7 +67,8 @@ int add_advertisement(struct subscriber *subscriber, void *context){
if (subscriber->node){ if (subscriber->node){
overlay_node *n=subscriber->node; overlay_node *n=subscriber->node;
if (n->best_link_score>0 && n->observations[n->best_observation].gateways_en_route < 64){ if ((subscriber->reachable&REACHABLE) && (!(subscriber->reachable&REACHABLE_ASSUMED))
&& n->best_link_score>0 && n->observations[n->best_observation].gateways_en_route < 64){
// never send the full sid in an advertisement // never send the full sid in an advertisement
subscriber->send_full=0; subscriber->send_full=0;

View File

@ -265,15 +265,21 @@ error:
overlay_interface * overlay_interface_find(struct in_addr addr){ overlay_interface * overlay_interface_find(struct in_addr addr){
int i; int i;
overlay_interface *ret = NULL;
for (i=0;i<OVERLAY_MAX_INTERFACES;i++){ for (i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP) if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue; continue;
if ((overlay_interfaces[i].netmask.s_addr & addr.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){ if ((overlay_interfaces[i].netmask.s_addr & addr.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){
return &overlay_interfaces[i]; return &overlay_interfaces[i];
} }
// check if this is a default interface
if (overlay_interfaces[i].default_route)
ret=&overlay_interfaces[i];
} }
return NULL; return ret;
} }
overlay_interface * overlay_interface_find_name(const char *name){ overlay_interface * overlay_interface_find_name(const char *name){
@ -503,6 +509,16 @@ overlay_interface_init(char *name, struct in_addr src_addr, struct in_addr netma
return WHYF("could not open dummy interface file %s for append", dummyfile); return WHYF("could not open dummy interface file %s for append", dummyfile);
} }
interface->address.sin_family=AF_INET;
interface->address.sin_port = 0;
interface->address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
interface->netmask.s_addr=0xFFFFFF00;
interface->broadcast_address.sin_family=AF_INET;
interface->broadcast_address.sin_port = 0;
interface->broadcast_address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
/* Seek to end of file as initial reading point */ /* Seek to end of file as initial reading point */
interface->recv_offset = lseek(interface->alarm.poll.fd,0,SEEK_END); interface->recv_offset = lseek(interface->alarm.poll.fd,0,SEEK_END);
/* XXX later add pretend location information so that we can decide which "packets" to receive /* XXX later add pretend location information so that we can decide which "packets" to receive
@ -608,7 +624,11 @@ void overlay_dummy_poll(struct sched_ent *alarm)
*/ */
unsigned char packet[2048]; unsigned char packet[2048];
int plen=0; int plen=0;
struct sockaddr src_addr; struct sockaddr_in src_addr={
.sin_family = AF_INET,
.sin_port = 0,
.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
};
size_t addrlen = sizeof(src_addr); size_t addrlen = sizeof(src_addr);
time_ms_t now = gettime_ms(); time_ms_t now = gettime_ms();
@ -644,9 +664,8 @@ void overlay_dummy_poll(struct sched_ent *alarm)
plen = -1; plen = -1;
if (debug&DEBUG_PACKETRX) if (debug&DEBUG_PACKETRX)
DEBUG_packet_visualise("Read from dummy interface", &packet[128], plen); DEBUG_packet_visualise("Read from dummy interface", &packet[128], plen);
bzero(&src_addr,sizeof(src_addr));
if (packetOkOverlay(interface, &packet[128], plen, -1, &src_addr, addrlen)) { if (packetOkOverlay(interface, &packet[128], plen, -1, (struct sockaddr*)&src_addr, addrlen)) {
WARN("Unsupported packet from dummy interface"); WARN("Unsupported packet from dummy interface");
} }
} }

View File

@ -158,7 +158,7 @@ int overlay_mdp_reply(int sock,struct sockaddr_un *recvaddr,int recvaddrlen,
{ {
int replylen; int replylen;
if (!recvaddr) return 0; if (!recvaddr) return WHY("No reply address");
replylen=overlay_mdp_relevant_bytes(mdpreply); replylen=overlay_mdp_relevant_bytes(mdpreply);
if (replylen<0) return WHY("Invalid MDP frame (could not compute length)"); if (replylen<0) return WHY("Invalid MDP frame (could not compute length)");
@ -525,6 +525,7 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG
case MDP_PORT_DNALOOKUP: case MDP_PORT_DNALOOKUP:
case MDP_PORT_RHIZOME_RESPONSE: case MDP_PORT_RHIZOME_RESPONSE:
case MDP_PORT_RHIZOME_REQUEST: case MDP_PORT_RHIZOME_REQUEST:
case MDP_PORT_PROBE:
return 0; return 0;
} }
} }
@ -537,6 +538,20 @@ int overlay_mdp_check_binding(struct subscriber *subscriber, int port, int userG
); );
} }
int overlay_mdp_encode_ports(struct overlay_buffer *plaintext, int dst_port, int src_port){
int port=dst_port << 1;
if (dst_port==src_port)
port |= 1;
if (ob_append_packed_ui32(plaintext, port))
return -1;
if (dst_port!=src_port){
if (ob_append_packed_ui32(plaintext, src_port))
return -1;
}
return 0;
}
/* Construct MDP packet frame from overlay_mdp_frame structure /* Construct MDP packet frame from overlay_mdp_frame structure
(need to add return address from bindings list, and copy (need to add return address from bindings list, and copy
payload etc). payload etc).
@ -635,21 +650,11 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
frame->next=NULL; frame->next=NULL;
struct overlay_buffer *plaintext=ob_new(); struct overlay_buffer *plaintext=ob_new();
// build the plain text payload if (overlay_mdp_encode_ports(plaintext, mdp->out.dst.port, mdp->out.src.port)){
int port=mdp->out.dst.port << 1;
if (mdp->out.dst.port==mdp->out.src.port)
port |= 1;
if (ob_append_packed_ui32(plaintext, port)){
ob_free(plaintext); ob_free(plaintext);
RETURN(-1); RETURN (-1);
} }
if (mdp->out.dst.port!=mdp->out.src.port){
if (ob_append_packed_ui32(plaintext, mdp->out.src.port)){
ob_free(plaintext);
RETURN(-1);
}
}
if (ob_append_bytes(plaintext, mdp->out.payload, mdp->out.payload_length)){ if (ob_append_bytes(plaintext, mdp->out.payload, mdp->out.payload_length)){
ob_free(plaintext); ob_free(plaintext);
RETURN(-1); RETURN(-1);

View File

@ -278,6 +278,77 @@ int overlay_mdp_service_echo(overlay_mdp_frame *mdp)
RETURN(0); RETURN(0);
} }
struct probe_contents{
struct sockaddr_in addr;
unsigned char interface;
};
/* Collection of unicast echo responses to detect working links */
static int
overlay_mdp_service_probe(overlay_mdp_frame *mdp)
{
IN();
if (mdp->out.src.port!=MDP_PORT_ECHO || mdp->out.payload_length != sizeof(struct probe_contents)){
WARN("Probe packets should be returned from remote echo port");
RETURN(-1);
}
struct subscriber *peer = find_subscriber(mdp->out.src.sid, SID_SIZE, 0);
struct probe_contents *probe = (struct probe_contents *)&mdp->out.payload;
if (probe->addr.sin_family!=AF_INET)
RETURN(WHY("Unsupported address family"));
if (peer->reachable == REACHABLE_NONE || peer->reachable == REACHABLE_INDIRECT){
reachable_unicast(peer, &overlay_interfaces[probe->interface], probe->addr.sin_addr, probe->addr.sin_port);
}
RETURN(0);
}
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface){
if (interface==NULL)
interface = overlay_interface_find(addr.sin_addr);
if (!interface)
return WHY("I don't know which interface to use");
struct overlay_frame *frame=malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA;
frame->source = my_subscriber;
frame->next_hop = frame->destination = peer;
frame->ttl=1;
frame->queue=OQ_MESH_MANAGEMENT;
frame->destination_resolved=1;
frame->recvaddr=addr;
frame->flags=PACKET_UNICAST;
frame->interface=interface;
frame->payload = ob_new();
if ((!peer) || !(peer->reachable&REACHABLE))
my_subscriber->send_full=1;
if (overlay_mdp_encode_ports(frame->payload, MDP_PORT_ECHO, MDP_PORT_PROBE)){
op_free(frame);
return -1;
}
// not worried about byte order here as we are the only node that should be parsing the contents.
struct probe_contents *probe = (struct probe_contents*)ob_append_space(frame->payload, sizeof(struct probe_contents));
if (!probe){
op_free(frame);
return -1;
}
probe->addr=addr;
// get interface number
probe->interface = interface - overlay_interfaces;
if (overlay_payload_enqueue(frame)){
op_free(frame);
return -1;
}
DEBUGF("Queued probe packet on interface %s", interface->name);
return 0;
}
int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp) int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp)
{ {
IN(); IN();
@ -286,6 +357,7 @@ int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp)
case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp)); case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp));
case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp)); case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp));
case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp)); case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(mdp));
case MDP_PORT_RHIZOME_REQUEST: case MDP_PORT_RHIZOME_REQUEST:
if (is_rhizome_mdp_server_running()) { if (is_rhizome_mdp_server_running()) {
RETURN(overlay_mdp_service_rhizomerequest(mdp)); RETURN(overlay_mdp_service_rhizomerequest(mdp));

View File

@ -23,6 +23,8 @@
#include "overlay_address.h" #include "overlay_address.h"
#include "serval.h" #include "serval.h"
#define PACKET_UNICAST (1<<0)
struct overlay_frame { struct overlay_frame {
struct overlay_frame *prev; struct overlay_frame *prev;
struct overlay_frame *next; struct overlay_frame *next;
@ -51,6 +53,7 @@ struct overlay_frame {
int destination_resolved; int destination_resolved;
struct sockaddr_in recvaddr; struct sockaddr_in recvaddr;
overlay_interface *interface; overlay_interface *interface;
int flags;
/* Actual payload */ /* Actual payload */
struct overlay_buffer *payload; struct overlay_buffer *payload;

View File

@ -27,13 +27,15 @@ struct sockaddr_in loopback;
unsigned char magic_header[]={0x00, 0x01}; unsigned char magic_header[]={0x00, 0x01};
int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff){ int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination, int flags){
if (ob_append_bytes(buff,magic_header,sizeof magic_header)) if (ob_append_bytes(buff,magic_header,sizeof magic_header))
return -1; return -1;
if (overlay_address_append(context, buff, my_subscriber)) if (overlay_address_append(context, buff, my_subscriber))
return -1; return -1;
context->sender = my_subscriber; context->sender = my_subscriber;
ob_append_byte(buff,0);//seq ob_append_byte(buff,0);
ob_append_byte(buff,flags);
return 0; return 0;
} }
@ -174,40 +176,59 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
the source having received the frame from elsewhere. the source having received the frame from elsewhere.
*/ */
if (recvaddr->sa_family!=AF_INET)
RETURN(WHYF("Unexpected protocol family %d",recvaddr->sa_family));
struct overlay_frame f; struct overlay_frame f;
struct decode_context context; struct decode_context context;
bzero(&context, sizeof context); bzero(&context, sizeof context);
bzero(&f,sizeof f);
time_ms_t now = gettime_ms(); time_ms_t now = gettime_ms();
struct overlay_buffer *b = ob_static(packet, len); struct overlay_buffer *b = ob_static(packet, len);
ob_limitsize(b, len); ob_limitsize(b, len);
if (ob_get(b)!=magic_header[0] || ob_get(b)!=magic_header[1]) if (ob_get(b)!=magic_header[0] || ob_get(b)!=magic_header[1]){
return WHY("Packet type not recognised."); ob_free(b);
RETURN(WHY("Packet type not recognised."));
bzero(&f,sizeof(struct overlay_frame));
f.interface = interface;
if (recvaddr->sa_family==AF_INET){
f.recvaddr=*((struct sockaddr_in *)recvaddr);
if (debug&DEBUG_OVERLAYFRAMES)
DEBUG("Received overlay packet");
} else {
if (interface->fileP) {
/* dummy interface, so tell to use localhost */
f.recvaddr.sin_family = AF_INET;
f.recvaddr.sin_port = 0;
f.recvaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
}
} }
context.interface = f.interface = interface;
f.recvaddr = *((struct sockaddr_in *)recvaddr);
if (debug&DEBUG_OVERLAYFRAMES)
DEBUG("Received overlay packet");
overlay_address_parse(&context, b, &context.sender); overlay_address_parse(&context, b, &context.sender);
if (context.sender && context.sender->reachable==REACHABLE_SELF){
ob_free(b);
RETURN(0);
}
int seq = ob_get(b); int seq = ob_get(b);
int packet_flags = ob_get(b);
if (context.sender){
if (context.sender->reachable==REACHABLE_SELF){
ob_free(b);
RETURN(0);
}
// always update the IP address we heard them from, even if we don't need to use it right now
context.sender->address = f.recvaddr;
// if this is a dummy announcement for a node that isn't in our routing table
if (context.sender->reachable == REACHABLE_NONE &&
(!context.sender->node) &&
packet_flags&PACKET_UNICAST){
// mark this subscriber as reachable directly via unicast.
reachable_unicast(context.sender, interface, f.recvaddr.sin_addr, ntohs(f.recvaddr.sin_port));
}
}
if (packet_flags & PACKET_UNICAST)
context.addr=f.recvaddr;
else
context.addr=interface->broadcast_address;
while(b->position < b->sizeLimit){ while(b->position < b->sizeLimit){
context.invalid_addresses=0; context.invalid_addresses=0;
@ -328,22 +349,6 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
b->position=next_payload; b->position=next_payload;
} }
if (context.sender && recvaddr){
struct sockaddr_in *addr=(struct sockaddr_in *)recvaddr;
// always update the IP address we heard them from, even if we don't need to use it right now
context.sender->address = *addr;
// if this is a dummy announcement for a node that isn't in our routing table
if (context.sender->reachable == REACHABLE_NONE &&
(!context.sender->node) &&
(interface->fileP || recvaddr->sa_family==AF_INET)){
// mark this subscriber as reachable directly via unicast.
reachable_unicast(context.sender, interface, addr->sin_addr, ntohs(addr->sin_port));
}
}
send_please_explain(&context, my_subscriber, context.sender); send_please_explain(&context, my_subscriber, context.sender);
ob_free(b); ob_free(b);

View File

@ -55,9 +55,9 @@ struct outgoing_packet{
overlay_interface *interface; overlay_interface *interface;
int i; int i;
struct subscriber *unicast_subscriber; struct subscriber *unicast_subscriber;
int unicast;
int add_advertisements; int add_advertisements;
struct sockaddr_in dest; struct sockaddr_in dest;
int header_length;
struct overlay_buffer *buffer; struct overlay_buffer *buffer;
struct decode_context context; struct decode_context context;
}; };
@ -161,13 +161,23 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (!p) return WHY("Cannot queue NULL"); if (!p) return WHY("Cannot queue NULL");
if (!p->destination_resolved){ do{
if (p->destination){ if (p->destination_resolved)
int r = subscriber_is_reachable(p->destination); break;
if (!(r&REACHABLE)) if (!p->destination)
return WHYF("Cannot send %x packet, destination %s is %s", p->type, alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable"); break;
int r = subscriber_is_reachable(p->destination);
if (r&REACHABLE)
break;
if (directory_service){
r = subscriber_is_reachable(directory_service);
if (r&REACHABLE)
break;
} }
}
return WHYF("Cannot send %x packet, destination %s is %s", p->type, alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable");
} while(0);
if (p->queue>=OQ_MAX) if (p->queue>=OQ_MAX)
return WHY("Invalid queue specified"); return WHY("Invalid queue specified");
@ -193,26 +203,30 @@ int overlay_payload_enqueue(struct overlay_frame *p)
else if(p->send_copies>5) else if(p->send_copies>5)
return WHY("Too many copies requested"); return WHY("Too many copies requested");
if (!p->destination){ if (!p->destination_resolved){
int i; if (!p->destination){
int drop=1; int i;
int drop=1;
// hook to allow for flooding via olsr
olsr_send(p); // hook to allow for flooding via olsr
olsr_send(p);
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){ // make sure there is an interface up that allows broadcasts
if (overlay_interfaces[i].state==INTERFACE_STATE_UP for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
&& overlay_interfaces[i].send_broadcasts){ if (overlay_interfaces[i].state==INTERFACE_STATE_UP
p->broadcast_sent_via[i]=0; && overlay_interfaces[i].send_broadcasts){
drop=0; p->broadcast_sent_via[i]=0;
}else drop=0;
p->broadcast_sent_via[i]=1; }else
p->broadcast_sent_via[i]=1;
}
// just drop it now
if (drop){
WARN("No broadcast interfaces to send with");
return -1;
}
} }
// just drop it now
if (drop)
return -1;
} }
struct overlay_frame *l=queue->last; struct overlay_frame *l=queue->last;
@ -231,16 +245,19 @@ int overlay_payload_enqueue(struct overlay_frame *p)
} }
static void static void
overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, struct sockaddr_in addr, int tick){ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination, int flags,
overlay_interface *interface, struct sockaddr_in addr, int tick){
packet->interface = interface; packet->interface = interface;
packet->i = (interface - overlay_interfaces); packet->i = (interface - overlay_interfaces);
packet->dest=addr; packet->dest=addr;
packet->buffer=ob_new(); packet->buffer=ob_new();
packet->add_advertisements=1; packet->add_advertisements=1;
if (flags & PACKET_UNICAST)
packet->unicast_subscriber = destination;
ob_limitsize(packet->buffer, packet->interface->mtu); ob_limitsize(packet->buffer, packet->interface->mtu);
overlay_packet_init_header(&packet->context, packet->buffer); overlay_packet_init_header(&packet->context, packet->buffer, destination, flags);
packet->header_length = ob_position(packet->buffer);
if (tick){ if (tick){
/* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID /* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID
(the first SID we host becomes our own identity, saving a little bit of data here). (the first SID we host becomes our own identity, saving a little bit of data here).
@ -259,12 +276,24 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int ret=0; int ret=0;
time_ms_t send_time; time_ms_t send_time;
// ignore packet if the destination is currently unreachable do{
if (frame->destination && (!(subscriber_is_reachable(frame->destination)&REACHABLE))) if (frame->destination_resolved)
break;
if (!frame->destination)
break;
if (subscriber_is_reachable(frame->destination)&REACHABLE)
break;
if (directory_service){
if (subscriber_is_reachable(directory_service)&REACHABLE)
break;
}
// ignore payload alarm if the destination is currently unreachable
return 0; return 0;
}while(0);
// when is the next packet from this queue due? // when is the next packet from this queue due?
send_time=queue->first->enqueued_at + queue->transmit_delay; send_time=queue->first->enqueued_at + queue->transmit_delay;
if (next_packet.alarm==0 || send_time < next_packet.alarm){ if (next_packet.alarm==0 || send_time < next_packet.alarm){
next_packet.alarm=send_time; next_packet.alarm=send_time;
ret = 1; ret = 1;
@ -323,13 +352,15 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
r = subscriber_is_reachable(frame->next_hop); r = subscriber_is_reachable(frame->next_hop);
} }
if (!(r&REACHABLE_DIRECT)) if (!(r&REACHABLE_DIRECT)){
goto skip; goto skip;
}
frame->interface = frame->next_hop->interface; frame->interface = frame->next_hop->interface;
if(r&REACHABLE_UNICAST){ if(r&REACHABLE_UNICAST){
frame->recvaddr = frame->next_hop->address; frame->recvaddr = frame->next_hop->address;
frame->flags = PACKET_UNICAST;
// ignore resend logic for unicast packets, where wifi gives better resilience // ignore resend logic for unicast packets, where wifi gives better resilience
frame->send_copies=1; frame->send_copies=1;
}else }else
@ -340,8 +371,9 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if (packet->buffer){ if (packet->buffer){
// check if we can stuff into this packet // check if we can stuff into this packet
if (frame->broadcast_sent_via[packet->i]) if (frame->broadcast_sent_via[packet->i]){
goto skip; goto skip;
}
frame->interface = packet->interface; frame->interface = packet->interface;
frame->recvaddr = packet->interface->broadcast_address; frame->recvaddr = packet->interface->broadcast_address;
@ -369,15 +401,12 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
} }
if (!packet->buffer){ if (!packet->buffer){
overlay_init_packet(packet, frame->interface, frame->recvaddr, 0); overlay_init_packet(packet, frame->next_hop, frame->flags, frame->interface, frame->recvaddr, 0);
if (frame->next_hop && (frame->next_hop->reachable&REACHABLE_UNICAST)){
packet->unicast_subscriber = frame->next_hop;
packet->unicast=1;
}
}else{ }else{
// is this packet going our way? // is this packet going our way?
if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0) if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
goto skip; goto skip;
}
} }
if (debug&DEBUG_OVERLAYFRAMES){ if (debug&DEBUG_OVERLAYFRAMES){
@ -386,9 +415,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN)); frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
} }
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)) if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
// payload was not queued // payload was not queued
goto skip; goto skip;
}
// don't send rhizome adverts if the packet contains a voice payload // don't send rhizome adverts if the packet contains a voice payload
if (frame->queue==OQ_ISOCHRONOUS_VOICE) if (frame->queue==OQ_ISOCHRONOUS_VOICE)
@ -397,10 +427,13 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// mark the payload as sent // mark the payload as sent
int keep_payload = 0; int keep_payload = 0;
if (frame->next_hop){ if (frame->destination_resolved){
frame->send_copies --; frame->send_copies --;
if (frame->send_copies>0) if (frame->send_copies>0){
keep_payload=1; keep_payload=1;
// make sure we don't schedule the next alarm immediately
frame->enqueued_at=gettime_ms();
}
}else{ }else{
int i; int i;
frame->broadcast_sent_via[packet->i]=1; frame->broadcast_sent_via[packet->i]=1;
@ -419,10 +452,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if (!keep_payload){ if (!keep_payload){
frame = overlay_queue_remove(queue, frame); frame = overlay_queue_remove(queue, frame);
continue; continue;
} }
skip: skip:
// if we can't send the payload now, check when we should try // if we can't send the payload now, check when we should try next
overlay_calc_queue_time(queue, frame); overlay_calc_queue_time(queue, frame);
frame = frame->next; frame = frame->next;
} }
@ -448,21 +481,23 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
schedule(&next_packet); schedule(&next_packet);
if(packet->buffer){ if(packet->buffer){
// TODO don't send empty packet if (ob_position(packet->buffer) > packet->header_length){
// stuff rhizome announcements at the last moment // stuff rhizome announcements at the last moment
if (packet->add_advertisements) if (packet->add_advertisements)
overlay_rhizome_add_advertisements(&packet->context, packet->i,packet->buffer); overlay_rhizome_add_advertisements(&packet->context, packet->i,packet->buffer);
if (debug&DEBUG_PACKETCONSTRUCTION) if (debug&DEBUG_PACKETCONSTRUCTION)
ob_dump(packet->buffer,"assembled packet"); ob_dump(packet->buffer,"assembled packet");
if (overlay_broadcast_ensemble(packet->i, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){ if (overlay_broadcast_ensemble(packet->i, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){
// sendto failed. We probably don't have a valid route // sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){ if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE); set_reachable(packet->unicast_subscriber, REACHABLE_NONE);
}
} }
} }else
WARN("No payloads were sent?");
ob_free(packet->buffer); ob_free(packet->buffer);
RETURN(1); RETURN(1);
} }
@ -500,7 +535,7 @@ overlay_tick_interface(int i, time_ms_t now) {
// initialise the packet buffer // initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet)); bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address, 1); overlay_init_packet(&packet, NULL, 0, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address, 1);
/* Stuff more payloads from queues and send it */ /* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now); overlay_fill_send_packet(&packet, now);

View File

@ -480,7 +480,9 @@ int overlay_route_recalc_node_metrics(overlay_node *n, time_ms_t now)
if (best_score<=0){ if (best_score<=0){
for(o=0;o<OVERLAY_MAX_OBSERVATIONS;o++) for(o=0;o<OVERLAY_MAX_OBSERVATIONS;o++)
{ {
if (n->observations[o].observed_score && n->observations[o].sender->reachable&REACHABLE) // only count observations from neighbours that we *know* we have a 2 way path to
if (n->observations[o].observed_score && n->observations[o].sender->reachable&REACHABLE
&& !(n->observations[o].sender->reachable&REACHABLE_ASSUMED))
{ {
int discounted_score=n->observations[o].observed_score; int discounted_score=n->observations[o].observed_score;
discounted_score-=(now-n->observations[o].rx_time)/1000; discounted_score-=(now-n->observations[o].rx_time)/1000;

View File

@ -380,6 +380,8 @@ typedef struct overlay_interface {
struct sockaddr_in address; struct sockaddr_in address;
struct sockaddr_in broadcast_address; struct sockaddr_in broadcast_address;
struct in_addr netmask; struct in_addr netmask;
// can we use this interface for routes to addresses in other subnets?
int default_route;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface. /* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are For radio links the actual maximum and the maximum that is likely to be delivered reliably are
@ -444,7 +446,8 @@ time_ms_t overlay_time_until_next_tick();
int overlay_add_selfannouncement(struct decode_context *context, int interface,struct overlay_buffer *b); int overlay_add_selfannouncement(struct decode_context *context, int interface,struct overlay_buffer *b);
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface, int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
struct overlay_frame *p, struct overlay_buffer *b); struct overlay_frame *p, struct overlay_buffer *b);
int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff); int overlay_packet_init_header(struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination, int flags);
int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff, int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff,
int queue, int type, int modifiers, int ttl, int queue, int type, int modifiers, int ttl,
struct broadcast *broadcast, struct subscriber *next_hop, struct broadcast *broadcast, struct subscriber *next_hop,
@ -597,6 +600,7 @@ int overlay_mdp_reply(int sock,struct sockaddr_un *recvaddr,int recvaddrlen,
overlay_mdp_frame *mdpreply); overlay_mdp_frame *mdpreply);
int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP, int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
struct sockaddr_un *recvaddr,int recvaddlen); struct sockaddr_un *recvaddr,int recvaddlen);
int overlay_mdp_encode_ports(struct overlay_buffer *plaintext, int dst_port, int src_port);
int overlay_mdp_dnalookup_reply(const sockaddr_mdp *dstaddr, const unsigned char *resolved_sid, const char *uri, const char *did, const char *name); int overlay_mdp_dnalookup_reply(const sockaddr_mdp *dstaddr, const unsigned char *resolved_sid, const char *uri, const char *did, const char *name);
int urandombytes(unsigned char *x,unsigned long long xlen); int urandombytes(unsigned char *x,unsigned long long xlen);
@ -739,6 +743,7 @@ void overlay_route_tick(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm); void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm); void overlay_mdp_poll(struct sched_ent *alarm);
int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp); int overlay_mdp_try_interal_services(overlay_mdp_frame *mdp);
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface);
void fd_periodicstats(struct sched_ent *alarm); void fd_periodicstats(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm); void rhizome_check_connections(struct sched_ent *alarm);

View File

@ -279,6 +279,7 @@ start_servald_server() {
new_pids="$new_pids $apid" new_pids="$new_pids $apid"
fi fi
done done
eval LOG$instance_name=$instance_servald_log
assert --message="a new servald process is running" --dump-on-fail="$instance_servald_log" [ -n "$new_pids" ] assert --message="a new servald process is running" --dump-on-fail="$instance_servald_log" [ -n "$new_pids" ]
assert --message="servald pidfile process is running" --dump-on-fail="$instance_servald_log" $pidfile_running assert --message="servald pidfile process is running" --dump-on-fail="$instance_servald_log" $pidfile_running
assert --message="servald log file $instance_servald_log is present" [ -r "$instance_servald_log" ] assert --message="servald log file $instance_servald_log is present" [ -r "$instance_servald_log" ]
@ -642,7 +643,6 @@ start_servald_instances() {
configure_servald_server configure_servald_server
start_servald_server start_servald_server
eval DUMMY$instance_name="\$DUMMYNET" eval DUMMY$instance_name="\$DUMMYNET"
eval LOG$instance_name="\$(shellarg "\$instance_servald_log")"
done done
# Now wait until they see each other. # Now wait until they see each other.
wait_until --sleep=0.25 instances_see_each_other "$@" wait_until --sleep=0.25 instances_see_each_other "$@"

View File

@ -29,13 +29,12 @@ configure_servald_server() {
setup() { setup() {
setup_servald setup_servald
assert_no_servald_processes assert_no_servald_processes
foreach_instance +A +B +C create_single_identity foreach_instance +A +B +C +D create_single_identity
set_instance +D set_instance +D
executeOk_servald set did $SIDD $DIDC "Agent D Smith"
DIDD1=$DIDC DIDD1=$DIDC
NAMED1="Agent D Smith" NAMED1="Agent D Smith"
create_identities 1 DIDD=$DIDC1
SIDD=$SIDD1
DIDD=$DIDD1
NAMED=$NAMED1 NAMED=$NAMED1
set_instance +A set_instance +A
executeOk_servald config set dna.helper.executable "$servald_build_root/directory_service" executeOk_servald config set dna.helper.executable "$servald_build_root/directory_service"
@ -51,19 +50,25 @@ teardown() {
} }
is_published() { is_published() {
grep "PUBLISHED" $LOGA || return 1 tfw_log "grep \"PUBLISHED.*$1\" $LOGA"
grep "PUBLISHED.*$1" $LOGA || return 1
return 0 return 0
} }
sent_directory_request() { sent_directory_request() {
grep "Sending directory registration" $LOGD || return 1 tfw_log "grep \"Sending directory registration\" $1"
grep "Sending directory registration" $1 || return 1
return 0 return 0
} }
doc_publish="Publish and retrieve a directory entry" doc_publish="Publish and retrieve a directory entry"
test_publish() { test_publish() {
wait_until sent_directory_request wait_until sent_directory_request $LOGB
wait_until is_published wait_until sent_directory_request $LOGC
wait_until sent_directory_request $LOGD
wait_until is_published $SIDB
wait_until is_published $SIDC
wait_until is_published $SIDD
stop_servald_server +B stop_servald_server +B
stop_servald_server +C stop_servald_server +C
stop_servald_server +D stop_servald_server +D
@ -101,25 +106,27 @@ setup_routing() {
executeOk_servald config set mdp.dummyC.tick_ms 0 executeOk_servald config set mdp.dummyC.tick_ms 0
executeOk_servald config set dna.helper.executable "$servald_build_root/directory_service" executeOk_servald config set dna.helper.executable "$servald_build_root/directory_service"
executeOk_servald config set debug.dnahelper on executeOk_servald config set debug.dnahelper on
start_routing_instance
set_instance +B set_instance +B
executeOk_servald config set interfaces "+>dummyB" executeOk_servald config set interfaces "+>dummyB"
executeOk_servald config set mdp.dummyB.tick_ms 0 executeOk_servald config set mdp.dummyB.tick_ms 0
executeOk_servald config set directory.service $SIDA executeOk_servald config set directory.service $SIDA
executeOk_servald config set $SIDA.interface "dummyB" executeOk_servald config set $SIDA.interface "dummyB"
executeOk_servald config set $SIDA.address 127.0.0.1 executeOk_servald config set $SIDA.address 127.0.0.1
start_routing_instance
set_instance +C set_instance +C
executeOk_servald config set interfaces "+>dummyC" executeOk_servald config set interfaces "+>dummyC"
executeOk_servald config set mdp.dummyC.tick_ms 0 executeOk_servald config set mdp.dummyC.tick_ms 0
executeOk_servald config set directory.service $SIDA executeOk_servald config set directory.service $SIDA
executeOk_servald config set $SIDA.interface "dummyC" executeOk_servald config set $SIDA.interface "dummyC"
executeOk_servald config set $SIDA.address 127.0.0.1 executeOk_servald config set $SIDA.address 127.0.0.1
start_routing_instance foreach_instance +A +B +C start_routing_instance
} }
doc_routing="Ping via relay node" doc_routing="Ping via relay node"
test_routing() { test_routing() {
wait_until sent_directory_request $LOGB
wait_until sent_directory_request $LOGC
wait_until is_published $SIDB
wait_until is_published $SIDC
set_instance +B set_instance +B
executeOk_servald dna lookup "$DIDC" executeOk_servald dna lookup "$DIDC"
assertStdoutLineCount '==' 1 assertStdoutLineCount '==' 1