Merge branch 'unicast' into development

This commit is contained in:
Jeremy Lakeman 2013-08-13 16:18:42 +09:30
commit 6bf6d06008
24 changed files with 1165 additions and 1068 deletions

@ -2133,16 +2133,22 @@ int app_route_print(const struct cli_parsed *parsed, struct cli_context *context
char flags[32];
strbuf b = strbuf_local(flags, sizeof flags);
if (p->reachable & REACHABLE_SELF)
strbuf_puts(b, "SELF ");
if (p->reachable & REACHABLE_ASSUMED)
strbuf_puts(b, "ASSUMED ");
if (p->reachable & REACHABLE_BROADCAST)
strbuf_puts(b, "BROADCAST ");
if (p->reachable & REACHABLE_UNICAST)
strbuf_puts(b, "UNICAST ");
if (p->reachable & REACHABLE_INDIRECT)
strbuf_puts(b, "INDIRECT ");
switch (p->reachable){
case REACHABLE_SELF:
strbuf_puts(b, "SELF");
break;
case REACHABLE_BROADCAST:
strbuf_puts(b, "BROADCAST");
break;
case REACHABLE_UNICAST:
strbuf_puts(b, "UNICAST");
break;
case REACHABLE_INDIRECT:
strbuf_puts(b, "INDIRECT");
break;
default:
strbuf_sprintf(b, "%d", p->reachable);
}
cli_put_string(context, strbuf_str(b), ":");
cli_put_string(context, p->interface_name, ":");
cli_put_string(context, alloca_tohex_sid(p->neighbour), "\n");

@ -435,8 +435,9 @@ STRING(256, file, "", str_nonempty,, "Path of interfa
ATOM(struct in_addr, dummy_address, hton_in_addr(INADDR_LOOPBACK), in_addr,, "Dummy interface address")
ATOM(struct in_addr, dummy_netmask, hton_in_addr(0xFFFFFF00), in_addr,, "Dummy interface netmask")
ATOM(uint16_t, port, PORT_DNA, uint16_nonzero,, "Port number for network interface")
ATOM(uint16_t, drop_broadcasts, 0, uint16_nonzero,, "Percentage of incoming broadcast packets that should be dropped for testing purposes")
ATOM(bool_t, drop_broadcasts, 0, boolean,, "If true, drop all incoming broadcast packets")
ATOM(bool_t, drop_unicasts, 0, boolean,, "If true, drop all incoming unicast packets")
ATOM(uint16_t, drop_packets, 0, uint16_nonzero,, "Percentage of incoming packets that should be dropped for testing purposes")
ATOM(short, type, OVERLAY_INTERFACE_WIFI, interface_type,, "Type of network interface")
ATOM(int32_t, packet_interval, -1, int32_nonneg,, "Minimum interval between packets in microseconds")
ATOM(int32_t, mdp_tick_ms, -1, int32_nonneg,, "Override MDP tick interval for this interface")

@ -94,7 +94,7 @@ static void directory_update(struct sched_ent *alarm){
load_directory_config();
if (directory_service){
if (subscriber_is_reachable(directory_service) & REACHABLE){
if (directory_service->reachable & REACHABLE){
directory_send_keyring(directory_service);
unschedule(alarm);

@ -242,7 +242,8 @@ void keyring_free_identity(keyring_identity *id)
if (id->subscriber){
id->subscriber->identity=NULL;
set_reachable(id->subscriber, REACHABLE_NONE);
if (id->subscriber->reachable == REACHABLE_SELF)
id->subscriber->reachable = REACHABLE_NONE;
}
bzero(id,sizeof(keyring_identity));
@ -858,7 +859,8 @@ int keyring_decrypt_pkr(keyring_file *k,keyring_context *c,
if (id->keypairs[i]->type == KEYTYPE_CRYPTOBOX){
id->subscriber = find_subscriber(id->keypairs[i]->public_key, SID_SIZE, 1);
if (id->subscriber){
set_reachable(id->subscriber, REACHABLE_SELF);
if (id->subscriber->reachable==REACHABLE_NONE)
id->subscriber->reachable=REACHABLE_SELF;
id->subscriber->identity = id;
if (!my_subscriber)
my_subscriber=id->subscriber;
@ -1067,7 +1069,8 @@ keyring_identity *keyring_create_identity(keyring_file *k,keyring_context *c, co
// add new identity to in memory table
id->subscriber = find_subscriber(id->keypairs[0]->public_key, SID_SIZE, 1);
if (id->subscriber){
set_reachable(id->subscriber, REACHABLE_SELF);
if (id->subscriber->reachable==REACHABLE_NONE)
id->subscriber->reachable=REACHABLE_SELF;
id->subscriber->identity = id;
if (!my_subscriber)
my_subscriber=id->subscriber;

@ -208,20 +208,14 @@ int overlay_address_append(struct decode_context *context, struct overlay_buffer
return WHY("No address supplied");
if(context
&& context->packet_version>=1
&& context->interface
&& subscriber == context->interface->other_device
&& context->interface->point_to_point){
&& subscriber == context->point_to_point_device){
if (ob_append_byte(b, OA_CODE_P2P_YOU))
return -1;
}else if(context
&& context->packet_version>=1
&& context->interface
&& !subscriber->send_full
&& subscriber == my_subscriber
&& context->interface->other_device
&& context->interface->point_to_point
&& (!context->encoding_header || !context->interface->local_echo)){
&& context->point_to_point_device
&& (context->encoding_header==0 || !context->interface->local_echo)){
if (ob_append_byte(b, OA_CODE_P2P_ME))
return -1;
}else if (context && subscriber==context->sender){
@ -268,11 +262,11 @@ static int add_explain_response(struct subscriber *subscriber, void *context){
// if one of our identities is unknown,
// the header of this packet must include our full sid.
if (subscriber->reachable==REACHABLE_SELF){
subscriber->send_full=1;
if (subscriber==my_subscriber){
response->please_explain->source_full=1;
return 0;
}
subscriber->send_full=1;
}
// add the whole subscriber id to the payload, stop if we run out of space
@ -346,7 +340,8 @@ int overlay_address_parse(struct decode_context *context, struct overlay_buffer
switch(len){
case OA_CODE_P2P_YOU:
if (context->interface && context->interface->point_to_point){
// if we don't know who they are, we can't assume they mean us.
if (context->point_to_point_device){
*subscriber=my_subscriber;
context->previous=my_subscriber;
}else{
@ -356,11 +351,11 @@ int overlay_address_parse(struct decode_context *context, struct overlay_buffer
return 0;
case OA_CODE_P2P_ME:
if (context->interface && context->interface->point_to_point && context->interface->other_device){
*subscriber=context->interface->other_device;
if (context->point_to_point_device){
*subscriber=context->point_to_point_device;
context->previous=*subscriber;
}else{
WHYF("Could not resolve address on %s, I don't know who is on the other end of this link!", context->interface->name);
WHYF("Could not resolve address, I don't know who is on the other end of this link!");
context->invalid_addresses=1;
}
return 0;
@ -404,18 +399,23 @@ int send_please_explain(struct decode_context *context, struct subscriber *sourc
if (!context->sender)
frame->source_full=1;
frame->destination = destination;
if (destination && (destination->reachable & REACHABLE)){
if (destination){
frame->ttl = PAYLOAD_TTL_DEFAULT; // MAX?
frame->destination = destination;
frame->source_full=1;
}else{
// send both a broadcast & unicast response out the same interface this packet arrived on.
frame->ttl=1;// how will this work with olsr??
overlay_broadcast_generate_address(&frame->broadcast_id);
if (context->interface){
frame->destination_resolved=1;
frame->next_hop = destination;
frame->recvaddr = context->addr;
frame->interface = context->interface;
frame->destination = destination;
frame->destinations[frame->destination_count++].destination=add_destination_ref(context->interface->destination);
struct network_destination *dest = create_unicast_destination(context->addr, context->interface);
if (dest)
frame->destinations[frame->destination_count++].destination=dest;
}else{
FATAL("This context doesn't have an interface?");
}
}
@ -434,7 +434,8 @@ int process_explain(struct overlay_frame *frame){
struct decode_context context;
bzero(&context, sizeof context);
context.sender = frame->source;
context.interface = frame->interface;
while(ob_remaining(b)>0){
int len = ob_get(b);
if (len<=0 || len>SID_SIZE)

@ -54,9 +54,9 @@ struct subscriber{
// should we send the full address once?
int send_full;
// sequence number for this unicast or broadcast destination
int sequence;
int max_packet_version;
// overlay routing information
struct overlay_node *node;
@ -69,22 +69,14 @@ struct subscriber{
// result of routing calculations;
int reachable;
// highest seen packet version
int max_packet_version;
// if indirect, who is the next hop?
struct subscriber *next_hop;
// if direct, or unicast, where do we send packets?
struct overlay_interface *interface;
struct network_destination *destination;
// if reachable&REACHABLE_UNICAST send packets to this address, else use the interface broadcast address
struct sockaddr_in address;
time_ms_t last_stun_request;
time_ms_t last_probe;
time_ms_t last_probe_response;
time_ms_t last_tx;
time_ms_t last_explained;
// public signing key details for remote peers
@ -115,6 +107,7 @@ struct decode_context{
struct overlay_frame *please_explain;
struct subscriber *sender;
struct subscriber *previous;
struct subscriber *point_to_point_device;
};
extern struct subscriber *my_subscriber;
@ -122,9 +115,7 @@ extern struct subscriber *directory_service;
struct subscriber *find_subscriber(const unsigned char *sid, int len, int create);
void enum_subscribers(struct subscriber *start, int(*callback)(struct subscriber *, void *), void *context);
int subscriber_is_reachable(struct subscriber *subscriber);
int set_reachable(struct subscriber *subscriber, int reachable);
int reachable_unicast(struct subscriber *subscriber, overlay_interface *interface, struct in_addr addr, int port);
int set_reachable(struct subscriber *subscriber, struct network_destination *destination, struct subscriber *next_hop);
int load_subscriber_address(struct subscriber *subscriber);
int process_explain(struct overlay_frame *frame);

@ -139,7 +139,7 @@ int ob_unlimitsize(struct overlay_buffer *b)
return 0;
}
int ob_makespace(struct overlay_buffer *b,int bytes)
int _ob_makespace(struct __sourceloc __whence, struct overlay_buffer *b,int bytes)
{
if (b->sizeLimit!=-1 && b->position+bytes>b->sizeLimit) {
if (config.debug.packetformats) WHY("Asked to make space beyond size limit");
@ -209,16 +209,16 @@ int ob_makespace(struct overlay_buffer *b,int bytes)
Functions that append data and increase the size of the buffer if possible / required
*/
int ob_append_byte(struct overlay_buffer *b,unsigned char byte)
int _ob_append_byte(struct __sourceloc __whence, struct overlay_buffer *b,unsigned char byte)
{
if (ob_makespace(b,1)) return WHY("ob_makespace() failed");
if (_ob_makespace(__whence, b,1)) return WHY("ob_makespace() failed");
b->bytes[b->position++] = byte;
return 0;
}
unsigned char *ob_append_space(struct overlay_buffer *b,int count)
unsigned char *_ob_append_space(struct __sourceloc __whence, struct overlay_buffer *b,int count)
{
if (ob_makespace(b,count)) {
if (_ob_makespace(__whence, b,count)) {
WHY("ob_makespace() failed");
return NULL;
}
@ -228,31 +228,31 @@ unsigned char *ob_append_space(struct overlay_buffer *b,int count)
return r;
}
int ob_append_bytes(struct overlay_buffer *b, const unsigned char *bytes, int count)
int _ob_append_bytes(struct __sourceloc __whence, struct overlay_buffer *b, const unsigned char *bytes, int count)
{
if (ob_makespace(b,count)) return WHY("ob_makespace() failed");
if (_ob_makespace(__whence, b,count)) return WHY("ob_makespace() failed");
bcopy(bytes,&b->bytes[b->position],count);
b->position+=count;
return 0;
}
int ob_append_buffer(struct overlay_buffer *b, const struct overlay_buffer *s){
return ob_append_bytes(b, s->bytes, s->position);
int _ob_append_buffer(struct __sourceloc __whence, struct overlay_buffer *b, struct overlay_buffer *s){
return _ob_append_bytes(__whence, b, s->bytes, s->position);
}
int ob_append_ui16(struct overlay_buffer *b, uint16_t v)
int _ob_append_ui16(struct __sourceloc __whence, struct overlay_buffer *b, uint16_t v)
{
if (ob_makespace(b, 2)) return WHY("ob_makespace() failed");
if (_ob_makespace(__whence, b, 2)) return WHY("ob_makespace() failed");
b->bytes[b->position] = (v >> 8) & 0xFF;
b->bytes[b->position+1] = v & 0xFF;
b->position+=2;
return 0;
}
int ob_append_ui32(struct overlay_buffer *b, uint32_t v)
int _ob_append_ui32(struct __sourceloc __whence, struct overlay_buffer *b, uint32_t v)
{
if (ob_makespace(b, 4)) return WHY("ob_makespace() failed");
if (_ob_makespace(__whence, b, 4)) return WHY("ob_makespace() failed");
b->bytes[b->position] = (v >> 24) & 0xFF;
b->bytes[b->position+1] = (v >> 16) & 0xFF;
b->bytes[b->position+2] = (v >> 8) & 0xFF;
@ -261,9 +261,9 @@ int ob_append_ui32(struct overlay_buffer *b, uint32_t v)
return 0;
}
int ob_append_ui64(struct overlay_buffer *b, uint64_t v)
int _ob_append_ui64(struct __sourceloc __whence, struct overlay_buffer *b, uint64_t v)
{
if (ob_makespace(b, 8)) return WHY("ob_makespace() failed");
if (_ob_makespace(__whence, b, 8)) return WHY("ob_makespace() failed");
b->bytes[b->position] = (v >> 56) & 0xFF;
b->bytes[b->position+1] = (v >> 48) & 0xFF;
b->bytes[b->position+2] = (v >> 40) & 0xFF;
@ -310,7 +310,19 @@ int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v){
return i;
}
int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v)
int _ob_append_packed_ui32(struct __sourceloc __whence, struct overlay_buffer *b, uint32_t v)
{
do{
if (_ob_append_byte(__whence, b, (v&0x7f) | (v>0x7f?0x80:0)))
return -1;
v = v>>7;
}while(v!=0);
return 0;
}
int _ob_append_packed_ui64(struct __sourceloc __whence, struct overlay_buffer *b, uint64_t v)
{
do{
@ -322,24 +334,12 @@ int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v)
return 0;
}
int ob_append_packed_ui64(struct overlay_buffer *b, uint64_t v)
{
do{
if (ob_append_byte(b, (v&0x7f) | (v>0x7f?0x80:0)))
return -1;
v = v>>7;
}while(v!=0);
return 0;
}
int ob_append_rfs(struct overlay_buffer *b,int l)
int _ob_append_rfs(struct __sourceloc __whence, struct overlay_buffer *b, int l)
{
if (l<0||l>0xffff) return -1;
b->var_length_offset=b->position;
return ob_append_ui16(b,l);
return _ob_append_ui16(__whence, b,l);
}

@ -51,19 +51,33 @@ int ob_checkpoint(struct overlay_buffer *b);
int ob_rewind(struct overlay_buffer *b);
int ob_limitsize(struct overlay_buffer *b,int bytes);
int ob_unlimitsize(struct overlay_buffer *b);
int ob_makespace(struct overlay_buffer *b,int bytes);
int _ob_makespace(struct __sourceloc whence, struct overlay_buffer *b,int bytes);
int ob_set(struct overlay_buffer *b, int ofs, unsigned char byte);
int ob_append_byte(struct overlay_buffer *b,unsigned char byte);
int ob_append_bytes(struct overlay_buffer *b,const unsigned char *bytes,int count);
int ob_append_buffer(struct overlay_buffer *b,const struct overlay_buffer *s);
unsigned char *ob_append_space(struct overlay_buffer *b,int count);
int ob_append_ui16(struct overlay_buffer *b, uint16_t v);
int ob_append_ui32(struct overlay_buffer *b, uint32_t v);
int ob_append_ui64(struct overlay_buffer *b, uint64_t v);
int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v);
int ob_append_packed_ui64(struct overlay_buffer *b, uint64_t v);
int _ob_append_byte(struct __sourceloc whence, struct overlay_buffer *b,unsigned char byte);
int _ob_append_bytes(struct __sourceloc whence, struct overlay_buffer *b,const unsigned char *bytes,int count);
int _ob_append_buffer(struct __sourceloc whence, struct overlay_buffer *b,struct overlay_buffer *s);
unsigned char *_ob_append_space(struct __sourceloc whence, struct overlay_buffer *b,int count);
int _ob_append_ui16(struct __sourceloc whence, struct overlay_buffer *b, uint16_t v);
int _ob_append_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
int _ob_append_ui64(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
int _ob_append_packed_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
int _ob_append_packed_ui64(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
int _ob_append_rfs(struct __sourceloc whence, struct overlay_buffer *b,int l);
#define ob_makespace(b, bytes) _ob_makespace(__WHENCE__, b, bytes)
#define ob_append_byte(b, byte) _ob_append_byte(__WHENCE__, b, byte)
#define ob_append_bytes(b, bytes, count) _ob_append_bytes(__WHENCE__, b, bytes, count)
#define ob_append_buffer(b, s) _ob_append_buffer(__WHENCE__, b, s)
#define ob_append_space(b, count) _ob_append_space(__WHENCE__, b, count)
#define ob_append_ui16(b, v) _ob_append_ui16(__WHENCE__, b, v)
#define ob_append_ui32(b, v) _ob_append_ui32(__WHENCE__, b, v)
#define ob_append_ui64(b, v) _ob_append_ui64(__WHENCE__, b, v)
#define ob_append_packed_ui32(b, v) _ob_append_packed_ui32(__WHENCE__, b, v)
#define ob_append_packed_ui64(b, v) _ob_append_packed_ui64(__WHENCE__, b, v)
#define ob_append_rfs(b, l) _ob_append_rfs(__WHENCE__, b, l)
int ob_patch_rfs(struct overlay_buffer *b);
int ob_append_rfs(struct overlay_buffer *b,int l);
// get one byte, -ve number indicates failure
int ob_getbyte(struct overlay_buffer *b,int ofs);
// get one byte from the current position, -ve number indicates failure

@ -53,19 +53,11 @@ static int re_init_socket(int interface_index);
#define DEBUG_packet_visualise(M,P,N) logServalPacket(LOG_LEVEL_DEBUG, __WHENCE__, (M), (P), (N))
static int mark_subscriber_down(struct subscriber *subscriber, void *context)
{
overlay_interface *interface=context;
if ((subscriber->reachable & REACHABLE_DIRECT) && subscriber->interface == interface)
set_reachable(subscriber, REACHABLE_NONE);
return 0;
}
static void
overlay_interface_close(overlay_interface *interface){
link_interface_down(interface);
enum_subscribers(NULL, mark_subscriber_down, interface);
INFOF("Interface %s addr %s is down", interface->name, inet_ntoa(interface->broadcast_address.sin_addr));
INFOF("Interface %s addr %s is down",
interface->name, inet_ntoa(interface->address.sin_addr));
unschedule(&interface->alarm);
unwatch(&interface->alarm);
close(interface->alarm.poll.fd);
@ -193,7 +185,7 @@ static int interface_type_priority(int type)
}
// Which interface is better for routing packets?
// returns 0 to indicate the first is better, 1 for the second
// returns -1 to indicate the first is better, 0 for equal, 1 for the second
int overlay_interface_compare(overlay_interface *one, overlay_interface *two)
{
if (one==two)
@ -201,11 +193,9 @@ int overlay_interface_compare(overlay_interface *one, overlay_interface *two)
int p1 = interface_type_priority(one->type);
int p2 = interface_type_priority(two->type);
if (p1<p2)
return 0;
return -1;
if (p2<p1)
return 1;
if (two<one)
return 1;
return 0;
}
@ -318,9 +308,10 @@ overlay_interface_init_socket(int interface_index)
overlay_interface_init_any(interface->port);
const struct sockaddr *addr = (const struct sockaddr *)&interface->address;
interface->alarm.poll.fd = overlay_bind_socket(addr, sizeof(interface->broadcast_address), interface->name);
interface->alarm.poll.fd = overlay_bind_socket(
(const struct sockaddr *)&interface->address,
sizeof(interface->address), interface->name);
if (interface->alarm.poll.fd<0){
interface->state=INTERFACE_STATE_DOWN;
return WHYF("Failed to bind interface %s", interface->name);
@ -328,8 +319,8 @@ overlay_interface_init_socket(int interface_index)
if (config.debug.packetrx || config.debug.io) {
char srctxt[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, (const void *)&interface->broadcast_address.sin_addr, srctxt, INET_ADDRSTRLEN))
DEBUGF("Bound to %s:%d", srctxt, ntohs(interface->broadcast_address.sin_port));
if (inet_ntop(AF_INET, (const void *)&interface->address.sin_addr, srctxt, INET_ADDRSTRLEN))
DEBUGF("Bound to %s:%d", srctxt, ntohs(interface->address.sin_port));
}
interface->alarm.poll.events=POLLIN;
@ -376,23 +367,25 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
// copy ifconfig values
interface->drop_broadcasts = ifconfig->drop_broadcasts;
interface->drop_unicasts = ifconfig->drop_unicasts;
interface->drop_packets = ifconfig->drop_packets;
interface->port = ifconfig->port;
interface->type = ifconfig->type;
interface->send_broadcasts = ifconfig->send_broadcasts;
interface->prefer_unicast = ifconfig->prefer_unicast;
interface->default_route = ifconfig->default_route;
interface->socket_type = ifconfig->socket_type;
interface->encapsulation = ifconfig->encapsulation;
interface->uartbps = ifconfig->uartbps;
interface->ctsrts = ifconfig->ctsrts;
interface->destination = new_destination(interface, ifconfig->encapsulation);
/* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->mtu=1200;
interface->mtu = 1200;
interface->point_to_point = ifconfig->point_to_point;
interface->state=INTERFACE_STATE_DOWN;
interface->alarm.poll.fd=0;
interface->debug = ifconfig->debug;
interface->point_to_point = ifconfig->point_to_point;
// How often do we announce ourselves on this interface?
int tick_ms=-1;
@ -440,27 +433,21 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
tick_ms=0;
}else if (!interface->send_broadcasts){
INFOF("Interface %s is not sending any broadcast traffic!", name);
// no broadcast traffic implies no ticks
tick_ms=0;
}else if (tick_ms==0)
INFOF("Interface %s is running tickless", name);
if (tick_ms<0)
return WHYF("No tick interval %d specified for interface %s", interface->tick_ms, name);
return WHYF("No tick interval specified for interface %s", name);
interface->tick_ms = tick_ms;
interface->destination->tick_ms = tick_ms;
limit_init(&interface->transfer_limit, packet_interval);
limit_init(&interface->destination->transfer_limit, packet_interval);
interface->address.sin_family=AF_INET;
interface->address.sin_port = htons(ifconfig->port);
interface->address.sin_addr = ifconfig->dummy_address;
interface->netmask=ifconfig->dummy_netmask;
interface->broadcast_address.sin_family=AF_INET;
interface->broadcast_address.sin_port = htons(ifconfig->port);
interface->broadcast_address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->destination->address.sin_family=AF_INET;
interface->destination->address.sin_port = htons(ifconfig->port);
interface->alarm.function = overlay_interface_poll;
interface_poll_stats.name="overlay_interface_poll";
@ -468,7 +455,7 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
if (ifconfig->socket_type==SOCK_DGRAM){
interface->address.sin_addr = src_addr;
interface->broadcast_address.sin_addr = broadcast;
interface->destination->address.sin_addr = broadcast;
interface->netmask = netmask;
interface->local_echo = 1;
@ -477,6 +464,9 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
}else{
char read_file[1024];
interface->address.sin_addr = ifconfig->dummy_address;
interface->netmask = ifconfig->dummy_netmask;
interface->destination->address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->local_echo = interface->point_to_point?0:1;
strbuf d = strbuf_local(read_file, sizeof read_file);
@ -523,7 +513,9 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
directory_registration();
INFOF("Allowing a maximum of %d packets every %lldms", interface->transfer_limit.burst_size, interface->transfer_limit.burst_length);
INFOF("Allowing a maximum of %d packets every %lldms",
interface->destination->transfer_limit.burst_size,
interface->destination->transfer_limit.burst_length);
overlay_interface_count++;
return 0;
@ -597,17 +589,22 @@ struct file_packet{
};
static int should_drop(struct overlay_interface *interface, struct sockaddr_in addr){
if (interface->drop_packets>=100)
return 1;
if (memcmp(&addr, &interface->address, sizeof(addr))==0){
return interface->drop_unicasts;
}
if (memcmp(&addr, &interface->broadcast_address, sizeof(addr))==0){
if (interface->drop_broadcasts == 0)
return 0;
if (interface->drop_broadcasts >= 100)
if (interface->drop_unicasts)
return 1;
if (rand()%100 >= interface->drop_broadcasts)
return 0;
}
}else if (memcmp(&addr, &interface->destination->address, sizeof(addr))==0){
if (interface->drop_broadcasts)
return 1;
}else
return 1;
if (interface->drop_packets <= 0)
return 0;
if (rand()%100 >= interface->drop_packets)
return 0;
return 1;
}
@ -757,13 +754,15 @@ static void overlay_interface_poll(struct sched_ent *alarm)
alarm->alarm=-1;
time_ms_t now = gettime_ms();
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){
if (now >= interface->last_tx+interface->tick_ms)
overlay_send_tick_packet(interface);
alarm->alarm=interface->last_tx+interface->tick_ms;
alarm->deadline=alarm->alarm+interface->tick_ms/2;
if (interface->state==INTERFACE_STATE_UP
&& interface->destination->tick_ms>0
&& interface->send_broadcasts){
if (now >= interface->destination->last_tx+interface->destination->tick_ms)
overlay_send_tick_packet(interface->destination);
alarm->alarm=interface->destination->last_tx+interface->destination->tick_ms;
alarm->deadline=alarm->alarm+interface->destination->tick_ms/2;
}
switch(interface->socket_type){
case SOCK_DGRAM:
case SOCK_STREAM:
@ -813,17 +812,18 @@ static void overlay_interface_poll(struct sched_ent *alarm)
}
int
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
overlay_broadcast_ensemble(struct network_destination *destination,
unsigned char *bytes,int len)
{
interface->last_tx = gettime_ms();
assert(destination && destination->interface);
struct overlay_interface *interface = destination->interface;
destination->last_tx = gettime_ms();
if (config.debug.packettx)
{
DEBUGF("Sending this packet via interface %s (len=%d)",interface->name,len);
DEBUG_packet_visualise(NULL,bytes,len);
}
if (config.debug.packettx){
DEBUGF("Sending this packet via interface %s (len=%d)",interface->name,len);
//DEBUG_packet_visualise(NULL,bytes,len);
}
if (interface->state!=INTERFACE_STATE_UP){
return WHYF("Cannot send to interface %s as it is down", interface->name);
@ -871,7 +871,7 @@ overlay_broadcast_ensemble(overlay_interface *interface,
{
struct file_packet packet={
.src_addr = interface->address,
.dst_addr = *recipientaddr,
.dst_addr = destination->address,
.pid = getpid(),
};
@ -904,14 +904,15 @@ overlay_broadcast_ensemble(overlay_interface *interface,
case SOCK_DGRAM:
{
if (config.debug.overlayinterfaces)
DEBUGF("Sending %d byte overlay frame on %s to %s",len,interface->name,inet_ntoa(recipientaddr->sin_addr));
DEBUGF("Sending %d byte overlay frame on %s to %s",len,interface->name,inet_ntoa(destination->address.sin_addr));
if(sendto(interface->alarm.poll.fd,
bytes, len, 0, (struct sockaddr *)recipientaddr, sizeof(struct sockaddr_in)) != len){
bytes, len, 0, (struct sockaddr *)&destination->address, sizeof(destination->address)) != len){
int e=errno;
WHY_perror("sendto(c)");
// only close the interface on some kinds of errors
if (e==ENETDOWN || e==EINVAL)
overlay_interface_close(interface);
// TODO mark unicast destination as failed
return -1;
}
return 0;
@ -969,7 +970,7 @@ overlay_interface_register(char *name,
int broadcast_match = 0;
int name_match =0;
if (overlay_interfaces[i].broadcast_address.sin_addr.s_addr == broadcast.s_addr)
if (overlay_interfaces[i].destination->address.sin_addr.s_addr == broadcast.s_addr)
broadcast_match = 1;
name_match = !strcasecmp(overlay_interfaces[i].name, name);
@ -998,7 +999,7 @@ overlay_interface_register(char *name,
if (found_interface>=0){
// try to reactivate the existing interface
overlay_interfaces[found_interface].address.sin_addr = addr;
overlay_interfaces[found_interface].broadcast_address.sin_addr = broadcast;
overlay_interfaces[found_interface].destination->address.sin_addr = broadcast;
overlay_interfaces[found_interface].netmask = mask;
return re_init_socket(found_interface);
}

@ -28,6 +28,8 @@ static void update_limit_state(struct limit_state *state, time_ms_t now){
/* When should we next allow this thing to occur? */
time_ms_t limit_next_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
if (!state->burst_length)
return now;
update_limit_state(state, now);
if (state->sent < state->burst_size)
@ -38,6 +40,8 @@ time_ms_t limit_next_allowed(struct limit_state *state){
/* Can we do this now? if so, track it */
int limit_is_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
if (!state->burst_length)
return 0;
update_limit_state(state, now);
if (state->sent >= state->burst_size){
return -1;
@ -50,6 +54,7 @@ int limit_is_allowed(struct limit_state *state){
int limit_init(struct limit_state *state, int rate_micro_seconds){
if (rate_micro_seconds==0){
state->burst_size=0;
state->burst_length=1;
}else{
state->burst_size = (MIN_BURST_LENGTH / rate_micro_seconds)+1;
state->burst_length = (state->burst_size * rate_micro_seconds) / 1000.0;
@ -57,76 +62,40 @@ int limit_init(struct limit_state *state, int rate_micro_seconds){
return 0;
}
// quick test to make sure the specified route is valid.
int subscriber_is_reachable(struct subscriber *subscriber){
if (!subscriber)
return REACHABLE_NONE;
int set_reachable(struct subscriber *subscriber,
struct network_destination *destination, struct subscriber *next_hop){
int ret = subscriber->reachable;
int reachable = REACHABLE_NONE;
if (destination)
reachable = destination->unicast?REACHABLE_UNICAST:REACHABLE_BROADCAST;
else if(next_hop)
reachable = REACHABLE_INDIRECT;
if (ret==REACHABLE_INDIRECT){
if (!subscriber->next_hop)
ret = REACHABLE_NONE;
// avoid infinite recursion...
else if (!(subscriber->next_hop->reachable & REACHABLE_DIRECT))
ret = REACHABLE_NONE;
else{
int r = subscriber_is_reachable(subscriber->next_hop);
if (r&REACHABLE_ASSUMED)
ret = REACHABLE_NONE;
else if (!(r & REACHABLE_DIRECT))
ret = REACHABLE_NONE;
}
}
if (ret & REACHABLE_DIRECT){
// make sure the interface is still up
if (!subscriber->interface)
ret=REACHABLE_NONE;
else if (subscriber->interface->state!=INTERFACE_STATE_UP)
ret=REACHABLE_NONE;
}
return ret;
}
int set_reachable(struct subscriber *subscriber, int reachable){
if (subscriber->reachable==reachable)
if (subscriber->reachable==reachable
&& subscriber->next_hop==next_hop
&& subscriber->destination == destination)
return 0;
int old_value = subscriber->reachable;
subscriber->reachable=reachable;
subscriber->reachable = reachable;
set_destination_ref(&subscriber->destination, destination);
subscriber->next_hop = next_hop;
// These log messages are for use in tests. Changing them may break test scripts.
if (config.debug.overlayrouting) {
if (config.debug.overlayrouting || config.debug.linkstate) {
switch (reachable) {
case REACHABLE_NONE:
DEBUGF("NOT REACHABLE sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_SELF:
break;
case REACHABLE_INDIRECT:
DEBUGF("REACHABLE INDIRECTLY sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("(via %s, %d)",subscriber->next_hop?alloca_tohex_sid(subscriber->next_hop->sid):"NOONE!"
,subscriber->next_hop?subscriber->next_hop->reachable:0);
DEBUGF("REACHABLE INDIRECTLY sid=%s, via %s",
alloca_tohex_sid(subscriber->sid), alloca_tohex_sid(next_hop->sid));
break;
case REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("REACHABLE VIA UNICAST sid=%s, on %s ", alloca_tohex_sid(subscriber->sid), destination->interface->name);
break;
case REACHABLE_BROADCAST:
DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("REACHABLE VIA BROADCAST sid=%s, on %s ", alloca_tohex_sid(subscriber->sid), destination->interface->name);
break;
}
}
@ -144,24 +113,7 @@ int set_reachable(struct subscriber *subscriber, int reachable){
if ((!(old_value & REACHABLE)) && (reachable & REACHABLE))
monitor_announce_peer(subscriber->sid);
return 0;
}
// mark the subscriber as reachable via reply unicast packet
int reachable_unicast(struct subscriber *subscriber, overlay_interface *interface, struct in_addr addr, int port){
if (subscriber->reachable&REACHABLE)
return -1;
if (subscriber->node)
return -1;
subscriber->interface = interface;
subscriber->address.sin_family = AF_INET;
subscriber->address.sin_addr = addr;
subscriber->address.sin_port = htons(port);
set_reachable(subscriber, REACHABLE_UNICAST);
return 0;
return 1;
}
int resolve_name(const char *name, struct in_addr *addr){
@ -191,7 +143,7 @@ int resolve_name(const char *name, struct in_addr *addr){
// load a unicast address from configuration
int load_subscriber_address(struct subscriber *subscriber)
{
if (subscriber_is_reachable(subscriber)&REACHABLE)
if (!subscriber || subscriber->reachable&REACHABLE)
return 0;
int i = config_host_list__get(&config.hosts, (const sid_t*)subscriber->sid);
// No unicast configuration? just return.
@ -220,12 +172,17 @@ int load_subscriber_address(struct subscriber *subscriber)
}
if (config.debug.overlayrouting)
DEBUGF("Loaded address %s:%d for %s", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), alloca_tohex_sid(subscriber->sid));
return overlay_send_probe(subscriber, addr, interface, OQ_MESH_MANAGEMENT);
struct network_destination *destination = create_unicast_destination(addr, interface);
if (!destination)
return -1;
int ret=overlay_send_probe(subscriber, destination, OQ_MESH_MANAGEMENT);
release_destination_ref(destination);
return ret;
}
/* Collection of unicast echo responses to detect working links */
int
overlay_mdp_service_probe(overlay_mdp_frame *mdp)
overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp)
{
IN();
if (mdp->out.src.port!=MDP_PORT_ECHO || mdp->out.payload_length != sizeof(struct probe_contents)){
@ -233,8 +190,7 @@ overlay_mdp_service_probe(overlay_mdp_frame *mdp)
RETURN(-1);
}
struct subscriber *peer = find_subscriber(mdp->out.src.sid, SID_SIZE, 0);
if (peer->reachable == REACHABLE_SELF)
if (frame->source->reachable == REACHABLE_SELF)
RETURN(0);
struct probe_contents probe;
@ -242,62 +198,19 @@ overlay_mdp_service_probe(overlay_mdp_frame *mdp)
if (probe.addr.sin_family!=AF_INET)
RETURN(WHY("Unsupported address family"));
struct overlay_interface *interface = &overlay_interfaces[probe.interface];
// if a peer is already reachable, and this probe would change the interface, ignore it
// TODO track unicast links better in route_link.c
if (peer->reachable & REACHABLE_INDIRECT)
RETURN(0);
if (peer->reachable & REACHABLE_DIRECT && peer->interface && peer->interface != interface)
RETURN(0);
peer->last_probe_response = gettime_ms();
peer->interface = &overlay_interfaces[probe.interface];
peer->address.sin_family = AF_INET;
peer->address.sin_addr = probe.addr.sin_addr;
peer->address.sin_port = probe.addr.sin_port;
int r=REACHABLE_UNICAST;
// Don't turn assumed|broadcast into unicast|broadcast
if (!(peer->reachable & REACHABLE_ASSUMED))
r |= (peer->reachable & REACHABLE_DIRECT);
set_reachable(peer, r);
RETURN(0);
RETURN(link_unicast_ack(frame->source, &overlay_interfaces[probe.interface], probe.addr));
OUT();
}
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface, int queue){
if (interface==NULL)
interface = overlay_interface_find(addr.sin_addr, 1);
if (!interface)
return WHY("I don't know which interface to use");
if (interface->state!=INTERFACE_STATE_UP)
return WHY("I can't send a probe if the interface is down.");
// don't send a unicast probe unless its on the same interface that is already known to be reachable
if (peer && peer->reachable & REACHABLE_INDIRECT)
return -1;
if (peer && (peer->reachable & REACHABLE_DIRECT) && peer->interface && peer->interface != interface)
return -1;
if (addr.sin_addr.s_addr==0) {
if (config.debug.overlayinterfaces)
DEBUG("I can't send a probe to address 0.0.0.0");
return -1;
}
if (addr.sin_port==0) {
if (config.debug.overlayinterfaces)
DEBUG("I can't send a probe to port 0");
return -1;
}
int overlay_send_probe(struct subscriber *peer, struct network_destination *destination, int queue){
// never send unicast probes over a stream interface
if (interface->socket_type==SOCK_STREAM)
if (destination->interface->socket_type==SOCK_STREAM)
return 0;
time_ms_t now = gettime_ms();
if (peer && peer->last_probe+1000>now)
// though unicast probes don't typically use the same network destination,
// we should still try to throttle when we can
if (destination->last_tx + destination->tick_ms > now)
return -1;
struct overlay_frame *frame=malloc(sizeof(struct overlay_frame));
@ -307,17 +220,11 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
frame->next_hop = frame->destination = peer;
frame->ttl=1;
frame->queue=queue;
frame->destination_resolved=1;
frame->recvaddr=addr;
frame->unicast=1;
frame->interface=interface;
frame->destinations[frame->destination_count++].destination=add_destination_ref(destination);
frame->payload = ob_new();
frame->source_full = 1;
// TODO call mdp payload encryption / signing without calling overlay_mdp_dispatch...
if (peer)
peer->last_probe=gettime_ms();
if (overlay_mdp_encode_ports(frame->payload, MDP_PORT_ECHO, MDP_PORT_PROBE)){
op_free(frame);
return -1;
@ -329,9 +236,9 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
return -1;
}
struct probe_contents probe;
probe.addr=addr;
probe.addr=destination->address;
// get interface number
probe.interface = interface - overlay_interfaces;
probe.interface = destination->interface - overlay_interfaces;
bcopy(&probe, dst, sizeof(struct probe_contents));
if (overlay_payload_enqueue(frame)){
op_free(frame);
@ -339,47 +246,34 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
}
if (config.debug.overlayrouting)
DEBUGF("Queued probe packet on interface %s to %s:%d for %s",
interface->name, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), peer?alloca_tohex_sid(peer->sid):"ANY");
destination->interface->name,
inet_ntoa(destination->address.sin_addr), ntohs(destination->address.sin_port),
peer?alloca_tohex_sid(peer->sid):"ANY");
return 0;
}
// append the address of a unicast link into a packet buffer
static int overlay_append_unicast_address(struct subscriber *subscriber, struct overlay_buffer *buff)
{
if (subscriber->reachable & REACHABLE_ASSUMED || !(subscriber->reachable & REACHABLE_UNICAST)){
if (subscriber->destination
&& subscriber->destination->unicast
&& subscriber->destination->address.sin_family==AF_INET){
if (overlay_address_append(NULL, buff, subscriber))
return -1;
if (ob_append_ui32(buff, subscriber->destination->address.sin_addr.s_addr))
return -1;
if (ob_append_ui16(buff, subscriber->destination->address.sin_port))
return -1;
ob_checkpoint(buff);
if (config.debug.overlayrouting)
DEBUGF("Added STUN info for %s", alloca_tohex_sid(subscriber->sid));
}else{
if (config.debug.overlayrouting)
DEBUGF("Unable to give address of %s, %d", alloca_tohex_sid(subscriber->sid),subscriber->reachable);
return 0;
}
if (overlay_address_append(NULL, buff, subscriber))
return -1;
if (ob_append_ui32(buff, subscriber->address.sin_addr.s_addr))
return -1;
if (ob_append_ui16(buff, subscriber->address.sin_port))
return -1;
ob_checkpoint(buff);
if (config.debug.overlayrouting)
DEBUGF("Added STUN info for %s", alloca_tohex_sid(subscriber->sid));
return 0;
}
// append the address of all neighbour unicast links into a packet buffer
/*
static int overlay_append_local_unicasts(struct subscriber *subscriber, void *context)
{
struct overlay_buffer *buff = context;
if ((!subscriber->interface) ||
(!(subscriber->reachable & REACHABLE_UNICAST)) ||
(subscriber->reachable & REACHABLE_ASSUMED))
return 0;
if ((subscriber->address.sin_addr.s_addr & subscriber->interface->netmask.s_addr) !=
(subscriber->interface->address.sin_addr.s_addr & subscriber->interface->netmask.s_addr))
return 0;
return overlay_append_unicast_address(subscriber, buff);
}
*/
int overlay_mdp_service_stun_req(overlay_mdp_frame *mdp)
{
if (config.debug.overlayrouting)
@ -455,7 +349,11 @@ int overlay_mdp_service_stun(overlay_mdp_frame *mdp)
if (!subscriber || (subscriber->reachable!=REACHABLE_NONE))
continue;
overlay_send_probe(subscriber, addr, NULL, OQ_MESH_MANAGEMENT);
struct network_destination *destination = create_unicast_destination(addr, NULL);
if (destination){
overlay_send_probe(subscriber, destination, OQ_MESH_MANAGEMENT);
release_destination_ref(destination);
}
}
ob_free(buff);
@ -465,11 +363,10 @@ int overlay_mdp_service_stun(overlay_mdp_frame *mdp)
int overlay_send_stun_request(struct subscriber *server, struct subscriber *request){
if ((!server) || (!request))
return -1;
if (!(subscriber_is_reachable(server)&REACHABLE))
if (!(server->reachable&REACHABLE))
return -1;
// don't bother with a stun request if the peer is already reachable directly
// TODO link timeouts
if (subscriber_is_reachable(request)&REACHABLE_DIRECT)
if (request->reachable&REACHABLE_DIRECT)
return -1;
time_ms_t now = gettime_ms();

@ -842,8 +842,10 @@ static int routing_table(struct subscriber *subscriber, void *context){
if (subscriber->reachable==REACHABLE_INDIRECT && subscriber->next_hop)
memcpy(r->neighbour, subscriber->next_hop->sid, SID_SIZE);
if (subscriber->reachable & REACHABLE_DIRECT && subscriber->interface)
strcpy(r->interface_name, subscriber->interface->name);
if (subscriber->reachable & REACHABLE_DIRECT
&& subscriber->destination
&& subscriber->destination->interface)
strcpy(r->interface_name, subscriber->destination->interface->name);
else
r->interface_name[0]=0;
overlay_mdp_reply(mdp_named.poll.fd, state->recvaddr_un, state->recvaddrlen, &reply);
@ -872,7 +874,12 @@ static void overlay_mdp_scan(struct sched_ent *alarm)
while(state->current <= stop){
addr.sin_addr.s_addr=htonl(state->current);
if (addr.sin_addr.s_addr != state->interface->address.sin_addr.s_addr){
if (overlay_send_probe(NULL, addr, state->interface, OQ_ORDINARY))
struct network_destination *destination = create_unicast_destination(addr, state->interface);
if (!destination)
break;
int ret = overlay_send_probe(NULL, destination, OQ_ORDINARY);
release_destination_ref(destination);
if (ret)
break;
}
state->current++;
@ -999,7 +1006,7 @@ void overlay_mdp_poll(struct sched_ent *alarm)
scans[i].interface = interface;
scans[i].current = ntohl(interface->address.sin_addr.s_addr & interface->netmask.s_addr)+1;
scans[i].last = ntohl(interface->broadcast_address.sin_addr.s_addr)-1;
scans[i].last = ntohl(interface->destination->address.sin_addr.s_addr)-1;
if (scans[i].last - scans[i].current>0x10000){
INFOF("Skipping scan on interface %s as the address space is too large",interface->name);
continue;

@ -74,23 +74,15 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
int send_broadcast=1;
if (dest){
if (!(dest->reachable&REACHABLE_DIRECT))
send_broadcast=0;
if (dest->reachable&REACHABLE_UNICAST && dest->interface && dest->interface->prefer_unicast)
send_broadcast=0;
}
if (send_broadcast){
if (dest && dest->reachable&REACHABLE_UNICAST){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}else{
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.ttl=1;
}else{
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
@ -404,13 +396,13 @@ int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_fr
{
IN();
switch(mdp->out.dst.port) {
case MDP_PORT_LINKSTATE: RETURN(link_receive(mdp));
case MDP_PORT_LINKSTATE: RETURN(link_receive(frame, mdp));
case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp));
case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp));
case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp));
case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp));
case MDP_PORT_TRACE: RETURN(overlay_mdp_service_trace(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp));
case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp));
case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp));
case MDP_PORT_RHIZOME_REQUEST:

@ -144,18 +144,9 @@ static void parse_frame(struct overlay_buffer *buff){
goto end;
// locate the interface we should send outgoing unicast packets to
overlay_interface *interface = overlay_interface_find(*addr, 1);
if (interface){
// always update the IP address we heard them from, even if we don't need to use it right now
context.sender->address.sin_family = AF_INET;
context.sender->address.sin_addr = *addr;
// assume the port number of the other servald matches our local port number configuration
context.sender->address.sin_port = htons(interface->port);
if (context.sender->reachable==REACHABLE_NONE){
set_reachable(context.sender, REACHABLE_UNICAST|REACHABLE_ASSUMED);
overlay_send_probe(context.sender, context.sender->address, interface, OQ_MESH_MANAGEMENT);
}
context.interface = overlay_interface_find(*addr, 1);
if (context.interface){
link_received_packet(&context, -1, 0);
}
// read subscriber id of payload origin

@ -26,6 +26,15 @@
#define FRAME_NOT_SENT -1
#define FRAME_DONT_SEND -2
#define MAX_PACKET_DESTINATIONS OVERLAY_MAX_INTERFACES
struct packet_destination{
// if we've sent this packet once, what was the envelope sequence number?
int sent_sequence;
time_ms_t delay_until;
struct network_destination *destination;
};
struct overlay_frame {
struct overlay_frame *prev;
struct overlay_frame *next;
@ -39,33 +48,33 @@ struct overlay_frame {
void *send_context;
int (*send_hook)(struct overlay_frame *, int seq, void *context);
/* What sequence number have we used to send this packet on this interface.
*/
int interface_sent_sequence[OVERLAY_MAX_INTERFACES];
time_ms_t delay_until;
struct packet_destination destinations[MAX_PACKET_DESTINATIONS];
int destination_count;
int transmit_count;
// each payload gets a sequence number that is reused on retransmission
int32_t mdp_sequence;
time_ms_t interface_dont_send_until[OVERLAY_MAX_INTERFACES];
struct broadcast broadcast_id;
// null if destination is broadcast
struct subscriber *destination;
struct broadcast broadcast_id;
struct subscriber *next_hop;
// should we force the encoding to include the entire source public key?
int source_full;
struct subscriber *source;
/* IPv4 address the frame was received from, or should be sent to */
int destination_resolved;
/* IPv4 address the frame was received from */
struct sockaddr_in recvaddr;
overlay_interface *interface;
char unicast;
int packet_version;
time_ms_t dont_send_until;
int sender_interface;
/* Actual payload */
struct overlay_buffer *payload;
time_ms_t enqueued_at;
};

@ -35,7 +35,6 @@ struct sockaddr_in loopback;
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq){
if (packet_version <0 || packet_version > SUPPORTED_PACKET_VERSION)
@ -47,9 +46,17 @@ int overlay_packet_init_header(int packet_version, int encapsulation,
return -1;
if (ob_append_byte(buff, encapsulation))
return -1;
if (context->interface->point_to_point
&& context->interface->other_device
&& packet_version>=1)
context->point_to_point_device = context->interface->other_device;
context->encoding_header=1;
if (overlay_address_append(context, buff, my_subscriber))
return -1;
context->encoding_header=0;
context->sender = my_subscriber;
@ -84,7 +91,7 @@ int process_incoming_frame(time_ms_t now, struct overlay_interface *interface, s
break;
// data frames
case OF_TYPE_RHIZOME_ADVERT:
overlay_rhizome_saw_advertisements(id,f,now);
overlay_rhizome_saw_advertisements(id,context,f,now);
break;
case OF_TYPE_DATA:
case OF_TYPE_DATA_VOICE:
@ -257,9 +264,11 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface *interface,
struct sockaddr_in *addr, struct overlay_buffer *buffer){
IN();
time_ms_t now = gettime_ms();
context->interface = interface;
if (interface->point_to_point && interface->other_device)
context->point_to_point_device = interface->other_device;
context->sender_interface = 0;
context->packet_version = ob_get(buffer);
@ -284,60 +293,32 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
if (packet_flags & PACKET_SEQ)
sender_seq = ob_get(buffer)&0xFF;
if (addr)
context->addr=*addr;
if (context->sender){
// ignore packets that have been reflected back to me
if (context->sender->reachable==REACHABLE_SELF){
if (config.debug.verbose && config.debug.overlayframes)
DEBUG("Completely ignore packets I sent");
RETURN(1);
}
if (context->sender->max_packet_version < context->packet_version)
context->sender->max_packet_version = context->packet_version;
if (context->packet_version > context->sender->max_packet_version)
context->sender->max_packet_version=context->packet_version;
if (interface->point_to_point && interface->other_device!=context->sender){
INFOF("Established point to point link with %s on %s", alloca_tohex_sid(context->sender->sid), interface->name);
context->interface->other_device = context->sender;
}
// TODO probe unicast links when we detect an address change.
// if this is a dummy announcement for a node that isn't in our routing table
if (context->sender->reachable == REACHABLE_NONE) {
context->sender->interface = interface;
if (addr)
context->sender->address = *addr;
else
bzero(&context->sender->address, sizeof context->sender->address);
context->sender->last_probe = 0;
// assume for the moment, that we can reply with the same packet type
if (packet_flags&PACKET_UNICAST){
set_reachable(context->sender, REACHABLE_UNICAST|REACHABLE_ASSUMED);
}else{
set_reachable(context->sender, REACHABLE_BROADCAST|REACHABLE_ASSUMED);
}
context->point_to_point_device = context->interface->other_device = context->sender;
}
/* Note the probe payload must be queued before any SID/SAS request so we can force the packet to have a full sid */
if (addr && (context->sender->last_probe==0 || now - context->sender->last_probe > interface->tick_ms*10))
overlay_send_probe(context->sender, *addr, interface, OQ_MESH_MANAGEMENT);
link_received_packet(context->sender, interface, context->sender_interface, sender_seq, packet_flags & PACKET_UNICAST);
}else{
// send a unicast probe, just incase they never hear our broadcasts.
if (addr)
overlay_send_probe(NULL, *addr, interface, OQ_MESH_MANAGEMENT);
if (config.debug.overlayframes)
DEBUGF("Received %s packet seq %d from %s on %s",
packet_flags & PACKET_UNICAST?"unicast":"broadcast",
sender_seq, alloca_tohex_sid(context->sender->sid), interface->name);
}
if (addr){
if (packet_flags & PACKET_UNICAST)
context->addr=*addr;
else
context->addr=interface->broadcast_address;
}
link_received_packet(context, sender_seq, packet_flags & PACKET_UNICAST);
RETURN(0);
OUT();
}
@ -409,7 +390,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
struct overlay_buffer *b = ob_static(packet, len);
ob_limitsize(b, len);
context.interface = f.interface = interface;
f.interface = interface;
if (recvaddr)
f.recvaddr = *((struct sockaddr_in *)recvaddr);
else
@ -423,6 +404,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
ob_free(b);
RETURN(ret);
}
f.sender_interface = context.sender_interface;
while(ob_remaining(b)>0){
context.invalid_addresses=0;
@ -436,7 +418,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
break;
}
// TODO allow for one byte length?
// TODO allow for single byte length?
unsigned int payload_len;
switch (context.encapsulation){
@ -449,7 +431,8 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
if (payload_len > ob_remaining(b)){
unsigned char *current = ob_ptr(b)+ob_position(b);
dump("Payload Header", header_start, current - header_start);
if (config.debug.overlayframes)
dump("Payload Header", header_start, current - header_start);
ret = WHYF("Invalid payload length (%d)", payload_len);
goto end;
}

@ -80,7 +80,7 @@ static int overlay_frame_build_header(int packet_version, struct decode_context
return 0;
}
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b)
{
/* Convert a payload (frame) structure into a series of bytes.
@ -110,7 +110,7 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
p->destination, p->source))
goto cleanup;
if (interface->encapsulation == ENCAP_OVERLAY){
if (encapsulation == ENCAP_OVERLAY){
if (ob_append_ui16(b, ob_position(p->payload)))
goto cleanup;
}

@ -37,13 +37,11 @@ typedef struct overlay_txqueue {
overlay_txqueue overlay_tx[OQ_MAX];
// short lived data while we are constructing an outgoing packet
struct outgoing_packet{
overlay_interface *interface;
int32_t seq;
struct network_destination *destination;
int seq;
int packet_version;
int i;
struct subscriber *unicast_subscriber;
struct sockaddr_in dest;
int header_length;
struct overlay_buffer *buffer;
struct decode_context context;
@ -93,6 +91,9 @@ overlay_queue_remove(overlay_txqueue *queue, struct overlay_frame *frame){
queue->length--;
while(frame->destination_count>0)
release_destination_ref(frame->destinations[--frame->destination_count].destination);
op_free(frame);
return next;
@ -151,32 +152,9 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (!p) return WHY("Cannot queue NULL");
do{
if (p->destination_resolved)
break;
if (!p->destination)
break;
int r = subscriber_is_reachable(p->destination);
if (r&REACHABLE)
break;
if (directory_service){
r = subscriber_is_reachable(directory_service);
if (r&REACHABLE)
break;
}
return WHYF("Cannot send %x packet, destination %s is %s", p->type,
alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable");
} while(0);
if (p->queue>=OQ_MAX)
return WHY("Invalid queue specified");
/* queue a unicast probe if we haven't for a while. */
if (p->destination && (p->destination->last_probe==0 || gettime_ms() - p->destination->last_probe > 5000))
overlay_send_probe(p->destination, p->destination->address, p->destination->interface, OQ_MESH_MANAGEMENT);
overlay_txqueue *queue = &overlay_tx[p->queue];
if (config.debug.packettx)
@ -192,12 +170,7 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
{
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
p->interface_sent_sequence[i]=FRAME_DONT_SEND;
}
if (ob_position(p->payload)>=MDP_MTU)
FATAL("Queued packet is too big");
@ -205,49 +178,35 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (p->packet_version<=0)
p->packet_version=1;
if (p->destination_resolved){
p->interface_sent_sequence[p->interface - overlay_interfaces]=FRAME_NOT_SENT;
}else{
if (p->destination){
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}else{
int i;
int interface_copies = 0;
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Enqueue packet %p", p);
if (p->destination_count==0){
if (!p->destination){
// hook to allow for flooding via olsr
olsr_send(p);
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP
|| !overlay_interfaces[i].send_broadcasts)
continue;
int oldest_version = link_state_interface_oldest_neighbour(&overlay_interfaces[i]);
if (oldest_version <0){
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Skipping broadcast on interface %s, as we have no neighbours", overlay_interfaces[i].name);
continue;
}
// make sure all neighbours can hear this packet
if (oldest_version < p->packet_version)
p->packet_version = oldest_version;
p->interface_sent_sequence[i]=FRAME_NOT_SENT;
interface_copies++;
}
link_add_destinations(p);
// just drop it now
if (interface_copies == 0){
if (p->destination_count == 0){
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Not transmitting broadcast packet, as we have no neighbours on any interface");
DEBUGF("Not transmitting, as we have no neighbours on any interface");
return -1;
}
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}
if (config.debug.verbose && config.debug.overlayframes){
int i=0;
for (i=0;i<p->destination_count;i++)
DEBUGF("Sending %s on interface %s",
p->destinations[i].destination->unicast?"unicast":"broadcast",
p->destinations[i].destination->interface->name);
}
struct overlay_frame *l=queue->last;
@ -267,31 +226,31 @@ int overlay_payload_enqueue(struct overlay_frame *p)
}
static void
overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination,
int unicast, int packet_version,
overlay_interface *interface, struct sockaddr_in addr){
packet->interface = interface;
packet->context.interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
overlay_init_packet(struct outgoing_packet *packet, int packet_version,
struct network_destination *destination){
packet->context.interface = destination->interface;
packet->buffer=ob_new();
packet->seq=-1;
packet->packet_version = packet_version;
packet->context.packet_version = packet_version;
if (unicast)
packet->unicast_subscriber = destination;
packet->destination = add_destination_ref(destination);
if (destination->sequence_number<0)
packet->seq=-1;
else
packet->seq = interface->sequence_number = (interface->sequence_number + 1)&0xFFFF;
ob_limitsize(packet->buffer, packet->interface->mtu);
packet->seq = destination->sequence_number = (destination->sequence_number + 1) & 0xFFFF;
overlay_packet_init_header(packet_version, interface->encapsulation, &packet->context, packet->buffer,
destination, unicast, packet->i, packet->seq);
ob_limitsize(packet->buffer, destination->interface->mtu);
int i=destination->interface - overlay_interfaces;
overlay_packet_init_header(packet_version, destination->encapsulation,
&packet->context, packet->buffer,
destination->unicast,
i, packet->seq);
packet->header_length = ob_position(packet->buffer);
if (config.debug.overlayframes)
DEBUGF("Creating packet for interface %s, seq %d, %s",
interface->name, packet->seq,
unicast?"unicast":"broadcast");
DEBUGF("Creating %d packet for interface %s, seq %d, %s",
packet_version,
destination->interface->name, destination->sequence_number,
destination->unicast?"unicast":"broadcast");
}
int overlay_queue_schedule_next(time_ms_t next_allowed_packet){
@ -311,52 +270,43 @@ int overlay_queue_schedule_next(time_ms_t next_allowed_packet){
return 0;
}
static void remove_destination(struct overlay_frame *frame, int i){
release_destination_ref(frame->destinations[i].destination);
frame->destination_count --;
if (i<frame->destination_count)
frame->destinations[i]=frame->destinations[frame->destination_count];
}
// update the alarm time and return 1 if changed
static int
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
do{
if (frame->destination_resolved)
break;
if (!frame->destination)
break;
if (subscriber_is_reachable(frame->destination)&REACHABLE)
break;
if (directory_service){
if (subscriber_is_reachable(directory_service)&REACHABLE)
break;
}
// ignore payload alarm if the destination is currently unreachable
return 0;
}while(0);
time_ms_t next_allowed_packet=0;
if (frame->destination_resolved && frame->interface){
// don't include interfaces which are currently transmitting using a serial buffer
if (frame->interface->tx_bytes_pending>0)
return 0;
next_allowed_packet = limit_next_allowed(&frame->interface->transfer_limit);
}else{
// check all interfaces
// check all interfaces
if (frame->destination_count>0){
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
for(i=0;i<frame->destination_count;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
if ((!frame->destination) && (frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
link_state_interface_oldest_neighbour(&overlay_interfaces[i])<0))
continue;
time_ms_t next_packet = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_packet < frame->interface_dont_send_until[i])
next_packet = frame->interface_dont_send_until[i];
time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit);
if (next_packet < frame->destinations[i].delay_until)
next_packet = frame->destinations[i].delay_until;
if (next_allowed_packet==0||next_packet < next_allowed_packet)
next_allowed_packet = next_packet;
}
if (next_allowed_packet==0)
if (next_allowed_packet==0){
return 0;
}
}else{
if (!frame->destination){
return 0;
}
}
if (next_allowed_packet < frame->dont_send_until)
next_allowed_packet = frame->dont_send_until;
if (next_allowed_packet < frame->delay_until)
next_allowed_packet = frame->delay_until;
if (next_allowed_packet < frame->enqueued_at)
next_allowed_packet = frame->enqueued_at;
if (ob_position(frame->payload)<SMALL_PACKET_SIZE &&
next_allowed_packet < frame->enqueued_at + overlay_tx[frame->queue].small_packet_grace_interval)
@ -374,143 +324,94 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// TODO stop when the packet is nearly full?
while(frame){
if (frame->enqueued_at + queue->latencyTarget < now){
if (config.debug.rejecteddata)
if (config.debug.overlayframes)
DEBUGF("Dropping frame type %x for %s due to expiry timeout",
frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All");
frame = overlay_queue_remove(queue, frame);
continue;
}
/* Note, once we queue a broadcast packet we are committed to sending it out every interface,
even if we hear it from somewhere else in the mean time
/* Note, once we queue a broadcast packet we are currently
* committed to sending it to every destination,
* even if we hear it from somewhere else in the mean time
*/
// ignore payloads that are waiting for ack / nack resends
if (frame->dont_send_until > now)
if (frame->delay_until > now)
goto skip;
if (packet->buffer && packet->destination->encapsulation==ENCAP_SINGLE)
goto skip;
// quickly skip payloads that have no chance of fitting
if (packet->buffer && ob_limit(frame->payload) > ob_remaining(packet->buffer))
goto skip;
if (!frame->destination_resolved){
frame->next_hop = frame->destination;
if (frame->destination_count==0 && frame->destination){
link_add_destinations(frame);
if (frame->next_hop){
// Where do we need to route this payload next?
int r = subscriber_is_reachable(frame->next_hop);
// first, should we try to bounce this payload off the directory service?
if (r==REACHABLE_NONE &&
directory_service &&
frame->next_hop!=directory_service){
frame->next_hop=directory_service;
r=subscriber_is_reachable(directory_service);
// degrade packet version if required to reach the destination
if (frame->packet_version > frame->next_hop->max_packet_version)
frame->packet_version = frame->next_hop->max_packet_version;
}
int destination_index=-1;
{
int i;
for (i=frame->destination_count -1;i>=0;i--){
struct network_destination *dest = frame->destinations[i].destination;
if (!dest)
FATALF("Destination %d is NULL", i);
if (!dest->interface)
FATALF("Destination interface %d is NULL", i);
if (dest->interface->state!=INTERFACE_STATE_UP){
// remove this destination
remove_destination(frame, i);
continue;
}
// do we need to route via a neighbour?
if (r&REACHABLE_INDIRECT){
frame->next_hop = frame->next_hop->next_hop;
r = subscriber_is_reachable(frame->next_hop);
}
if (!(r&REACHABLE_DIRECT)){
goto skip;
}
frame->interface = frame->next_hop->interface;
// if both broadcast and unicast are available, pick on based on interface preference
if ((r&(REACHABLE_UNICAST|REACHABLE_BROADCAST))==(REACHABLE_UNICAST|REACHABLE_BROADCAST)){
if (frame->interface->prefer_unicast){
r=REACHABLE_UNICAST;
// used by tests
if (config.debug.overlayframes)
DEBUGF("Choosing to send via unicast for %s", alloca_tohex_sid(frame->destination->sid));
}else
r=REACHABLE_BROADCAST;
}
if(r&REACHABLE_UNICAST){
frame->recvaddr = frame->next_hop->address;
frame->unicast = 1;
}else
frame->recvaddr = frame->interface->broadcast_address;
// degrade packet version if required to reach the destination
if (frame->packet_version > frame->next_hop->max_packet_version)
frame->packet_version = frame->next_hop->max_packet_version;
frame->destination_resolved=1;
}else{
if (frame->destinations[i].delay_until>now)
continue;
if (packet->buffer){
// check if we can stuff into this packet
if (frame->interface_sent_sequence[packet->i]==FRAME_DONT_SEND || frame->interface_dont_send_until[packet->i] >now)
goto skip;
frame->interface = packet->interface;
frame->recvaddr = packet->interface->broadcast_address;
if (frame->packet_version!=packet->packet_version)
continue;
}else{
// find an interface that we haven't broadcast on yet
frame->interface = NULL;
int i, keep=0;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP ||
frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
link_state_interface_oldest_neighbour(&overlay_interfaces[i])<0)
continue;
keep=1;
if (frame->interface_dont_send_until[i] >now)
continue;
time_ms_t next_allowed = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_allowed > now)
continue;
frame->interface = &overlay_interfaces[i];
frame->recvaddr = overlay_interfaces[i].broadcast_address;
// is this packet going our way?
if (dest==packet->destination){
destination_index=i;
break;
}
if (!keep){
// huh, we don't need to send it anywhere?
frame = overlay_queue_remove(queue, frame);
}else{
// skip this interface if the stream tx buffer has data
if (dest->interface->socket_type==SOCK_STREAM
&& dest->interface->tx_bytes_pending>0)
continue;
}
if (!frame->interface)
goto skip;
// can we send a packet on this interface now?
if (limit_is_allowed(&dest->transfer_limit))
continue;
// send a packet to this destination
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->packet_version, dest);
destination_index=i;
frame->destinations[i].sent_sequence = dest->sequence_number;
break;
}
}
}
if (!packet->buffer){
if (frame->interface->socket_type==SOCK_STREAM){
// skip this interface if the stream tx buffer has data
if (frame->interface->tx_bytes_pending>0)
goto skip;
}
// can we send a packet on this interface now?
if (limit_is_allowed(&frame->interface->transfer_limit))
goto skip;
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->packet_version, frame->interface, frame->recvaddr);
}else{
// is this packet going our way?
if (frame->interface!=packet->interface ||
frame->interface->encapsulation==ENCAP_SINGLE ||
frame->packet_version!=packet->packet_version ||
memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
goto skip;
}
if (frame->destination_count==0){
frame = overlay_queue_remove(queue, frame);
continue;
}
if (destination_index==-1)
goto skip;
if (frame->send_hook){
// last minute check if we really want to send this frame, or track when we sent it
if (frame->send_hook(frame, packet->seq, frame->send_context)){
@ -525,64 +426,42 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
}else if(((mdp_sequence - frame->mdp_sequence)&0xFFFF) >= 64){
// too late, we've sent too many packets for the next hop to correctly de-duplicate
if (config.debug.overlayframes)
DEBUGF("Retransmition of frame %p mdp seq %d, is too late to be de-duplicated", frame, frame->mdp_sequence);
DEBUGF("Retransmition of frame %p mdp seq %d, is too late to be de-duplicated",
frame, frame->mdp_sequence);
frame = overlay_queue_remove(queue, frame);
continue;
}else{
if (config.debug.overlayframes)
DEBUGF("Retransmitting frame %p mdp seq %d, from packet seq %d in seq %d", frame, frame->mdp_sequence, frame->interface_sent_sequence[packet->i], packet->seq);
}
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
if (overlay_frame_append_payload(&packet->context, packet->destination->encapsulation, frame, packet->buffer)){
// payload was not queued, delay the next attempt slightly
frame->dont_send_until = now + 5;
frame->delay_until = now + 5;
goto skip;
}
frame->interface_sent_sequence[packet->i] = packet->seq;
frame->interface_dont_send_until[packet->i] = now+200;
frame->destinations[destination_index].sent_sequence = frame->destinations[destination_index].destination->sequence_number;
frame->destinations[destination_index].delay_until = now+200;
frame->transmit_count++;
if (config.debug.overlayframes){
DEBUGF("Sent payload %p, %d type %x len %d for %s via %s, seq %d",
DEBUGF("Appended payload %p, %d type %x len %d for %s via %s",
frame, frame->mdp_sequence,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN),
frame->interface_sent_sequence[packet->i]);
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
}
if (frame->destination)
frame->destination->last_tx=now;
if (frame->next_hop)
frame->next_hop->last_tx=now;
// mark the payload as sent
if (frame->destination_resolved){
if (frame->resend>0 && frame->packet_version>=1 && frame->next_hop && packet->seq !=-1 && (!frame->unicast)){
frame->dont_send_until = now+200;
frame->destination_resolved = 0;
if (config.debug.overlayframes)
DEBUGF("Holding onto payload for ack/nack resend in %lldms", frame->dont_send_until - now);
goto skip;
}
}else{
if (frame->resend<=0 || frame->packet_version<1 || packet->seq==-1 || frame->unicast){
// dont retransmit if we aren't sending sequence numbers, or we've run out of allowed resends
frame->interface_sent_sequence[packet->i] = FRAME_DONT_SEND;
}
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP &&
link_state_interface_oldest_neighbour(&overlay_interfaces[i])>=0 &&
frame->interface_sent_sequence[i]!=FRAME_DONT_SEND){
goto skip;
}
// dont retransmit if we aren't sending sequence numbers, or we've been asked not to
if (frame->packet_version<1 || frame->resend<=0 || packet->seq==-1){
if (config.debug.overlayframes)
DEBUGF("Not waiting for retransmission (%d, %d, %d)", frame->packet_version, frame->resend, packet->seq);
remove_destination(frame, destination_index);
if (frame->destination_count==0){
frame = overlay_queue_remove(queue, frame);
continue;
}
}
frame = overlay_queue_remove(queue, frame);
continue;
// TODO recalc route on retransmittion??
skip:
// if we can't send the payload now, check when we should try next
@ -594,8 +473,9 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// fill a packet from our outgoing queues and send it
static int
overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
int i;
IN();
int i;
int ret=0;
// while we're looking at queues, work out when to schedule another packet
unschedule(&next_packet);
next_packet.alarm=0;
@ -611,16 +491,13 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
if (config.debug.packetconstruction)
ob_dump(packet->buffer,"assembled packet");
if (overlay_broadcast_ensemble(packet->interface, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){
// sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE);
}
}
overlay_broadcast_ensemble(packet->destination, ob_ptr(packet->buffer), ob_position(packet->buffer));
ob_free(packet->buffer);
RETURN(1);
ret=1;
}
RETURN(0);
if (packet->destination)
release_destination_ref(packet->destination);
RETURN(ret);
OUT();
}
@ -632,61 +509,57 @@ static void overlay_send_packet(struct sched_ent *alarm){
overlay_fill_send_packet(&packet, gettime_ms());
}
int overlay_send_tick_packet(struct overlay_interface *interface){
int overlay_send_tick_packet(struct network_destination *destination){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
packet.seq=-1;
overlay_init_packet(&packet, NULL, 0, 0, interface, interface->broadcast_address);
overlay_init_packet(&packet, 0, destination);
overlay_fill_send_packet(&packet, gettime_ms());
return 0;
}
// de-queue all packets that have been sent to this subscriber & have arrived.
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq)
int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *destination, uint32_t ack_mask, int ack_seq)
{
int interface_id = interface - overlay_interfaces;
int i;
int i, j;
time_ms_t now = gettime_ms();
for (i=0;i<OQ_MAX;i++){
struct overlay_frame *frame = overlay_tx[i].first;
while(frame){
int frame_seq = frame->interface_sent_sequence[interface_id];
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
for (j=frame->destination_count -1;j>=0;j--)
if (frame->destinations[j].destination==destination)
break;
if (j>=0){
int frame_seq = frame->destinations[j].sent_sequence;
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
if (acked){
frame->interface_sent_sequence[interface_id] = FRAME_DONT_SEND;
int discard = 1;
if (!frame->destination){
int j;
for(j=0;j<OVERLAY_MAX_INTERFACES;j++){
if (overlay_interfaces[j].state==INTERFACE_STATE_UP &&
frame->interface_sent_sequence[j]!=FRAME_DONT_SEND &&
link_state_interface_oldest_neighbour(&overlay_interfaces[j])>=0){
discard = 0;
break;
}
}
}
if (discard){
if (acked){
if (config.debug.overlayframes)
DEBUGF("Dequeing packet %p to %s sent by seq %d, due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
DEBUGF("Packet %p to %s sent by seq %d, acked with seq %d",
frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
// drop packets that don't need to be retransmitted
if (frame->destination || frame->destination_count<=1){
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
}
remove_destination(frame, j);
}else if (seq_delta < 128 && frame->destination && frame->delay_until>now){
// retransmit asap
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->delay_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
if (seq_delta < 128 && frame->destination && frame->dont_send_until>now){
// resend immediately
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->dont_send_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
frame = frame->next;
}
}

@ -262,7 +262,7 @@ error:
time_ms_t lookup_time=0;
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now)
int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, struct overlay_frame *f, time_ms_t now)
{
IN();
if (!f)
@ -272,7 +272,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
RETURN(0);
int ad_frame_type=ob_get(f->payload);
struct sockaddr_in httpaddr = f->recvaddr;
struct sockaddr_in httpaddr = context->addr;
httpaddr.sin_port = htons(RHIZOME_HTTP_PORT);
int manifest_length;
rhizome_manifest *m=NULL;

@ -29,6 +29,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MSG_TYPE_REQ 1
#define CACHE_BARS 60
#define MAX_OLD_BARS 40
#define BARS_PER_RESPONSE ((int)400/RHIZOME_BAR_BYTES)
#define HEAD_FLAG INT64_MAX
@ -139,7 +140,8 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (state->sync_end < state->highest_seen){
rhizome_sync_request(subscriber, state->sync_end, 1);
}else if(state->sync_start >0){
rhizome_sync_request(subscriber, state->sync_start, 0);
if (state->bar_count < MAX_OLD_BARS)
rhizome_sync_request(subscriber, state->sync_start, 0);
}else if(!state->sync_complete){
state->sync_complete = 1;
if (config.debug.rhizome)
@ -278,6 +280,8 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
added=1;
}
for (i=mid_point -1; i>=0; i--){
if (state->bar_count >= MAX_OLD_BARS)
break;
if (sync_cache_bar(state, bars[i], bar_tokens[i]))
added=1;
}

@ -4,6 +4,7 @@
#include "overlay_packet.h"
#include "str.h"
#include "conf.h"
#include <assert.h>
/*
Link state routing;
@ -17,7 +18,7 @@ Link state routing;
*/
#define INCLUDE_ANYWAY (500)
#define INCLUDE_ANYWAY (200)
#define MAX_LINK_STATES 512
#define FLAG_HAS_INTERFACE (1<<0)
@ -35,7 +36,7 @@ struct link{
struct subscriber *transmitter;
struct link *parent;
struct overlay_interface *interface;
struct network_destination *destination;
struct subscriber *receiver;
// What's the last ack we've heard so we don't process nacks twice.
@ -56,22 +57,32 @@ struct link{
char calculating;
};
struct neighbour_link{
struct neighbour_link *_next;
// statistics of incoming half of network links
struct link_in{
struct link_in *_next;
// which of their interfaces are these stats for?
// which of our interfaces did we hear it on?
overlay_interface *interface;
// which of their interfaces did they send it from?
int neighbour_interface;
// which interface did we hear it on?
struct overlay_interface *interface;
// very simple time based link up/down detection;
// when will we consider the link broken?
time_ms_t link_timeout;
char unicast;
// unicast or broadcast?
int unicast;
int ack_sequence;
uint64_t ack_mask;
int ack_counter;
};
struct link_out{
struct link_out *_next;
time_ms_t timeout;
struct network_destination *destination;
};
struct neighbour{
@ -83,15 +94,20 @@ struct neighbour{
char path_version;
// when do we assume the link is dead because they stopped hearing us or vice versa?
time_ms_t neighbour_link_timeout;
time_ms_t link_in_timeout;
// if a neighbour is telling the world that they are using us as a next hop, we need to send acks & nacks with high priority
// otherwise we don't care too much about packet loss.
char using_us;
// is this neighbour still sending selfacks?
char legacy_protocol;
// when a neighbour is using us as a next hop *and* they are using us to send packets to one of our neighbours,
// we must forward their broadcasts
time_ms_t routing_through_us;
// which of their mdp packets have we already heard and can be dropped as duplicates?
int mdp_ack_sequence;
uint64_t mdp_ack_mask;
@ -100,16 +116,15 @@ struct neighbour{
time_ms_t last_update;
int last_update_seq;
time_ms_t rtt;
int ack_counter;
// un-balanced tree of known link states
struct link *root;
// list of incoming link stats
struct neighbour_link *links, *best_link;
// is this neighbour still using selfacks?
char legacy_protocol;
struct link_in *links, *best_link;
// list of outgoing links
struct link_out *out_links;
};
// one struct per subscriber, where we track all routing information, allocated on first use
@ -120,7 +135,7 @@ struct link_state{
int hop_count;
int route_version;
// if a neighbour is free'd this link will point to invalid memory.
// do not trust this pointer unless you have just called find_best_link
// don't use this pointer directly, call find_best_link instead
struct link *link;
char calculating;
@ -141,6 +156,71 @@ static struct sched_ent link_send_alarm={
struct neighbour *neighbours=NULL;
int route_version=0;
struct network_destination * new_destination(struct overlay_interface *interface, char encapsulation){
assert(interface);
struct network_destination *ret = emalloc_zero(sizeof(struct network_destination));
if (ret){
ret->_ref_count=1;
ret->encapsulation = encapsulation;
ret->interface = interface;
// DEBUGF("Create ref %p, %d - %s", ret, ret->_ref_count, ret->interface->name);
}
return ret;
}
struct network_destination * create_unicast_destination(struct sockaddr_in addr, struct overlay_interface *interface){
if (!interface)
interface = overlay_interface_find(addr.sin_addr, 1);
if (!interface){
WHY("I don't know which interface to use");
return NULL;
}
if (interface->state!=INTERFACE_STATE_UP){
WHY("The interface is down.");
return NULL;
}
if (addr.sin_addr.s_addr==0 || addr.sin_port==0){
// WHY("Invalid unicast address");
return NULL;
}
struct network_destination *ret = new_destination(interface, ENCAP_OVERLAY);
if (ret){
ret->address = addr;
ret->unicast = 1;
ret->tick_ms = interface->destination->tick_ms;
ret->sequence_number = -1;
}
return ret;
}
struct network_destination * add_destination_ref(struct network_destination *ref){
ref->_ref_count++;
// DEBUGF("Add ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
return ref;
}
void release_destination_ref(struct network_destination *ref){
if (ref->_ref_count<=1){
// DEBUGF("Free ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
free(ref);
}else{
ref->_ref_count--;
// DEBUGF("Drop ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
}
}
int set_destination_ref(struct network_destination **ptr, struct network_destination *ref){
if (ref==*ptr)
return 0;
if (ref)
add_destination_ref(ref);
if (*ptr)
release_destination_ref(*ptr);
*ptr = ref;
return 1;
}
static int NumberOfSetBits(uint32_t i)
{
i = i - ((i >> 1) & 0x55555555);
@ -188,6 +268,8 @@ static void free_links(struct link *link)
link->_left=NULL;
free_links(link->_right);
link->_right=NULL;
if (link->destination)
release_destination_ref(link->destination);
free(link);
}
@ -272,22 +354,23 @@ static void update_path_score(struct neighbour *neighbour, struct link *link){
link->calculating = 0;
}
static int find_best_link(struct subscriber *subscriber)
// pick the best path to this network destination
static struct link * find_best_link(struct subscriber *subscriber)
{
IN();
if (subscriber->reachable==REACHABLE_SELF)
RETURN(0);
RETURN(NULL);
struct link_state *state = get_link_state(subscriber);
if (state->route_version == route_version)
RETURN(0);
RETURN(state->link);
if (state->calculating)
RETURN(-1);
RETURN(NULL);
state->calculating = 1;
struct neighbour *neighbour = neighbours;
struct overlay_interface *interface = NULL;
struct network_destination *destination = NULL;
int best_hop_count = 99;
int best_drop_rate = 99;
struct link *best_link = NULL;
@ -295,7 +378,7 @@ static int find_best_link(struct subscriber *subscriber)
time_ms_t now = gettime_ms();
while (neighbour){
if (neighbour->neighbour_link_timeout < now)
if (neighbour->link_in_timeout < now)
goto next;
struct link *link = find_link(neighbour, subscriber, 0);
@ -318,7 +401,7 @@ static int find_best_link(struct subscriber *subscriber)
best_hop_count = link->hop_count;
best_drop_rate = link->path_drop_rate;
transmitter = link->transmitter;
interface = link->interface;
destination = link->destination;
best_link = link;
}
}
@ -328,9 +411,7 @@ next:
}
int changed =0;
if (state->next_hop != next_hop || state->transmitter != transmitter || state->link != best_link)
changed = 1;
if (next_hop == subscriber && (interface != subscriber->interface))
if (state->transmitter != transmitter || state->link != best_link)
changed = 1;
state->next_hop = next_hop;
@ -339,46 +420,18 @@ next:
state->route_version = route_version;
state->calculating = 0;
state->link = best_link;
int reachable = subscriber->reachable;
if (next_hop == NULL){
if ((subscriber->reachable&REACHABLE_DIRECT) != REACHABLE_UNICAST)
reachable = REACHABLE_NONE;
} else if (next_hop == subscriber){
// reset the state of any unicast probe's if the interface has changed
if (subscriber->interface != interface){
reachable = 0;
subscriber->last_probe=0;
bzero(&subscriber->address, sizeof subscriber->address);
}
reachable = REACHABLE_BROADCAST | (reachable & REACHABLE_UNICAST);
if (next_hop == subscriber)
next_hop = NULL;
subscriber->interface = interface;
} else {
reachable = REACHABLE_INDIRECT;
}
subscriber->next_hop = next_hop;
set_reachable(subscriber, reachable);
if (set_reachable(subscriber, destination, next_hop))
changed = 1;
if (changed){
if (config.debug.linkstate){
if (reachable & REACHABLE_DIRECT){
DEBUGF("LINK STATE; neighbour %s is reachable on interface %s",
alloca_tohex_sid(subscriber->sid),
interface->name);
} else {
DEBUGF("LINK STATE; next hop for %s is now %d hops, %s via %s",
alloca_tohex_sid(subscriber->sid),
best_hop_count,
next_hop?alloca_tohex_sid(next_hop->sid):"UNREACHABLE",
transmitter?alloca_tohex_sid(transmitter->sid):"NONE");
}
}
monitor_announce_link(best_hop_count, transmitter, subscriber);
state->next_update = now+5;
}
RETURN(0);
RETURN(best_link);
}
static int monitor_announce(struct subscriber *subscriber, void *context){
@ -463,25 +516,28 @@ static int append_link(struct subscriber *subscriber, void *context)
time_ms_t now = gettime_ms();
if (find_best_link(subscriber))
return 0;
if (state->next_update - INCLUDE_ANYWAY <= now){
if (subscriber->reachable==REACHABLE_SELF){
if (subscriber->reachable==REACHABLE_SELF){
if (state->next_update - INCLUDE_ANYWAY <= now){
// Other entries in our keyring are always one hop away from us.
if (append_link_state(payload, 0, my_subscriber, subscriber, -1, 1, -1, 0, 0)){
link_send_alarm.alarm = now+5;
return 1;
}
} else {
struct link *link = state->link;
if (append_link_state(payload, 0, state->transmitter, subscriber, -1, link?link->link_version:-1, -1, 0, link?link->drop_rate:32)){
// include information about this link every 5s
state->next_update = now + 5000;
}
} else {
struct link *best_link = find_best_link(subscriber);
if (state->next_update - INCLUDE_ANYWAY <= now){
if (append_link_state(payload, 0, state->transmitter, subscriber, -1,
best_link?best_link->link_version:-1, -1, 0, best_link?best_link->drop_rate:32)){
link_send_alarm.alarm = now+5;
return 1;
}
// include information about this link every 5s
state->next_update = now + 5000;
}
// include information about this link every 5s
state->next_update = now + 5000;
}
if (state->next_update < link_send_alarm.alarm)
@ -495,13 +551,21 @@ static void free_neighbour(struct neighbour **neighbour_ptr){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; all links from neighbour %s have died", alloca_tohex_sid(n->subscriber->sid));
struct neighbour_link *link = n->links;
struct link_in *link = n->links;
while(link){
struct neighbour_link *l=link;
struct link_in *l=link;
link = l->_next;
free(l);
}
struct link_out *out = n->out_links;
while (out){
struct link_out *l=out;
out = l->_next;
release_destination_ref(out->destination);
free(l);
}
free_links(n->root);
n->root=NULL;
*neighbour_ptr = n->_next;
@ -513,11 +577,12 @@ static void clean_neighbours(time_ms_t now)
struct neighbour **n_ptr = &neighbours;
while (*n_ptr){
struct neighbour *n = *n_ptr;
struct neighbour_link **list = &n->links;
struct link_in **list = &n->links;
while(*list){
struct neighbour_link *link = *list;
struct link_in *link = *list;
if (link->interface->state!=INTERFACE_STATE_UP || link->link_timeout < now){
if (config.debug.linkstate && config.debug.verbose)
if (config.debug.linkstate)
DEBUGF("LINK STATE; link expired from neighbour %s on interface %s",
alloca_tohex_sid(n->subscriber->sid),
link->interface->name);
@ -527,11 +592,32 @@ static void clean_neighbours(time_ms_t now)
list = &link->_next;
}
}
struct link_out **out = &n->out_links;
while(*out){
struct link_out *link = *out;
if (link->destination->interface->state!=INTERFACE_STATE_UP || link->timeout < now){
*out = link->_next;
if (config.debug.linkstate)
DEBUGF("LINK STATE; %s link_out expired for neighbour %s on interface %s",
link->destination->unicast?"unicast":"broadcast",
alloca_tohex_sid(n->subscriber->sid),
link->destination->interface->name);
release_destination_ref(link->destination);
free(link);
}else{
out = &link->_next;
}
}
// when all links to a neighbour that we are routing through expire, force a routing calculation update
struct link_state *state = get_link_state(n->subscriber);
if (state->next_hop == n->subscriber && (n->neighbour_link_timeout < now || !n->links) && state->route_version == route_version)
if (state->next_hop == n->subscriber &&
(n->link_in_timeout < now || !n->links || !n->out_links) &&
state->route_version == route_version)
route_version++;
if (!n->links){
if (!n->links || !n->out_links){
free_neighbour(n_ptr);
}else{
n_ptr = &n->_next;
@ -539,7 +625,7 @@ static void clean_neighbours(time_ms_t now)
}
}
static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct neighbour_link *link, time_ms_t now){
static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct link_in *link, time_ms_t now){
struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame));
frame->type = OF_TYPE_SELFANNOUNCE_ACK;
frame->ttl = 6;
@ -560,13 +646,21 @@ static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct nei
static int neighbour_find_best_link(struct neighbour *n)
{
// TODO compare other link stats to find the best...
struct neighbour_link *best_link=n->links;
struct link_in *best_link=n->links;
if (best_link){
struct neighbour_link *link=best_link->_next;
struct link_in *link=best_link->_next;
while(link){
if (link->interface != best_link->interface &&
overlay_interface_compare(best_link->interface, link->interface))
best_link = link;
// find the link with the best interface
switch(overlay_interface_compare(best_link->interface, link->interface)){
case -1:
break;
case 0:
if (link->unicast < best_link->unicast)
break;
// fall through
case 1:
best_link = link;
}
link = link->_next;
}
}
@ -574,10 +668,17 @@ static int neighbour_find_best_link(struct neighbour *n)
if (n->best_link != best_link){
n->best_link = best_link;
n->next_neighbour_update = gettime_ms()+5;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link?best_link->interface->name:"NONE");
if (config.debug.linkstate){
if (best_link){
DEBUGF("LINK STATE; best link from neighbour %s is %s on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link->unicast?"unicast":"broadcast",
best_link->interface->name);
}else{
DEBUGF("LINK STATE; no best link from neighbour %s",
alloca_tohex_sid(n->subscriber->sid));
}
}
}
return 0;
@ -615,13 +716,27 @@ static int send_neighbour_link(struct neighbour *n)
frame->payload = ob_new();
frame->send_hook = neighbour_link_sent;
frame->send_context = n->subscriber;
frame->resend=-1;
if (n->subscriber->reachable & REACHABLE_DIRECT && (!(n->subscriber->reachable&REACHABLE_ASSUMED))){
frame->destination_resolved = 1;
frame->interface = n->subscriber->interface;
frame->recvaddr = frame->interface->broadcast_address;
frame->resend=-1;
if (n->subscriber->reachable & REACHABLE_INDIRECT){
frame->destination = n->subscriber;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Sending link state ack indirectly");
}else if (n->subscriber->reachable & REACHABLE){
// we don't wont to address the next hop to this neighbour as that will force another reply ack
// but we still want them to receive it.
frame->destinations[frame->destination_count++].destination = add_destination_ref(n->subscriber->destination);
}else{
// no routing decision yet? send this packet to all probable destinations.
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Sending link state ack to all possibilities");
struct link_out *out = n->out_links;
while(out){
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
out = out->_next;
}
}
ob_limitsize(frame->payload, 400);
overlay_mdp_encode_ports(frame->payload, MDP_PORT_LINKSTATE, MDP_PORT_LINKSTATE);
@ -639,10 +754,12 @@ static int send_neighbour_link(struct neighbour *n)
if (overlay_payload_enqueue(frame))
op_free(frame);
n->best_link->ack_counter = ACK_WINDOW;
n->last_update = now;
}
n->next_neighbour_update = n->last_update + n->best_link->interface->tick_ms;
n->ack_counter = ACK_WINDOW;
n->next_neighbour_update = n->last_update + n->best_link->interface->destination->tick_ms;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Next update for %s in %lldms", alloca_tohex_sid(n->subscriber->sid), n->next_neighbour_update - gettime_ms());
OUT();
return 0;
}
@ -663,6 +780,17 @@ static int link_send_neighbours()
if (n->next_neighbour_update < link_send_alarm.alarm)
link_send_alarm.alarm = n->next_neighbour_update;
struct link_out *out = n->out_links;
while(out){
if (out->destination->tick_ms>0 && out->destination->unicast){
if (out->destination->last_tx + out->destination->tick_ms < now)
overlay_send_tick_packet(out->destination);
if (out->destination->last_tx + out->destination->tick_ms < link_send_alarm.alarm)
link_send_alarm.alarm = out->destination->last_tx + out->destination->tick_ms;
}
out=out->_next;
}
n = n->_next;
}
return 0;
@ -715,23 +843,32 @@ static void update_alarm(time_ms_t limit){
}
}
struct neighbour_link * get_neighbour_link(struct neighbour *neighbour, struct overlay_interface *interface, int sender_interface, char unicast)
struct link_in * get_neighbour_link(struct neighbour *neighbour, struct overlay_interface *interface, int sender_interface, int unicast)
{
struct neighbour_link *link = neighbour->links;
struct link_in *link = neighbour->links;
if (unicast){
if (interface->prefer_unicast)
unicast=1;
else
unicast=-1;
}
while(link){
if (link->interface == interface && link->neighbour_interface == sender_interface && link->unicast == unicast)
if (link->interface == interface
&& link->neighbour_interface == sender_interface
&& link->unicast == unicast)
return link;
link=link->_next;
}
link = emalloc_zero(sizeof(struct neighbour_link));
link = emalloc_zero(sizeof(struct link_in));
link->interface = interface;
link->neighbour_interface = sender_interface;
link->unicast = unicast;
link->neighbour_interface = sender_interface;
link->ack_sequence = -1;
link->ack_mask = 0;
link->_next = neighbour->links;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; new possible link from neighbour %s on interface %s/%d",
if (config.debug.linkstate)
DEBUGF("LINK STATE; new possible %s link from neighbour %s on interface %s/%d",
unicast?"unicast":"broadcast",
alloca_tohex_sid(neighbour->subscriber->sid),
interface->name,
sender_interface);
@ -739,19 +876,67 @@ struct neighbour_link * get_neighbour_link(struct neighbour *neighbour, struct o
return link;
}
int link_state_interface_oldest_neighbour(struct overlay_interface *interface)
int link_add_destinations(struct overlay_frame *frame)
{
struct neighbour *neighbour = neighbours;
int packet_version =-1;
while(neighbour){
if (neighbour->best_link && neighbour->best_link->interface == interface &&
(neighbour->subscriber->max_packet_version < packet_version || packet_version == -1)){
packet_version = neighbour->subscriber->max_packet_version;
if (frame->destination){
frame->next_hop = frame->destination;
// if the destination is unreachable, but we have a reachable directory service
// forward it through the directory service
if (frame->next_hop->reachable==REACHABLE_NONE
&& directory_service
&& frame->next_hop!=directory_service
&& directory_service->reachable&REACHABLE)
frame->next_hop = directory_service;
if (frame->next_hop->reachable==REACHABLE_NONE){
// if the destination is a neighbour, add all probable destinations
struct neighbour *n = get_neighbour(frame->destination, 0);
if (n){
struct link_out *out = n->out_links;
while(out){
if (frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
out = out->_next;
}
}
}
if ((frame->next_hop->reachable&REACHABLE)==REACHABLE_INDIRECT)
frame->next_hop = frame->next_hop->next_hop;
if (frame->next_hop->reachable&REACHABLE_DIRECT){
if (frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination=add_destination_ref(frame->next_hop->destination);
}
}else{
char added_interface[OVERLAY_MAX_INTERFACES];
bzero(added_interface, sizeof(added_interface));
struct neighbour *neighbour = neighbours;
for(;neighbour;neighbour = neighbour->_next){
if (neighbour->subscriber->reachable&REACHABLE_DIRECT){
struct network_destination *dest = neighbour->subscriber->destination;
// TODO set packet version per destination
if (frame->packet_version > neighbour->subscriber->max_packet_version)
frame->packet_version = neighbour->subscriber->max_packet_version;
if (!dest->unicast){
if (!dest->interface->send_broadcasts)
continue;
// make sure we only add broadcast interfaces once
int id = dest->interface - overlay_interfaces;
if (added_interface[id]){
continue;
}
}
if (frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination=add_destination_ref(dest);
}
}
neighbour = neighbour->_next;
}
return packet_version;
return 0;
}
// do we need to forward any broadcast packets transmitted by this neighbour?
@ -815,21 +1000,72 @@ int link_received_duplicate(struct subscriber *subscriber, struct overlay_interf
return 0;
}
// track stats for receiving packets from this neighbour
int link_received_packet(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int sender_seq, int unicast)
// remote peer has confirmed hearing a recent unicast packet
int link_unicast_ack(struct subscriber *subscriber, struct overlay_interface *interface, struct sockaddr_in addr)
{
// TODO better handling of unicast routes
if (unicast){
if (config.debug.verbose && config.debug.linkstate)
DEBUG("LINK STATE; Ignoring unicast packet");
return 0;
}
// TODO find / create network destination, keep it alive
return 0;
}
struct neighbour *neighbour = get_neighbour(subscriber, 1);
struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast);
static struct link_out *create_out_link(struct neighbour *neighbour, overlay_interface *interface, struct sockaddr_in addr, char unicast){
struct link_out *ret=emalloc_zero(sizeof(struct link_out));
if (ret){
ret->_next=neighbour->out_links;
neighbour->out_links=ret;
if (unicast)
ret->destination = create_unicast_destination(addr, interface);
else
ret->destination = add_destination_ref(interface->destination);
if (config.debug.linkstate)
DEBUGF("LINK STATE; Create possible %s link_out for neighbour %s on interface %s",
unicast?"unicast":"broadcast",
alloca_tohex_sid(neighbour->subscriber->sid),
interface->name);
ret->timeout = gettime_ms()+ret->destination->tick_ms*3;
update_alarm(gettime_ms()+5);
}
return ret;
}
static struct link_out *find_out_link(struct neighbour *neighbour, overlay_interface *interface, char unicast){
struct link_out *ret = neighbour->out_links;
while(ret){
if (ret->destination->interface==interface
&& ret->destination->unicast==unicast)
return ret;
ret=ret->_next;
}
return NULL;
}
// track stats for receiving packets from this neighbour
int link_received_packet(struct decode_context *context, int sender_seq, char unicast)
{
if (!context->sender)
return 0;
struct neighbour *neighbour = get_neighbour(context->sender, 1);
// get stats about incoming packets
struct link_in *link=get_neighbour_link(neighbour, context->interface, context->sender_interface, unicast);
time_ms_t now = gettime_ms();
neighbour->ack_counter --;
if (!neighbour->out_links){
// if this packet arrived in an IPv4 packet, assume we need to send them unicast packets
if (context->addr.sin_family==AF_INET && context->addr.sin_port!=0 && context->addr.sin_addr.s_addr!=0)
create_out_link(neighbour, context->interface, context->addr, 1);
// if this packet arrived from the same IPv4 subnet, or a different type of network, assume they can hear our broadcasts
if (context->addr.sin_family!=AF_INET ||
(context->addr.sin_addr.s_addr & context->interface->netmask.s_addr)
== (context->interface->address.sin_addr.s_addr & context->interface->netmask.s_addr))
create_out_link(neighbour, context->interface, context->addr, 0);
}
link->ack_counter --;
// for now we'll use a simple time based link up/down flag + dropped packet count
if (sender_seq >=0){
@ -838,7 +1074,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
if (offset < 64){
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; late seq %d from %s on %s",
sender_seq, alloca_tohex_sid(subscriber->sid), interface->name);
sender_seq, alloca_tohex_sid(context->sender->sid), context->interface->name);
link->ack_mask |= (1<<offset);
}else{
link->ack_mask = (link->ack_mask << 1) | 1;
@ -849,15 +1085,15 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
// missed a packet? send a link state soon
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; missed seq %d from %s on %s",
link->ack_sequence, alloca_tohex_sid(subscriber->sid), interface->name);
link->ack_sequence, alloca_tohex_sid(context->sender->sid), context->interface->name);
link->ack_mask = link->ack_mask << 1;
neighbour->ack_counter --;
link->ack_counter --;
// if we need to nack promptly
if (neighbour->using_us){
if (neighbour->using_us && link==neighbour->best_link){
neighbour->next_neighbour_update = now + 10;
if (neighbour->ack_counter <=0){
if (link->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
@ -873,10 +1109,10 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
if (neighbour->next_neighbour_update > now + 10);
neighbour->next_neighbour_update = now + 10;
}
link->link_timeout = now + (interface->tick_ms *5);
link->link_timeout = now + (context->interface->destination->tick_ms *5);
// force an update soon when we need to promptly ack packets
if (neighbour->using_us > now && neighbour->ack_counter <=0){
if (neighbour->using_us > now && link == neighbour->best_link && link->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
@ -886,7 +1122,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
}
// parse incoming link details
int link_receive(overlay_mdp_frame *mdp)
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
{
IN();
struct overlay_buffer *payload = ob_static(mdp->out.payload, mdp->out.payload_length);
@ -897,6 +1133,7 @@ int link_receive(overlay_mdp_frame *mdp)
struct decode_context context;
bzero(&context, sizeof(context));
context.interface = frame->interface;
time_ms_t now = gettime_ms();
char changed = 0;
@ -959,7 +1196,8 @@ int link_receive(overlay_mdp_frame *mdp)
continue;
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; record - %s, %s, %d, %d, %x, %d",
DEBUGF("LINK STATE; record - %d, %s, %s, %d, %d, %x, %d",
flags,
receiver?alloca_tohex_sid(receiver->sid):"NULL",
transmitter?alloca_tohex_sid(transmitter->sid):"NULL",
interface_id,
@ -976,9 +1214,10 @@ int link_receive(overlay_mdp_frame *mdp)
continue;
}
struct network_destination *destination=NULL;
if (receiver == sender){
// ignore other incoming links to our neighbour
// TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding?
if (transmitter!=my_subscriber || interface_id==-1)
continue;
@ -987,6 +1226,15 @@ int link_receive(overlay_mdp_frame *mdp)
if (interface->state != INTERFACE_STATE_UP)
continue;
struct link_out *out = find_out_link(neighbour, interface, flags&FLAG_UNICAST?1:0);
if (!out)
continue;
// start sending sequence numbers when our neighbour has acked a packet
if (out->destination->sequence_number<0)
out->destination->sequence_number=0;
out->timeout=now + out->destination->tick_ms * 5;
destination = out->destination;
}else if(transmitter == my_subscriber){
// if our neighbour starts using us to reach this receiver, we have to treat the link in our routing table as if it just died.
transmitter = NULL;
@ -1001,25 +1249,31 @@ int link_receive(overlay_mdp_frame *mdp)
if (!link)
continue;
if (transmitter == my_subscriber && receiver == sender && interface_id != -1){
// TODO get matching neighbour link and combine scores
if (transmitter == my_subscriber && receiver == sender && interface_id != -1 && destination){
// they can hear us? we can route through them!
version = link->link_version;
if (neighbour->neighbour_link_timeout < now || version<0){
// which network destination can they hear us from?
if (set_destination_ref(&link->destination, destination)){
changed = 1;
version++;
}
neighbour->neighbour_link_timeout = now + interface->tick_ms * 5;
if (neighbour->link_in_timeout < now || version<0){
changed = 1;
version++;
}
neighbour->link_in_timeout = now + interface->destination->tick_ms * 5;
if (drop_rate != link->drop_rate || transmitter != link->transmitter)
version++;
// process acks / nacks
if (ack_seq!=-1){
overlay_queue_ack(sender, interface, ack_mask, ack_seq);
// TODO unicast
overlay_queue_ack(sender, interface->destination, ack_mask, ack_seq);
// did they miss our last ack?
if (neighbour->last_update_seq!=-1){
@ -1037,13 +1291,14 @@ int link_receive(overlay_mdp_frame *mdp)
}
link->last_ack_seq = ack_seq;
}else{
set_destination_ref(&link->destination, NULL);
}
if (link->transmitter != transmitter || link->link_version != version){
changed = 1;
link->transmitter = transmitter;
link->link_version = version & 0xFF;
link->interface = interface;
link->drop_rate = drop_rate;
// TODO other link attributes...
}
@ -1093,6 +1348,7 @@ int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now)
ob_get_ui32(frame->payload);
ob_get_ui32(frame->payload);
int iface=ob_get(frame->payload);
overlay_interface *interface = &overlay_interfaces[iface];
// record that we have a possible link to this neighbour
struct neighbour *neighbour = get_neighbour(frame->source, 1);
@ -1104,24 +1360,24 @@ int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now)
if (config.debug.linkstate)
DEBUGF("LINK STATE; new legacy neighbour %s", alloca_tohex_sid(frame->source->sid));
}
if (neighbour->neighbour_link_timeout < now)
if (neighbour->link_in_timeout < now)
changed = 1;
if (link->transmitter != my_subscriber)
changed = 1;
link->transmitter = my_subscriber;
link->link_version = 1;
link->interface = &overlay_interfaces[iface];
link->destination = interface->destination;
// give this link a high cost, we aren't going to route through it anyway...
link->drop_rate = 32;
// track the incoming link so we remember to send broadcasts
struct neighbour_link *nl = get_neighbour_link(neighbour, frame->interface, iface, 0);
nl->link_timeout = now + (frame->interface->tick_ms *5);
struct link_in *nl = get_neighbour_link(neighbour, frame->interface, iface, 0);
nl->link_timeout = now + (link->destination->tick_ms *5);
neighbour->legacy_protocol = 1;
neighbour->neighbour_link_timeout = now + link->interface->tick_ms * 5;
neighbour->link_in_timeout = now + link->destination->tick_ms * 5;
if (changed){
route_version++;

144
serval.h

@ -381,36 +381,36 @@ struct slip_decode_state{
int dst_offset;
};
typedef struct overlay_interface {
struct sched_ent alarm;
char name[256];
int recv_offset; /* file offset */
unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int tx_bytes_pending;
struct slip_decode_state slip_decode_state;
struct overlay_interface;
// copy of ifconfig flags
uint16_t drop_broadcasts;
char drop_unicasts;
int port;
int type;
int socket_type;
int encapsulation;
char send_broadcasts;
char prefer_unicast;
// can we use this interface for routes to addresses in other subnets?
int default_route;
// should we log more debug info on this interace? eg hex dumps of packets
char debug;
char local_echo;
// can we assume there will only be two devices on this interface?
char point_to_point;
struct subscriber *other_device;
// where should packets be sent to?
struct network_destination {
int _ref_count;
// which interface are we actually sending packets out of
struct overlay_interface *interface;
// The IPv4 destination address, this may be the interface broadcast address.
struct sockaddr_in address;
// should outgoing packets be marked as unicast?
char unicast;
char packet_version;
// should we aggregate packets, or send one at a time
char encapsulation;
// time last packet was sent
time_ms_t last_tx;
// sequence number of last packet sent to this destination.
// Used to allow NACKs that can request retransmission of recent packets.
int sequence_number;
// rate limit for outgoing packets
struct limit_state transfer_limit;
/* Number of milli-seconds per tick for this interface, which is basically related to the
the typical TX range divided by the maximum expected speed of nodes in the network.
This means that short-range communications has a higher bandwidth requirement than
@ -423,34 +423,56 @@ typedef struct overlay_interface {
These figures will be refined over time, and we will allow people to set them per-interface.
*/
unsigned tick_ms; /* milliseconds per tick */
unsigned int uartbps; // set serial port speed (which might be different from link speed)
int ctsrts; // enabled hardware flow control if non-zero
};
// time last packet was sent on this interface
time_ms_t last_tx;
struct network_destination * new_destination(struct overlay_interface *interface, char encapsulation);
struct network_destination * create_unicast_destination(struct sockaddr_in addr, struct overlay_interface *interface);
struct network_destination * add_destination_ref(struct network_destination *ref);
void release_destination_ref(struct network_destination *ref);
int set_destination_ref(struct network_destination **ptr, struct network_destination *ref);
struct subscriber *next_advert;
typedef struct overlay_interface {
struct sched_ent alarm;
/* sequence number of last packet sent on this interface.
Used to allow NACKs that can request retransmission of recent packets.
*/
int sequence_number;
/* XXX need recent packet buffers to support the above */
char name[256];
struct limit_state transfer_limit;
int recv_offset; /* file offset */
unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int tx_bytes_pending;
/* We need to make sure that interface name and broadcast address is unique for all interfaces that are UP.
We bind a separate socket per interface / broadcast address Broadcast address and netmask, if known
We really only case about distinct broadcast addresses on interfaces.
Also simplifies aliases on interfaces. */
struct sockaddr_in address;
struct sockaddr_in broadcast_address;
struct in_addr netmask;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface.
struct slip_decode_state slip_decode_state;
// copy of ifconfig flags
uint16_t drop_packets;
char drop_broadcasts;
char drop_unicasts;
int port;
int type;
int socket_type;
char send_broadcasts;
char prefer_unicast;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
potentially two quite different values. */
int mtu;
// can we use this interface for routes to addresses in other subnets?
int default_route;
// should we log more debug info on this interace? eg hex dumps of packets
char debug;
char local_echo;
unsigned int uartbps; // set serial port speed (which might be different from link speed)
int ctsrts; // enabled hardware flow control if non-zero
struct network_destination *destination;
// can we assume that we will only receive packets from one device?
char point_to_point;
struct subscriber *other_device;
// the actual address of the interface.
struct sockaddr_in address;
struct in_addr netmask;
/* Use one of the INTERFACE_STATE_* constants to indicate the state of this interface.
If the interface stops working or disappears, it will be marked as DOWN and the socket closed.
@ -518,11 +540,10 @@ int overlay_frame_resolve_addresses(struct overlay_frame *f);
time_ms_t overlay_time_until_next_tick();
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b);
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq);
int overlay_interface_args(const char *arg);
void overlay_rhizome_advertise(struct sched_ent *alarm);
@ -541,10 +562,10 @@ int overlayServerMode();
int overlay_payload_enqueue(struct overlay_frame *p);
int overlay_queue_remaining(int queue);
int overlay_queue_schedule_next(time_ms_t next_allowed_packet);
int overlay_send_tick_packet(struct overlay_interface *interface);
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq);
int overlay_send_tick_packet(struct network_destination *destination);
int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *destination, uint32_t ack_mask, int ack_seq);
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, time_ms_t now);
int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, struct overlay_frame *f, time_ms_t now);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int rhizome_saw_voice_traffic();
int overlay_saw_mdp_containing_frame(struct overlay_frame *f, time_ms_t now);
@ -657,10 +678,8 @@ overlay_interface * overlay_interface_get_default();
overlay_interface * overlay_interface_find(struct in_addr addr, int return_default);
overlay_interface * overlay_interface_find_name(const char *name);
int overlay_interface_compare(overlay_interface *one, overlay_interface *two);
int
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len);
int overlay_broadcast_ensemble(struct network_destination *destination,
unsigned char *bytes,int len);
int directory_registration();
int directory_service_init();
@ -758,7 +777,7 @@ void server_config_reload(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_frame *mdp);
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface, int queue);
int overlay_send_probe(struct subscriber *peer, struct network_destination *destination, int queue);
int overlay_send_stun_request(struct subscriber *server, struct subscriber *request);
void fd_periodicstats(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
@ -774,7 +793,7 @@ void rhizome_server_poll(struct sched_ent *alarm);
int overlay_mdp_service_stun_req(overlay_mdp_frame *mdp);
int overlay_mdp_service_stun(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp);
time_ms_t limit_next_allowed(struct limit_state *state);
int limit_is_allowed(struct limit_state *state);
@ -826,15 +845,16 @@ extern char crash_handler_clue[1024];
int link_received_duplicate(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int previous_seq, int unicast);
int link_received_packet(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int sender_seq, int unicode);
int link_receive(overlay_mdp_frame *mdp);
int link_received_packet(struct decode_context *context, int sender_seq, char unicast);
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp);
void link_explained(struct subscriber *subscriber);
void link_interface_down(struct overlay_interface *interface);
int link_state_announce_links();
int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now);
int link_state_interface_oldest_neighbour(struct overlay_interface *interface);
int link_state_ack_soon(struct subscriber *sender);
int link_state_should_forward_broadcast(struct subscriber *transmitter);
int link_unicast_ack(struct subscriber *subscriber, struct overlay_interface *interface, struct sockaddr_in addr);
int link_add_destinations(struct overlay_frame *frame);
int generate_nonce(unsigned char *nonce,int bytes);

@ -37,13 +37,13 @@ teardown() {
}
is_published() {
tfw_log "grep \"PUBLISHED.*$1\" $LOGA"
grep "PUBLISHED.*$1" $LOGA || return 1
tfw_log "grep \"PUBLISHED.*$1\" $instance_servald_log"
grep "PUBLISHED.*$1" $instance_servald_log || return 1
return 0
}
sent_directory_request() {
grep "Sending directory registration" $1 || return 1
grep "Sending directory registration" $instance_servald_log || return 1
return 0
}
@ -68,9 +68,9 @@ setup_publish() {
doc_publish="Publish and retrieve a directory entry"
test_publish() {
wait_until sent_directory_request $LOGB
wait_until sent_directory_request $LOGC
wait_until sent_directory_request $LOGD
foreach_instance +B +C +D
wait_until sent_directory_request
set_instance +A
wait_until is_published $SIDB
wait_until is_published $SIDC
wait_until is_published $SIDD
@ -107,51 +107,52 @@ start_routing_instance() {
wait_until interface_up
}
configure_node() {
executeOk_servald config \
set interfaces.0.file dummy1 \
set interfaces.0.send_broadcasts 0 \
set interfaces.0.drop_broadcasts on \
set interfaces.0.default_route 1 \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set interfaces.0.dummy_netmask 255.255.255.0
}
setup_routing() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B +C create_single_identity
>$SERVALD_VAR/dummy1
foreach_instance +A +B +C \
executeOk_servald config \
set interfaces.0.file dummy1 \
set interfaces.0.mdp_tick_ms 0 \
set interfaces.0.default_route 1 \
set interfaces.0.dummy_netmask 255.255.255.0
configure_node
set_instance +A
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set dna.helper.executable "$servald_build_root/directory_service" \
set debug.dnahelper on
set_instance +B
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
set_instance +C
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
foreach_instance +B +C \
executeOk_servald config \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
foreach_instance +A +B +C start_routing_instance
wait_until grep "DNAHELPER got STARTED ACK" $LOGA
set_instance +A
wait_until grep "DNAHELPER got STARTED ACK" $instance_servald_log
}
doc_routing="Ping via relay node"
test_routing() {
foreach_instance +B +C \
wait_until sent_directory_request $instance_servald_log
wait_until sent_directory_request
set_instance +A
wait_until is_published $SIDB
wait_until is_published $SIDC
set_instance +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:UNICAST :"
assertStdoutGrep --matches=1 "^$SIDA:UNICAST:"
executeOk_servald dna lookup "$DIDC"
assertStdoutLineCount '==' 1
assertStdoutGrep --matches=1 "^sid://$SIDC/local/$DIDC:$DIDC:$NAMEC\$"
set_instance +C
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:UNICAST :"
assertStdoutGrep --matches=1 "^$SIDA:UNICAST:"
executeOk_servald dna lookup "$DIDB"
assertStdoutLineCount '==' 1
assertStdoutGrep --matches=1 "^sid://$SIDB/local/$DIDB:$DIDB:$NAMEB\$"

@ -66,16 +66,7 @@ setup_common() {
set_instance +B
}
doc_FileTransfer="New bundle and update transfer to one node"
setup_FileTransfer() {
setup_common
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_FileTransfer() {
receive_and_update_bundle() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list
@ -90,6 +81,19 @@ test_FileTransfer() {
assert_rhizome_received file2
}
doc_FileTransfer="New bundle and update transfer to one node"
setup_FileTransfer() {
setup_common
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_FileTransfer() {
receive_and_update_bundle
}
doc_EncryptedTransfer="Encrypted payload can be opened by destination"
setup_EncryptedTransfer() {
setup_common
@ -159,18 +163,7 @@ setup_HTTPTransport() {
foreach_instance +B assert_peers_are_instances +A
}
test_HTTPTransport() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
set_instance +A
rhizome_update_file file1 file2
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file2
assert_rhizome_received file2
receive_and_update_bundle
}
doc_MDPTransport="Rhizome over MDP transport"
@ -186,20 +179,30 @@ setup_MDPTransport() {
foreach_instance +B assert_peers_are_instances +A
}
test_MDPTransport() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
set_instance +A
rhizome_update_file file1 file2
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file2
assert_rhizome_received file2
receive_and_update_bundle
}
doc_UnicastTransfer="Rhizome over unicast MDP link"
setup_UnicastTransfer() {
setup_common
foreach_instance +A +B \
executeOk_servald config \
set rhizome.http.enable 0
set_instance +A
rhizome_add_file file1
set_instance +B
executeOk_servald config \
set interfaces.1.file foo \
set interfaces.1.drop_broadcasts on
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_UnicastTransfer() {
receive_and_update_bundle
}
#common setup and test routines for transferring a 1MB file
setup_bigfile_common() {
set_instance +A
@ -213,7 +216,7 @@ setup_bigfile_common() {
}
bigfile_common_test() {
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
wait_until --timeout=120 bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
@ -237,7 +240,7 @@ setup_FileTransferUnreliableBigMDP() {
executeOk_servald config \
set rhizome.http.enable 0 \
set interfaces.1.file dummy \
set interfaces.1.drop_broadcasts 20
set interfaces.1.drop_packets 10
setup_bigfile_common
}
test_FileTransferUnreliableBigMDP() {

@ -18,13 +18,6 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# Routing conditions that are known to not be working or covered by tests;
# - No support for multi-hop paths involving any unicast links
# - unicast only links need to be shared in the routing table
# - routing table needs to be shared to unicast only peers
# - unicast IP information should be shared among unicast peers
#
source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
@ -46,17 +39,17 @@ has_link() {
set_instance $1
executeOk_servald route print
tfw_log "Looking for link from $1 to $2 ($4)"
if ! grep "^${4}:\(BROADCAST \|UNICAST \)\{1,\}:dummy.*:0*\$" $_tfw_tmp/stdout; then
if ! grep "^${4}:\(BROADCAST\|UNICAST\):dummy.*:0*\$" $_tfw_tmp/stdout; then
tfw_log "Link not found"
# tfw_log "^${4}:\(BROADCAST \|UNICAST \)\{1,\}:dummy.*:0*\$"
# tfw_log "^${4}:\(BROADCAST\|UNICAST\):dummy.*:0*\$"
# tfw_cat --stdout --stderr
return 1
fi
[ $4 = $5 ] && return 0;
tfw_log "Path from $1 to $3 should be via $2 ($5, $4)"
if ! grep "^${5}:INDIRECT ::${4}\$" $_tfw_tmp/stdout; then
if ! grep "^${5}:INDIRECT::${4}\$" $_tfw_tmp/stdout; then
tfw_log "No path found"
# tfw_log "^${5}:INDIRECT ::${4}\$"
# tfw_log "^${5}:INDIRECT::${4}\$"
# tfw_cat --stdout --stderr
return 1
fi
@ -117,8 +110,8 @@ setup_single_link() {
foreach_instance +A +B start_routing_instance
}
test_single_link() {
wait_until path_exists +A +B
wait_until path_exists +B +A
wait_until --timeout=10 path_exists +A +B
wait_until --timeout=5 path_exists +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
@ -141,14 +134,14 @@ test_multiple_ids() {
executeOk_servald mdp ping --timeout=3 $SIDB2 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB1:BROADCAST UNICAST :dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDB2:INDIRECT ::$SIDB1"
assertStdoutGrep --matches=1 "^$SIDB1:BROADCAST:dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDB2:INDIRECT::$SIDB1"
set_instance +B
executeOk_servald mdp ping --timeout=3 $SIDA2 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA1:BROADCAST UNICAST :dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDA2:INDIRECT ::$SIDA1"
assertStdoutGrep --matches=1 "^$SIDA1:BROADCAST:dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDA2:INDIRECT::$SIDA1"
}
doc_single_mdp="Use single MDP per packet encapsulation"
@ -228,6 +221,9 @@ test_multiple_nodes() {
wait_until path_exists +A +B
wait_until path_exists +A +C
wait_until path_exists +A +D
wait_until path_exists +B +A
wait_until path_exists +C +A
wait_until path_exists +D +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
@ -237,33 +233,53 @@ test_multiple_nodes() {
tfw_cat --stdout --stderr
}
doc_scan="Simulate isolated clients"
doc_scan="Network scan with isolated clients"
setup_scan() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
set_instance +B
executeOk_servald config \
set interfaces.1.dummy_address 127.0.1.11
foreach_instance +A +B \
foreach_instance +A +B +C create_single_identity
foreach_instance +A +B +C add_interface 1
foreach_instance +A +B +C \
executeOk_servald config \
set interfaces.1.drop_broadcasts 100
foreach_instance +A +B start_routing_instance
set interfaces.1.drop_broadcasts on
foreach_instance +A +B +C start_routing_instance
}
test_scan() {
set_instance +A
executeOk_servald scan
wait_until scan_completed
wait_until has_seen_instances +B
wait_until --timeout=10 has_seen_instances +B +C
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
assertStdoutGrep --matches=1 "^$SIDC:UNICAST:"
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
set_instance +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST :"
assertStdoutGrep --matches=1 "^$SIDA:UNICAST:"
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT:"
executeOk_servald mdp ping --timeout=3 $SIDC 1
tfw_cat --stdout --stderr
}
doc_scan_one="Network scan a single address"
setup_scan_one() {
setup_scan
}
test_scan_one() {
set_instance +A
executeOk_servald scan 127.0.1.2
wait_until scan_completed
wait_until --timeout=10 has_seen_instances +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
assertStdoutGrep --matches=0 "^$SIDC:"
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
}
scan_completed() {
grep "Scan completed" $LOG||return1
grep "Scan completed" $instance_servald_log || return 1
return 0
}
@ -275,16 +291,20 @@ setup_single_filter() {
foreach_instance +A +B add_interface 1
set_instance +B
executeOk_servald config \
set interfaces.1.dummy_address 127.0.1.11 \
set interfaces.1.drop_broadcasts 100
set interfaces.1.drop_broadcasts on
foreach_instance +A +B start_routing_instance
}
test_single_filter() {
wait_until path_exists +A +B
wait_until path_exists +B +A
wait_until --timeout=10 path_exists +A +B
wait_until --timeout=5 path_exists +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
set_instance +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:BROADCAST:"
}
doc_broadcast_only="Broadcast packets only"
@ -304,7 +324,7 @@ test_broadcast_only() {
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST :"
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST:"
}
doc_prefer_unicast="Prefer unicast packets"
@ -326,8 +346,7 @@ test_prefer_unicast() {
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST UNICAST :"
assertGrep "$instance_servald_log" 'Choosing to send via unicast'
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
}
doc_multihop_linear="Start 4 instances in a linear arrangement"
@ -356,6 +375,31 @@ test_multihop_linear() {
assertStdoutGrep --matches=1 "^6:$SIDA\$"
}
doc_unicast_route="Route across unicast links"
setup_unicast_route() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B +C +D create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +B +C add_interface 2
foreach_instance +C +D add_interface 3
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts on
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts on \
set interfaces.3.drop_broadcasts on
foreach_instance +A +B +C +D start_routing_instance
}
test_unicast_route() {
wait_until --timeout=20 path_exists +A +B +C +D
wait_until --timeout=5 path_exists +D +C +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDD 1
tfw_cat --stdout --stderr
}
setup_offline() {
setup_servald
assert_no_servald_processes
@ -372,7 +416,7 @@ instance_offline() {
for ((N=1; 1; ++N)); do
local sidvar=SID${I#+}$N
[ -n "${!sidvar}" ] || break
if ! grep "LINK STATE.*${!sidvar}.*UNREACHABLE" $instance_servald_log; then
if ! grep "NOT REACHABLE sid=${!sidvar}" $instance_servald_log; then
return 1
fi
done
@ -431,7 +475,7 @@ setup_multi_interface() {
multi_has_link() {
executeOk_servald route print
grep "^$1:BROADCAST UNICAST :dummyeth" $_tfw_tmp/stdout || return 1
grep "^$1:BROADCAST:dummyeth" $_tfw_tmp/stdout || return 1
return 0
}
@ -452,7 +496,7 @@ setup_ping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 40
set interfaces.1.drop_packets 40
foreach_instance +A +B start_routing_instance
}
test_ping_unreliable() {
@ -471,11 +515,11 @@ setup_ping_unreliable2() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 40
set interfaces.1.drop_packets 40
foreach_instance +B +C add_interface 2
foreach_instance +B +C \
executeOk_servald config \
set interfaces.2.drop_broadcasts 40
set interfaces.2.drop_packets 40
foreach_instance +A +B +C start_routing_instance
}
test_ping_unreliable2() {
@ -494,7 +538,7 @@ setup_brping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20
set interfaces.1.drop_packets 20
foreach_instance +A +B start_routing_instance
}
test_brping_unreliable() {
@ -515,16 +559,16 @@ setup_unreliable_links() {
foreach_instance +A +C add_interface 3
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.3.drop_broadcasts 70
set interfaces.1.drop_packets 5 \
set interfaces.3.drop_packets 70
set_instance +B
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.2.drop_broadcasts 5
set interfaces.1.drop_packets 5 \
set interfaces.2.drop_packets 5
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts 5 \
set interfaces.3.drop_broadcasts 70
set interfaces.2.drop_packets 5 \
set interfaces.3.drop_packets 70
foreach_instance +A +B +C start_routing_instance
}
test_unreliable_links() {
@ -550,24 +594,24 @@ setup_unreliable_links2() {
foreach_instance +C +D add_interface 6
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.2.drop_broadcasts 40 \
set interfaces.3.drop_broadcasts 90
set interfaces.1.drop_packets 5 \
set interfaces.2.drop_packets 40 \
set interfaces.3.drop_packets 90
set_instance +B
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.4.drop_broadcasts 5 \
set interfaces.5.drop_broadcasts 40
set interfaces.1.drop_packets 5 \
set interfaces.4.drop_packets 5 \
set interfaces.5.drop_packets 40
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts 40 \
set interfaces.4.drop_broadcasts 5 \
set interfaces.6.drop_broadcasts 5
set interfaces.2.drop_packets 40 \
set interfaces.4.drop_packets 5 \
set interfaces.6.drop_packets 5
set_instance +D
executeOk_servald config \
set interfaces.3.drop_broadcasts 90 \
set interfaces.5.drop_broadcasts 40 \
set interfaces.6.drop_broadcasts 5
set interfaces.3.drop_packets 90 \
set interfaces.5.drop_packets 40 \
set interfaces.6.drop_packets 5
foreach_instance +A +B +C +D start_routing_instance
}
test_unreliable_links2() {
@ -603,7 +647,7 @@ test_circle() {
executeOk_servald mdp ping --timeout=3 $SIDC 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT:"
stop_servald_server +B
foreach_instance +A +C \
wait_until --timeout=10 instance_offline +B
@ -613,7 +657,7 @@ test_circle() {
executeOk_servald mdp ping --timeout=3 $SIDC 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT:"
}
setup_crowded_mess() {
@ -636,10 +680,10 @@ test_crowded_mess() {
foreach_instance +A +H \
wait_until has_seen_instances +A +H
set_instance +A
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDH:INDIRECT:"
executeOk_servald mdp ping --timeout=3 $SIDH 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDH:INDIRECT :"
}
runTests "$@"