Major refactor to packet queuing and delivery in preparation for unicast link tracking.

Unicast tests broken
This commit is contained in:
Jeremy Lakeman 2013-08-08 15:20:31 +09:30
parent d47d1b1684
commit 81afc42d8b
19 changed files with 712 additions and 811 deletions

View File

@ -2133,16 +2133,22 @@ int app_route_print(const struct cli_parsed *parsed, struct cli_context *context
char flags[32];
strbuf b = strbuf_local(flags, sizeof flags);
if (p->reachable & REACHABLE_SELF)
strbuf_puts(b, "SELF ");
if (p->reachable & REACHABLE_ASSUMED)
strbuf_puts(b, "ASSUMED ");
if (p->reachable & REACHABLE_BROADCAST)
strbuf_puts(b, "BROADCAST ");
if (p->reachable & REACHABLE_UNICAST)
strbuf_puts(b, "UNICAST ");
if (p->reachable & REACHABLE_INDIRECT)
strbuf_puts(b, "INDIRECT ");
switch (p->reachable){
case REACHABLE_SELF:
strbuf_puts(b, "SELF");
break;
case REACHABLE_BROADCAST:
strbuf_puts(b, "BROADCAST");
break;
case REACHABLE_UNICAST:
strbuf_puts(b, "UNICAST");
break;
case REACHABLE_INDIRECT:
strbuf_puts(b, "INDIRECT");
break;
default:
strbuf_sprintf(b, "%d", p->reachable);
}
cli_put_string(context, strbuf_str(b), ":");
cli_put_string(context, p->interface_name, ":");
cli_put_string(context, alloca_tohex_sid(p->neighbour), "\n");

View File

@ -94,7 +94,7 @@ static void directory_update(struct sched_ent *alarm){
load_directory_config();
if (directory_service){
if (subscriber_is_reachable(directory_service) & REACHABLE){
if (directory_service->reachable & REACHABLE){
directory_send_keyring(directory_service);
unschedule(alarm);

View File

@ -242,7 +242,8 @@ void keyring_free_identity(keyring_identity *id)
if (id->subscriber){
id->subscriber->identity=NULL;
set_reachable(id->subscriber, REACHABLE_NONE);
if (id->subscriber->reachable == REACHABLE_SELF)
id->subscriber->reachable = REACHABLE_NONE;
}
bzero(id,sizeof(keyring_identity));
@ -858,7 +859,8 @@ int keyring_decrypt_pkr(keyring_file *k,keyring_context *c,
if (id->keypairs[i]->type == KEYTYPE_CRYPTOBOX){
id->subscriber = find_subscriber(id->keypairs[i]->public_key, SID_SIZE, 1);
if (id->subscriber){
set_reachable(id->subscriber, REACHABLE_SELF);
if (id->subscriber->reachable==REACHABLE_NONE)
id->subscriber->reachable=REACHABLE_SELF;
id->subscriber->identity = id;
if (!my_subscriber)
my_subscriber=id->subscriber;
@ -1067,7 +1069,8 @@ keyring_identity *keyring_create_identity(keyring_file *k,keyring_context *c, co
// add new identity to in memory table
id->subscriber = find_subscriber(id->keypairs[0]->public_key, SID_SIZE, 1);
if (id->subscriber){
set_reachable(id->subscriber, REACHABLE_SELF);
if (id->subscriber->reachable==REACHABLE_NONE)
id->subscriber->reachable=REACHABLE_SELF;
id->subscriber->identity = id;
if (!my_subscriber)
my_subscriber=id->subscriber;

View File

@ -208,20 +208,14 @@ int overlay_address_append(struct decode_context *context, struct overlay_buffer
return WHY("No address supplied");
if(context
&& context->packet_version>=1
&& context->interface
&& subscriber == context->interface->other_device
&& context->interface->point_to_point){
&& subscriber == context->point_to_point_device){
if (ob_append_byte(b, OA_CODE_P2P_YOU))
return -1;
}else if(context
&& context->packet_version>=1
&& context->interface
&& !subscriber->send_full
&& subscriber == my_subscriber
&& context->interface->other_device
&& context->interface->point_to_point
&& (!context->encoding_header || !context->interface->local_echo)){
&& context->point_to_point_device
&& (context->encoding_header==0 || !context->interface->local_echo)){
if (ob_append_byte(b, OA_CODE_P2P_ME))
return -1;
}else if (context && subscriber==context->sender){
@ -346,7 +340,8 @@ int overlay_address_parse(struct decode_context *context, struct overlay_buffer
switch(len){
case OA_CODE_P2P_YOU:
if (context->interface && context->interface->point_to_point){
// if we don't know who they are, we can't assume they mean us.
if (context->point_to_point_device){
*subscriber=my_subscriber;
context->previous=my_subscriber;
}else{
@ -356,11 +351,11 @@ int overlay_address_parse(struct decode_context *context, struct overlay_buffer
return 0;
case OA_CODE_P2P_ME:
if (context->interface && context->interface->point_to_point && context->interface->other_device){
*subscriber=context->interface->other_device;
if (context->point_to_point_device){
*subscriber=context->point_to_point_device;
context->previous=*subscriber;
}else{
WHYF("Could not resolve address on %s, I don't know who is on the other end of this link!", context->interface->name);
WHYF("Could not resolve address, I don't know who is on the other end of this link!");
context->invalid_addresses=1;
}
return 0;
@ -404,18 +399,23 @@ int send_please_explain(struct decode_context *context, struct subscriber *sourc
if (!context->sender)
frame->source_full=1;
frame->destination = destination;
if (destination && (destination->reachable & REACHABLE)){
frame->ttl = PAYLOAD_TTL_DEFAULT; // MAX?
frame->destination = destination;
}else{
DEBUGF("Need to send explanation to destination that isn't routable");
// send both a broadcast & unicast response out the same interface this packet arrived on.
frame->ttl=1;// how will this work with olsr??
overlay_broadcast_generate_address(&frame->broadcast_id);
if (context->interface){
frame->destination_resolved=1;
frame->next_hop = destination;
frame->recvaddr = context->addr;
frame->interface = context->interface;
frame->destination = destination;
frame->destinations[frame->destination_count++].destination=add_destination_ref(context->interface->destination);
struct network_destination *dest = create_unicast_destination(context->addr, context->interface);
if (dest)
frame->destinations[frame->destination_count++].destination=dest;
}else{
FATAL("This context doesn't have an interface?");
}
}
@ -434,7 +434,8 @@ int process_explain(struct overlay_frame *frame){
struct decode_context context;
bzero(&context, sizeof context);
context.sender = frame->source;
context.interface = frame->interface;
while(ob_remaining(b)>0){
int len = ob_get(b);
if (len<=0 || len>SID_SIZE)

View File

@ -54,9 +54,9 @@ struct subscriber{
// should we send the full address once?
int send_full;
// sequence number for this unicast or broadcast destination
int sequence;
int max_packet_version;
// overlay routing information
struct overlay_node *node;
@ -69,22 +69,14 @@ struct subscriber{
// result of routing calculations;
int reachable;
// highest seen packet version
int max_packet_version;
// if indirect, who is the next hop?
struct subscriber *next_hop;
// if direct, or unicast, where do we send packets?
struct overlay_interface *interface;
struct network_destination *destination;
// if reachable&REACHABLE_UNICAST send packets to this address, else use the interface broadcast address
struct sockaddr_in address;
time_ms_t last_stun_request;
time_ms_t last_probe;
time_ms_t last_probe_response;
time_ms_t last_tx;
time_ms_t last_explained;
// public signing key details for remote peers
@ -115,6 +107,7 @@ struct decode_context{
struct overlay_frame *please_explain;
struct subscriber *sender;
struct subscriber *previous;
struct subscriber *point_to_point_device;
};
extern struct subscriber *my_subscriber;
@ -122,9 +115,7 @@ extern struct subscriber *directory_service;
struct subscriber *find_subscriber(const unsigned char *sid, int len, int create);
void enum_subscribers(struct subscriber *start, int(*callback)(struct subscriber *, void *), void *context);
int subscriber_is_reachable(struct subscriber *subscriber);
int set_reachable(struct subscriber *subscriber, int reachable);
int reachable_unicast(struct subscriber *subscriber, overlay_interface *interface, struct in_addr addr, int port);
int set_reachable(struct subscriber *subscriber, struct network_destination *destination, struct subscriber *next_hop);
int load_subscriber_address(struct subscriber *subscriber);
int process_explain(struct overlay_frame *frame);

View File

@ -53,19 +53,11 @@ static int re_init_socket(int interface_index);
#define DEBUG_packet_visualise(M,P,N) logServalPacket(LOG_LEVEL_DEBUG, __WHENCE__, (M), (P), (N))
static int mark_subscriber_down(struct subscriber *subscriber, void *context)
{
overlay_interface *interface=context;
if ((subscriber->reachable & REACHABLE_DIRECT) && subscriber->interface == interface)
set_reachable(subscriber, REACHABLE_NONE);
return 0;
}
static void
overlay_interface_close(overlay_interface *interface){
link_interface_down(interface);
enum_subscribers(NULL, mark_subscriber_down, interface);
INFOF("Interface %s addr %s is down", interface->name, inet_ntoa(interface->destination.address.sin_addr));
INFOF("Interface %s addr %s is down",
interface->name, inet_ntoa(interface->address.sin_addr));
unschedule(&interface->alarm);
unwatch(&interface->alarm);
close(interface->alarm.poll.fd);
@ -193,7 +185,7 @@ static int interface_type_priority(int type)
}
// Which interface is better for routing packets?
// returns 0 to indicate the first is better, 1 for the second
// returns -1 to indicate the first is better, 0 for equal, 1 for the second
int overlay_interface_compare(overlay_interface *one, overlay_interface *two)
{
if (one==two)
@ -201,11 +193,9 @@ int overlay_interface_compare(overlay_interface *one, overlay_interface *two)
int p1 = interface_type_priority(one->type);
int p2 = interface_type_priority(two->type);
if (p1<p2)
return 0;
return -1;
if (p2<p1)
return 1;
if (two<one)
return 1;
return 0;
}
@ -318,9 +308,10 @@ overlay_interface_init_socket(int interface_index)
overlay_interface_init_any(interface->port);
const struct sockaddr *addr = (const struct sockaddr *)&interface->address;
interface->alarm.poll.fd = overlay_bind_socket(addr, sizeof(interface->destination.address), interface->name);
interface->alarm.poll.fd = overlay_bind_socket(
(const struct sockaddr *)&interface->address,
sizeof(interface->address), interface->name);
if (interface->alarm.poll.fd<0){
interface->state=INTERFACE_STATE_DOWN;
return WHYF("Failed to bind interface %s", interface->name);
@ -328,8 +319,8 @@ overlay_interface_init_socket(int interface_index)
if (config.debug.packetrx || config.debug.io) {
char srctxt[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, (const void *)&interface->destination.address.sin_addr, srctxt, INET_ADDRSTRLEN))
DEBUGF("Bound to %s:%d", srctxt, ntohs(interface->destination.address.sin_port));
if (inet_ntop(AF_INET, (const void *)&interface->address.sin_addr, srctxt, INET_ADDRSTRLEN))
DEBUGF("Bound to %s:%d", srctxt, ntohs(interface->address.sin_port));
}
interface->alarm.poll.events=POLLIN;
@ -382,18 +373,18 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
interface->prefer_unicast = ifconfig->prefer_unicast;
interface->default_route = ifconfig->default_route;
interface->socket_type = ifconfig->socket_type;
interface->destination.interface = interface;
interface->destination.encapsulation = ifconfig->encapsulation;
interface->uartbps = ifconfig->uartbps;
interface->ctsrts = ifconfig->ctsrts;
interface->destination = new_destination(interface, ifconfig->encapsulation);
/* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->destination.mtu=1200;
interface->mtu = 1200;
interface->point_to_point = ifconfig->point_to_point;
interface->state=INTERFACE_STATE_DOWN;
interface->alarm.poll.fd=0;
interface->debug = ifconfig->debug;
interface->point_to_point = ifconfig->point_to_point;
// How often do we announce ourselves on this interface?
int tick_ms=-1;
@ -449,19 +440,15 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
if (tick_ms<0)
return WHYF("No tick interval specified for interface %s", name);
interface->destination.tick_ms = tick_ms;
interface->destination->tick_ms = tick_ms;
limit_init(&interface->destination.transfer_limit, packet_interval);
limit_init(&interface->destination->transfer_limit, packet_interval);
interface->address.sin_family=AF_INET;
interface->address.sin_port = htons(ifconfig->port);
interface->address.sin_addr = ifconfig->dummy_address;
interface->netmask=ifconfig->dummy_netmask;
interface->destination.address.sin_family=AF_INET;
interface->destination.address.sin_port = htons(ifconfig->port);
interface->destination.address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->destination->address.sin_family=AF_INET;
interface->destination->address.sin_port = htons(ifconfig->port);
interface->alarm.function = overlay_interface_poll;
interface_poll_stats.name="overlay_interface_poll";
@ -469,7 +456,7 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
if (ifconfig->socket_type==SOCK_DGRAM){
interface->address.sin_addr = src_addr;
interface->destination.address.sin_addr = broadcast;
interface->destination->address.sin_addr = broadcast;
interface->netmask = netmask;
interface->local_echo = 1;
@ -478,6 +465,9 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
}else{
char read_file[1024];
interface->address.sin_addr = ifconfig->dummy_address;
interface->netmask = ifconfig->dummy_netmask;
interface->destination->address.sin_addr.s_addr = interface->address.sin_addr.s_addr | ~interface->netmask.s_addr;
interface->local_echo = interface->point_to_point?0:1;
strbuf d = strbuf_local(read_file, sizeof read_file);
@ -525,8 +515,8 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
directory_registration();
INFOF("Allowing a maximum of %d packets every %lldms",
interface->destination.transfer_limit.burst_size,
interface->destination.transfer_limit.burst_length);
interface->destination->transfer_limit.burst_size,
interface->destination->transfer_limit.burst_length);
overlay_interface_count++;
return 0;
@ -603,7 +593,7 @@ static int should_drop(struct overlay_interface *interface, struct sockaddr_in a
if (memcmp(&addr, &interface->address, sizeof(addr))==0){
return interface->drop_unicasts;
}
if (memcmp(&addr, &interface->destination.address, sizeof(addr))==0){
if (memcmp(&addr, &interface->destination->address, sizeof(addr))==0){
if (interface->drop_broadcasts == 0)
return 0;
if (interface->drop_broadcasts >= 100)
@ -760,11 +750,11 @@ static void overlay_interface_poll(struct sched_ent *alarm)
alarm->alarm=-1;
time_ms_t now = gettime_ms();
if (interface->state==INTERFACE_STATE_UP && interface->destination.tick_ms>0){
if (now >= interface->destination.last_tx+interface->destination.tick_ms)
overlay_send_tick_packet(interface);
alarm->alarm=interface->destination.last_tx+interface->destination.tick_ms;
alarm->deadline=alarm->alarm+interface->destination.tick_ms/2;
if (interface->state==INTERFACE_STATE_UP && interface->destination->tick_ms>0){
if (now >= interface->destination->last_tx+interface->destination->tick_ms)
overlay_send_tick_packet(interface->destination);
alarm->alarm=interface->destination->last_tx+interface->destination->tick_ms;
alarm->deadline=alarm->alarm+interface->destination->tick_ms/2;
}
switch(interface->socket_type){
@ -816,17 +806,18 @@ static void overlay_interface_poll(struct sched_ent *alarm)
}
int
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
overlay_broadcast_ensemble(struct network_destination *destination,
unsigned char *bytes,int len)
{
interface->destination.last_tx = gettime_ms();
assert(destination && destination->interface);
struct overlay_interface *interface = destination->interface;
destination->last_tx = gettime_ms();
if (config.debug.packettx)
{
DEBUGF("Sending this packet via interface %s (len=%d)",interface->name,len);
DEBUG_packet_visualise(NULL,bytes,len);
}
if (config.debug.packettx){
DEBUGF("Sending this packet via interface %s (len=%d)",interface->name,len);
//DEBUG_packet_visualise(NULL,bytes,len);
}
if (interface->state!=INTERFACE_STATE_UP){
return WHYF("Cannot send to interface %s as it is down", interface->name);
@ -874,7 +865,7 @@ overlay_broadcast_ensemble(overlay_interface *interface,
{
struct file_packet packet={
.src_addr = interface->address,
.dst_addr = *recipientaddr,
.dst_addr = destination->address,
.pid = getpid(),
};
@ -907,14 +898,15 @@ overlay_broadcast_ensemble(overlay_interface *interface,
case SOCK_DGRAM:
{
if (config.debug.overlayinterfaces)
DEBUGF("Sending %d byte overlay frame on %s to %s",len,interface->name,inet_ntoa(recipientaddr->sin_addr));
DEBUGF("Sending %d byte overlay frame on %s to %s",len,interface->name,inet_ntoa(destination->address.sin_addr));
if(sendto(interface->alarm.poll.fd,
bytes, len, 0, (struct sockaddr *)recipientaddr, sizeof(struct sockaddr_in)) != len){
bytes, len, 0, (struct sockaddr *)&destination->address, sizeof(destination->address)) != len){
int e=errno;
WHY_perror("sendto(c)");
// only close the interface on some kinds of errors
if (e==ENETDOWN || e==EINVAL)
overlay_interface_close(interface);
// TODO mark unicast destination as failed
return -1;
}
return 0;
@ -972,7 +964,7 @@ overlay_interface_register(char *name,
int broadcast_match = 0;
int name_match =0;
if (overlay_interfaces[i].destination.address.sin_addr.s_addr == broadcast.s_addr)
if (overlay_interfaces[i].destination->address.sin_addr.s_addr == broadcast.s_addr)
broadcast_match = 1;
name_match = !strcasecmp(overlay_interfaces[i].name, name);
@ -1001,7 +993,7 @@ overlay_interface_register(char *name,
if (found_interface>=0){
// try to reactivate the existing interface
overlay_interfaces[found_interface].address.sin_addr = addr;
overlay_interfaces[found_interface].destination.address.sin_addr = broadcast;
overlay_interfaces[found_interface].destination->address.sin_addr = broadcast;
overlay_interfaces[found_interface].netmask = mask;
return re_init_socket(found_interface);
}

View File

@ -28,6 +28,8 @@ static void update_limit_state(struct limit_state *state, time_ms_t now){
/* When should we next allow this thing to occur? */
time_ms_t limit_next_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
if (!state->burst_length)
return now;
update_limit_state(state, now);
if (state->sent < state->burst_size)
@ -38,6 +40,8 @@ time_ms_t limit_next_allowed(struct limit_state *state){
/* Can we do this now? if so, track it */
int limit_is_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
if (!state->burst_length)
return 0;
update_limit_state(state, now);
if (state->sent >= state->burst_size){
return -1;
@ -50,6 +54,7 @@ int limit_is_allowed(struct limit_state *state){
int limit_init(struct limit_state *state, int rate_micro_seconds){
if (rate_micro_seconds==0){
state->burst_size=0;
state->burst_length=1;
}else{
state->burst_size = (MIN_BURST_LENGTH / rate_micro_seconds)+1;
state->burst_length = (state->burst_size * rate_micro_seconds) / 1000.0;
@ -57,76 +62,40 @@ int limit_init(struct limit_state *state, int rate_micro_seconds){
return 0;
}
// quick test to make sure the specified route is valid.
int subscriber_is_reachable(struct subscriber *subscriber){
if (!subscriber)
return REACHABLE_NONE;
int set_reachable(struct subscriber *subscriber,
struct network_destination *destination, struct subscriber *next_hop){
int ret = subscriber->reachable;
int reachable = REACHABLE_NONE;
if (destination)
reachable = destination->unicast?REACHABLE_UNICAST:REACHABLE_BROADCAST;
else if(next_hop)
reachable = REACHABLE_INDIRECT;
if (ret==REACHABLE_INDIRECT){
if (!subscriber->next_hop)
ret = REACHABLE_NONE;
// avoid infinite recursion...
else if (!(subscriber->next_hop->reachable & REACHABLE_DIRECT))
ret = REACHABLE_NONE;
else{
int r = subscriber_is_reachable(subscriber->next_hop);
if (r&REACHABLE_ASSUMED)
ret = REACHABLE_NONE;
else if (!(r & REACHABLE_DIRECT))
ret = REACHABLE_NONE;
}
}
if (ret & REACHABLE_DIRECT){
// make sure the interface is still up
if (!subscriber->interface)
ret=REACHABLE_NONE;
else if (subscriber->interface->state!=INTERFACE_STATE_UP)
ret=REACHABLE_NONE;
}
return ret;
}
int set_reachable(struct subscriber *subscriber, int reachable){
if (subscriber->reachable==reachable)
if (subscriber->reachable==reachable
&& subscriber->next_hop==next_hop
&& subscriber->destination == destination)
return 0;
int old_value = subscriber->reachable;
subscriber->reachable=reachable;
subscriber->reachable = reachable;
set_destination_ref(&subscriber->destination, destination);
subscriber->next_hop = next_hop;
// These log messages are for use in tests. Changing them may break test scripts.
if (config.debug.overlayrouting) {
if (config.debug.overlayrouting || config.debug.linkstate) {
switch (reachable) {
case REACHABLE_NONE:
DEBUGF("NOT REACHABLE sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_SELF:
break;
case REACHABLE_INDIRECT:
DEBUGF("REACHABLE INDIRECTLY sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("(via %s, %d)",subscriber->next_hop?alloca_tohex_sid(subscriber->next_hop->sid):"NOONE!"
,subscriber->next_hop?subscriber->next_hop->reachable:0);
DEBUGF("REACHABLE INDIRECTLY sid=%s, via %s",
alloca_tohex_sid(subscriber->sid), alloca_tohex_sid(next_hop->sid));
break;
case REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("REACHABLE VIA UNICAST sid=%s, on %s ", alloca_tohex_sid(subscriber->sid), destination->interface->name);
break;
case REACHABLE_BROADCAST:
DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
DEBUGF("REACHABLE VIA BROADCAST sid=%s, on %s ", alloca_tohex_sid(subscriber->sid), destination->interface->name);
break;
}
}
@ -144,24 +113,7 @@ int set_reachable(struct subscriber *subscriber, int reachable){
if ((!(old_value & REACHABLE)) && (reachable & REACHABLE))
monitor_announce_peer(subscriber->sid);
return 0;
}
// mark the subscriber as reachable via reply unicast packet
int reachable_unicast(struct subscriber *subscriber, overlay_interface *interface, struct in_addr addr, int port){
if (subscriber->reachable&REACHABLE)
return -1;
if (subscriber->node)
return -1;
subscriber->interface = interface;
subscriber->address.sin_family = AF_INET;
subscriber->address.sin_addr = addr;
subscriber->address.sin_port = htons(port);
set_reachable(subscriber, REACHABLE_UNICAST);
return 0;
return 1;
}
int resolve_name(const char *name, struct in_addr *addr){
@ -191,7 +143,7 @@ int resolve_name(const char *name, struct in_addr *addr){
// load a unicast address from configuration
int load_subscriber_address(struct subscriber *subscriber)
{
if (subscriber_is_reachable(subscriber)&REACHABLE)
if (!subscriber || subscriber->reachable&REACHABLE)
return 0;
int i = config_host_list__get(&config.hosts, (const sid_t*)subscriber->sid);
// No unicast configuration? just return.
@ -220,12 +172,17 @@ int load_subscriber_address(struct subscriber *subscriber)
}
if (config.debug.overlayrouting)
DEBUGF("Loaded address %s:%d for %s", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), alloca_tohex_sid(subscriber->sid));
return overlay_send_probe(subscriber, addr, interface, OQ_MESH_MANAGEMENT);
struct network_destination *destination = create_unicast_destination(addr, interface);
if (!destination)
return -1;
int ret=overlay_send_probe(subscriber, destination, OQ_MESH_MANAGEMENT);
release_destination_ref(destination);
return ret;
}
/* Collection of unicast echo responses to detect working links */
int
overlay_mdp_service_probe(overlay_mdp_frame *mdp)
overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp)
{
IN();
if (mdp->out.src.port!=MDP_PORT_ECHO || mdp->out.payload_length != sizeof(struct probe_contents)){
@ -233,8 +190,7 @@ overlay_mdp_service_probe(overlay_mdp_frame *mdp)
RETURN(-1);
}
struct subscriber *peer = find_subscriber(mdp->out.src.sid, SID_SIZE, 0);
if (peer->reachable == REACHABLE_SELF)
if (frame->source->reachable == REACHABLE_SELF)
RETURN(0);
struct probe_contents probe;
@ -242,63 +198,19 @@ overlay_mdp_service_probe(overlay_mdp_frame *mdp)
if (probe.addr.sin_family!=AF_INET)
RETURN(WHY("Unsupported address family"));
struct overlay_interface *interface = &overlay_interfaces[probe.interface];
// if a peer is already reachable, and this probe would change the interface, ignore it
// TODO track unicast links better in route_link.c
if (peer->reachable & REACHABLE_INDIRECT)
RETURN(0);
if (peer->reachable & REACHABLE_DIRECT && peer->interface && peer->interface != interface)
RETURN(0);
peer->last_probe_response = gettime_ms();
peer->interface = &overlay_interfaces[probe.interface];
peer->address.sin_family = AF_INET;
peer->address.sin_addr = probe.addr.sin_addr;
peer->address.sin_port = probe.addr.sin_port;
int r=REACHABLE_UNICAST;
// Don't turn assumed|broadcast into unicast|broadcast
if (!(peer->reachable & REACHABLE_ASSUMED))
r |= (peer->reachable & REACHABLE_DIRECT);
set_reachable(peer, r);
RETURN(0);
RETURN(link_unicast_ack(frame->source, &overlay_interfaces[probe.interface], probe.addr));
OUT();
}
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface, int queue){
if (interface==NULL)
interface = overlay_interface_find(addr.sin_addr, 1);
if (!interface)
return WHY("I don't know which interface to use");
if (interface->state!=INTERFACE_STATE_UP)
return WHY("I can't send a probe if the interface is down.");
// don't send a unicast probe unless its on the same interface that is already known to be reachable
if (peer && peer->reachable & REACHABLE_INDIRECT)
return -1;
if (peer && (peer->reachable & REACHABLE_DIRECT) && peer->interface && peer->interface != interface)
return -1;
if (addr.sin_addr.s_addr==0) {
if (config.debug.overlayinterfaces)
DEBUG("I can't send a probe to address 0.0.0.0");
return -1;
}
if (addr.sin_port==0) {
if (config.debug.overlayinterfaces)
DEBUG("I can't send a probe to port 0");
return -1;
}
int overlay_send_probe(struct subscriber *peer, struct network_destination *destination, int queue){
// never send unicast probes over a stream interface
if (interface->socket_type==SOCK_STREAM)
if (destination->interface->socket_type==SOCK_STREAM)
return 0;
time_ms_t now = gettime_ms();
if (peer && peer->last_probe+1000>now)
return -1;
// XXXTODO throttle probes...
// time_ms_t now = gettime_ms();
// if (peer && peer->last_probe+1000>now)
// return -1;
struct overlay_frame *frame=malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
@ -307,16 +219,14 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
frame->next_hop = frame->destination = peer;
frame->ttl=1;
frame->queue=queue;
frame->destination_resolved=1;
frame->recvaddr=addr;
frame->unicast=1;
frame->interface=interface;
frame->destination_count=1;
frame->destinations[0].destination=add_destination_ref(destination);
frame->payload = ob_new();
frame->source_full = 1;
// TODO call mdp payload encryption / signing without calling overlay_mdp_dispatch...
if (peer)
peer->last_probe=gettime_ms();
// XXXTODO rate limit
// if (peer)
// peer->last_probe=gettime_ms();
if (overlay_mdp_encode_ports(frame->payload, MDP_PORT_ECHO, MDP_PORT_PROBE)){
op_free(frame);
@ -329,9 +239,9 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
return -1;
}
struct probe_contents probe;
probe.addr=addr;
probe.addr=destination->address;
// get interface number
probe.interface = interface - overlay_interfaces;
probe.interface = destination->interface - overlay_interfaces;
bcopy(&probe, dst, sizeof(struct probe_contents));
if (overlay_payload_enqueue(frame)){
op_free(frame);
@ -339,13 +249,17 @@ int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay
}
if (config.debug.overlayrouting)
DEBUGF("Queued probe packet on interface %s to %s:%d for %s",
interface->name, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), peer?alloca_tohex_sid(peer->sid):"ANY");
destination->interface->name,
inet_ntoa(destination->address.sin_addr), ntohs(destination->address.sin_port),
peer?alloca_tohex_sid(peer->sid):"ANY");
return 0;
}
// append the address of a unicast link into a packet buffer
static int overlay_append_unicast_address(struct subscriber *subscriber, struct overlay_buffer *buff)
{
return -1;
/*XXXTODO
if (subscriber->reachable & REACHABLE_ASSUMED || !(subscriber->reachable & REACHABLE_UNICAST)){
if (config.debug.overlayrouting)
DEBUGF("Unable to give address of %s, %d", alloca_tohex_sid(subscriber->sid),subscriber->reachable);
@ -362,6 +276,7 @@ static int overlay_append_unicast_address(struct subscriber *subscriber, struct
if (config.debug.overlayrouting)
DEBUGF("Added STUN info for %s", alloca_tohex_sid(subscriber->sid));
return 0;
*/
}
// append the address of all neighbour unicast links into a packet buffer
@ -455,7 +370,11 @@ int overlay_mdp_service_stun(overlay_mdp_frame *mdp)
if (!subscriber || (subscriber->reachable!=REACHABLE_NONE))
continue;
overlay_send_probe(subscriber, addr, NULL, OQ_MESH_MANAGEMENT);
struct network_destination *destination = create_unicast_destination(addr, NULL);
if (destination){
overlay_send_probe(subscriber, destination, OQ_MESH_MANAGEMENT);
release_destination_ref(destination);
}
}
ob_free(buff);
@ -465,11 +384,10 @@ int overlay_mdp_service_stun(overlay_mdp_frame *mdp)
int overlay_send_stun_request(struct subscriber *server, struct subscriber *request){
if ((!server) || (!request))
return -1;
if (!(subscriber_is_reachable(server)&REACHABLE))
if (!(server->reachable&REACHABLE))
return -1;
// don't bother with a stun request if the peer is already reachable directly
// TODO link timeouts
if (subscriber_is_reachable(request)&REACHABLE_DIRECT)
if (request->reachable&REACHABLE_DIRECT)
return -1;
time_ms_t now = gettime_ms();

View File

@ -842,8 +842,10 @@ static int routing_table(struct subscriber *subscriber, void *context){
if (subscriber->reachable==REACHABLE_INDIRECT && subscriber->next_hop)
memcpy(r->neighbour, subscriber->next_hop->sid, SID_SIZE);
if (subscriber->reachable & REACHABLE_DIRECT && subscriber->interface)
strcpy(r->interface_name, subscriber->interface->name);
if (subscriber->reachable & REACHABLE_DIRECT
&& subscriber->destination
&& subscriber->destination->interface)
strcpy(r->interface_name, subscriber->destination->interface->name);
else
r->interface_name[0]=0;
overlay_mdp_reply(mdp_named.poll.fd, state->recvaddr_un, state->recvaddrlen, &reply);
@ -872,7 +874,12 @@ static void overlay_mdp_scan(struct sched_ent *alarm)
while(state->current <= stop){
addr.sin_addr.s_addr=htonl(state->current);
if (addr.sin_addr.s_addr != state->interface->address.sin_addr.s_addr){
if (overlay_send_probe(NULL, addr, state->interface, OQ_ORDINARY))
struct network_destination *destination = create_unicast_destination(addr, state->interface);
if (!destination)
break;
int ret = overlay_send_probe(NULL, destination, OQ_ORDINARY);
release_destination_ref(destination);
if (ret)
break;
}
state->current++;
@ -999,7 +1006,7 @@ void overlay_mdp_poll(struct sched_ent *alarm)
scans[i].interface = interface;
scans[i].current = ntohl(interface->address.sin_addr.s_addr & interface->netmask.s_addr)+1;
scans[i].last = ntohl(interface->destination.address.sin_addr.s_addr)-1;
scans[i].last = ntohl(interface->destination->address.sin_addr.s_addr)-1;
if (scans[i].last - scans[i].current>0x10000){
INFOF("Skipping scan on interface %s as the address space is too large",interface->name);
continue;

View File

@ -74,23 +74,15 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
int send_broadcast=1;
if (dest){
if (!(dest->reachable&REACHABLE_DIRECT))
send_broadcast=0;
if (dest->reachable&REACHABLE_UNICAST && dest->interface && dest->interface->prefer_unicast)
send_broadcast=0;
}
if (send_broadcast){
if (dest && dest->reachable&REACHABLE_UNICAST){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}else{
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.ttl=1;
}else{
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
@ -404,13 +396,13 @@ int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_fr
{
IN();
switch(mdp->out.dst.port) {
case MDP_PORT_LINKSTATE: RETURN(link_receive(mdp));
case MDP_PORT_LINKSTATE: RETURN(link_receive(frame, mdp));
case MDP_PORT_VOMP: RETURN(vomp_mdp_received(mdp));
case MDP_PORT_KEYMAPREQUEST: RETURN(keyring_mapping_request(keyring,mdp));
case MDP_PORT_DNALOOKUP: RETURN(overlay_mdp_service_dnalookup(mdp));
case MDP_PORT_ECHO: RETURN(overlay_mdp_service_echo(mdp));
case MDP_PORT_TRACE: RETURN(overlay_mdp_service_trace(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(mdp));
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp));
case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp));
case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp));
case MDP_PORT_RHIZOME_REQUEST:

View File

@ -144,18 +144,9 @@ static void parse_frame(struct overlay_buffer *buff){
goto end;
// locate the interface we should send outgoing unicast packets to
overlay_interface *interface = overlay_interface_find(*addr, 1);
if (interface){
// always update the IP address we heard them from, even if we don't need to use it right now
context.sender->address.sin_family = AF_INET;
context.sender->address.sin_addr = *addr;
// assume the port number of the other servald matches our local port number configuration
context.sender->address.sin_port = htons(interface->port);
if (context.sender->reachable==REACHABLE_NONE){
set_reachable(context.sender, REACHABLE_UNICAST|REACHABLE_ASSUMED);
overlay_send_probe(context.sender, context.sender->address, interface, OQ_MESH_MANAGEMENT);
}
context.interface = overlay_interface_find(*addr, 1);
if (context.interface){
link_received_packet(&context, -1, 0);
}
// read subscriber id of payload origin

View File

@ -26,6 +26,15 @@
#define FRAME_NOT_SENT -1
#define FRAME_DONT_SEND -2
#define MAX_PACKET_DESTINATIONS OVERLAY_MAX_INTERFACES
struct packet_destination{
// if we've sent this packet once, what was the envelope sequence number?
int sent_sequence;
time_ms_t delay_until;
struct network_destination *destination;
};
struct overlay_frame {
struct overlay_frame *prev;
struct overlay_frame *next;
@ -39,33 +48,31 @@ struct overlay_frame {
void *send_context;
int (*send_hook)(struct overlay_frame *, int seq, void *context);
/* What sequence number have we used to send this packet on this interface.
*/
int interface_sent_sequence[OVERLAY_MAX_INTERFACES];
time_ms_t delay_until;
struct packet_destination destinations[MAX_PACKET_DESTINATIONS];
int destination_count;
// each payload gets a sequence number that is reused on retransmission
int32_t mdp_sequence;
time_ms_t interface_dont_send_until[OVERLAY_MAX_INTERFACES];
struct broadcast broadcast_id;
// null if destination is broadcast
struct subscriber *destination;
struct broadcast broadcast_id;
struct subscriber *next_hop;
// should we force the encoding to include the entire source public key?
int source_full;
struct subscriber *source;
/* IPv4 address the frame was received from, or should be sent to */
int destination_resolved;
/* IPv4 address the frame was received from */
struct sockaddr_in recvaddr;
overlay_interface *interface;
char unicast;
int packet_version;
time_ms_t dont_send_until;
int sender_interface;
/* Actual payload */
struct overlay_buffer *payload;
time_ms_t enqueued_at;
};

View File

@ -35,7 +35,6 @@ struct sockaddr_in loopback;
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq){
if (packet_version <0 || packet_version > SUPPORTED_PACKET_VERSION)
@ -47,9 +46,17 @@ int overlay_packet_init_header(int packet_version, int encapsulation,
return -1;
if (ob_append_byte(buff, encapsulation))
return -1;
if (context->interface->point_to_point
&& context->interface->other_device
&& packet_version>=1)
context->point_to_point_device = context->interface->other_device;
context->encoding_header=1;
if (overlay_address_append(context, buff, my_subscriber))
return -1;
context->encoding_header=0;
context->sender = my_subscriber;
@ -84,7 +91,7 @@ int process_incoming_frame(time_ms_t now, struct overlay_interface *interface, s
break;
// data frames
case OF_TYPE_RHIZOME_ADVERT:
overlay_rhizome_saw_advertisements(id,f,now);
overlay_rhizome_saw_advertisements(id,context,f,now);
break;
case OF_TYPE_DATA:
case OF_TYPE_DATA_VOICE:
@ -257,9 +264,11 @@ int parseMdpPacketHeader(struct decode_context *context, struct overlay_frame *f
int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface *interface,
struct sockaddr_in *addr, struct overlay_buffer *buffer){
IN();
time_ms_t now = gettime_ms();
context->interface = interface;
if (interface->point_to_point && interface->other_device)
context->point_to_point_device = interface->other_device;
context->sender_interface = 0;
context->packet_version = ob_get(buffer);
@ -284,60 +293,27 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
if (packet_flags & PACKET_SEQ)
sender_seq = ob_get(buffer)&0xFF;
if (addr)
context->addr=*addr;
if (context->sender){
// ignore packets that have been reflected back to me
if (context->sender->reachable==REACHABLE_SELF){
if (config.debug.verbose && config.debug.overlayframes)
DEBUG("Completely ignore packets I sent");
RETURN(1);
}
if (context->sender->max_packet_version < context->packet_version)
context->sender->max_packet_version = context->packet_version;
if (interface->point_to_point && interface->other_device!=context->sender){
INFOF("Established point to point link with %s on %s", alloca_tohex_sid(context->sender->sid), interface->name);
context->interface->other_device = context->sender;
}
// TODO probe unicast links when we detect an address change.
// if this is a dummy announcement for a node that isn't in our routing table
if (context->sender->reachable == REACHABLE_NONE) {
context->sender->interface = interface;
if (addr)
context->sender->address = *addr;
else
bzero(&context->sender->address, sizeof context->sender->address);
context->sender->last_probe = 0;
// assume for the moment, that we can reply with the same packet type
if (packet_flags&PACKET_UNICAST){
set_reachable(context->sender, REACHABLE_UNICAST|REACHABLE_ASSUMED);
}else{
set_reachable(context->sender, REACHABLE_BROADCAST|REACHABLE_ASSUMED);
}
context->point_to_point_device = context->interface->other_device = context->sender;
}
/* Note the probe payload must be queued before any SID/SAS request so we can force the packet to have a full sid */
if (addr && (context->sender->last_probe==0 || now - context->sender->last_probe > interface->destination.tick_ms*10))
overlay_send_probe(context->sender, *addr, interface, OQ_MESH_MANAGEMENT);
link_received_packet(context->sender, interface, context->sender_interface, sender_seq, packet_flags & PACKET_UNICAST);
}else{
// send a unicast probe, just incase they never hear our broadcasts.
if (addr)
overlay_send_probe(NULL, *addr, interface, OQ_MESH_MANAGEMENT);
DEBUGF("Received packet seq %d from %s on %s",
sender_seq, alloca_tohex_sid(context->sender->sid), interface->name);
}
if (addr){
if (packet_flags & PACKET_UNICAST)
context->addr=*addr;
else
context->addr=interface->destination.address;
}
link_received_packet(context, sender_seq, packet_flags & PACKET_UNICAST);
RETURN(0);
OUT();
}
@ -409,7 +385,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
struct overlay_buffer *b = ob_static(packet, len);
ob_limitsize(b, len);
context.interface = f.interface = interface;
f.interface = interface;
if (recvaddr)
f.recvaddr = *((struct sockaddr_in *)recvaddr);
else
@ -423,6 +399,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
ob_free(b);
RETURN(ret);
}
f.sender_interface = context.sender_interface;
while(ob_remaining(b)>0){
context.invalid_addresses=0;
@ -436,7 +413,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
break;
}
// TODO allow for one byte length?
// TODO allow for single byte length?
unsigned int payload_len;
switch (context.encapsulation){

View File

@ -80,7 +80,7 @@ static int overlay_frame_build_header(int packet_version, struct decode_context
return 0;
}
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b)
{
/* Convert a payload (frame) structure into a series of bytes.
@ -110,7 +110,7 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
p->destination, p->source))
goto cleanup;
if (interface->destination.encapsulation == ENCAP_OVERLAY){
if (encapsulation == ENCAP_OVERLAY){
if (ob_append_ui16(b, ob_position(p->payload)))
goto cleanup;
}

View File

@ -37,13 +37,11 @@ typedef struct overlay_txqueue {
overlay_txqueue overlay_tx[OQ_MAX];
// short lived data while we are constructing an outgoing packet
struct outgoing_packet{
overlay_interface *interface;
int32_t seq;
struct network_destination *destination;
int seq;
int packet_version;
int i;
struct subscriber *unicast_subscriber;
struct sockaddr_in dest;
int header_length;
struct overlay_buffer *buffer;
struct decode_context context;
@ -93,6 +91,9 @@ overlay_queue_remove(overlay_txqueue *queue, struct overlay_frame *frame){
queue->length--;
while(frame->destination_count>0)
release_destination_ref(frame->destinations[--frame->destination_count].destination);
op_free(frame);
return next;
@ -152,31 +153,21 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (!p) return WHY("Cannot queue NULL");
do{
if (p->destination_resolved)
if (p->destination_count>0)
break;
if (!p->destination)
break;
int r = subscriber_is_reachable(p->destination);
if (r&REACHABLE)
if (p->destination->reachable&REACHABLE)
break;
if (directory_service){
r = subscriber_is_reachable(directory_service);
if (r&REACHABLE)
break;
}
return WHYF("Cannot send %x packet, destination %s is %s", p->type,
alloca_tohex_sid(p->destination->sid), r==REACHABLE_SELF?"myself":"unreachable");
if (directory_service&&directory_service->reachable&REACHABLE)
break;
return WHYF("Cannot send %x packet, destination %s is unreachable", p->type,
alloca_tohex_sid(p->destination->sid));
} while(0);
if (p->queue>=OQ_MAX)
return WHY("Invalid queue specified");
/* queue a unicast probe if we haven't for a while. */
if (p->destination && (p->destination->last_probe==0 || gettime_ms() - p->destination->last_probe > 5000))
overlay_send_probe(p->destination, p->destination->address, p->destination->interface, OQ_MESH_MANAGEMENT);
overlay_txqueue *queue = &overlay_tx[p->queue];
if (config.debug.packettx)
@ -192,12 +183,7 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
{
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
p->interface_sent_sequence[i]=FRAME_DONT_SEND;
}
if (ob_position(p->payload)>=MDP_MTU)
FATAL("Queued packet is too big");
@ -205,49 +191,35 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (p->packet_version<=0)
p->packet_version=1;
if (p->destination_resolved){
p->interface_sent_sequence[p->interface - overlay_interfaces]=FRAME_NOT_SENT;
}else{
if (p->destination){
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}else{
int i;
int interface_copies = 0;
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Enqueue packet %p", p);
if (p->destination_count==0){
if (!p->destination){
// hook to allow for flooding via olsr
olsr_send(p);
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP
|| !overlay_interfaces[i].send_broadcasts)
continue;
int oldest_version = link_state_interface_oldest_neighbour(&overlay_interfaces[i]);
if (oldest_version <0){
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Skipping broadcast on interface %s, as we have no neighbours", overlay_interfaces[i].name);
continue;
}
// make sure all neighbours can hear this packet
if (oldest_version < p->packet_version)
p->packet_version = oldest_version;
p->interface_sent_sequence[i]=FRAME_NOT_SENT;
interface_copies++;
}
link_add_broadcast_destinations(p);
// just drop it now
if (interface_copies == 0){
if (p->destination_count == 0){
if (config.debug.verbose && config.debug.overlayframes)
DEBUGF("Not transmitting broadcast packet, as we have no neighbours on any interface");
DEBUGF("Not transmitting, as we have no neighbours on any interface");
return -1;
}
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}
// allow the packet to be resent
if (p->resend == 0)
p->resend = 1;
}
if (config.debug.verbose && config.debug.overlayframes){
int i=0;
for (i=0;i<p->destination_count;i++)
DEBUGF("Sending %s on interface %s",
p->destinations[i].destination->unicast?"unicast":"broadcast",
p->destinations[i].destination->interface->name);
}
struct overlay_frame *l=queue->last;
@ -267,32 +239,28 @@ int overlay_payload_enqueue(struct overlay_frame *p)
}
static void
overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination,
int unicast, int packet_version,
overlay_interface *interface, struct sockaddr_in addr){
packet->interface = interface;
packet->context.interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
overlay_init_packet(struct outgoing_packet *packet, int packet_version,
struct network_destination *destination){
packet->context.interface = destination->interface;
packet->buffer=ob_new();
packet->seq=-1;
packet->packet_version = packet_version;
packet->context.packet_version = packet_version;
if (unicast)
packet->unicast_subscriber = destination;
else
packet->seq = interface->destination.sequence_number = (interface->destination.sequence_number + 1)&0xFFFF;
ob_limitsize(packet->buffer, packet->interface->destination.mtu);
packet->destination = destination;
packet->seq = destination->sequence_number = (destination->sequence_number + 1) & 0xFFFF;
overlay_packet_init_header(packet_version, interface->destination.encapsulation,
ob_limitsize(packet->buffer, destination->interface->mtu);
int i=destination->interface - overlay_interfaces;
overlay_packet_init_header(packet_version, destination->encapsulation,
&packet->context, packet->buffer,
destination, unicast, packet->i, packet->seq);
destination->unicast,
i, packet->seq);
packet->header_length = ob_position(packet->buffer);
if (config.debug.overlayframes)
DEBUGF("Creating packet for interface %s, seq %d, %s",
interface->name, packet->seq,
unicast?"unicast":"broadcast");
DEBUGF("Creating %d packet for interface %s, seq %d, %s",
packet_version,
destination->interface->name, destination->sequence_number,
destination->unicast?"unicast":"broadcast");
}
int overlay_queue_schedule_next(time_ms_t next_allowed_packet){
@ -312,52 +280,49 @@ int overlay_queue_schedule_next(time_ms_t next_allowed_packet){
return 0;
}
static void remove_destination(struct overlay_frame *frame, int i){
release_destination_ref(frame->destinations[i].destination);
frame->destination_count --;
if (i<frame->destination_count)
frame->destinations[i]=frame->destinations[frame->destination_count];
}
// update the alarm time and return 1 if changed
static int
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
do{
if (frame->destination_resolved)
break;
if (!frame->destination)
break;
if (subscriber_is_reachable(frame->destination)&REACHABLE)
break;
if (directory_service){
if (subscriber_is_reachable(directory_service)&REACHABLE)
break;
}
// ignore payload alarm if the destination is currently unreachable
return 0;
}while(0);
time_ms_t next_allowed_packet=0;
if (frame->destination_resolved && frame->interface){
// don't include interfaces which are currently transmitting using a serial buffer
if (frame->interface->tx_bytes_pending>0)
return 0;
next_allowed_packet = limit_next_allowed(&frame->interface->destination.transfer_limit);
}else{
// check all interfaces
// check all interfaces
if (frame->destination_count>0){
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
for(i=0;i<frame->destination_count;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
if ((!frame->destination) && (frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
link_state_interface_oldest_neighbour(&overlay_interfaces[i])<0))
continue;
time_ms_t next_packet = limit_next_allowed(&overlay_interfaces[i].destination.transfer_limit);
if (next_packet < frame->interface_dont_send_until[i])
next_packet = frame->interface_dont_send_until[i];
time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit);
if (next_packet < frame->destinations[i].delay_until)
next_packet = frame->destinations[i].delay_until;
if (next_allowed_packet==0||next_packet < next_allowed_packet)
next_allowed_packet = next_packet;
}
if (next_allowed_packet==0)
if (next_allowed_packet==0){
return 0;
}
}else{
// ignore payload alarm if the destination is currently unreachable
if (!frame->destination){
return 0;
}
if (!(frame->destination->reachable&REACHABLE)){
if (!directory_service || !(directory_service->reachable&REACHABLE)){
return 0;
}
}
}
if (next_allowed_packet < frame->dont_send_until)
next_allowed_packet = frame->dont_send_until;
if (next_allowed_packet < frame->delay_until)
next_allowed_packet = frame->delay_until;
if (next_allowed_packet < frame->enqueued_at)
next_allowed_packet = frame->enqueued_at;
if (ob_position(frame->payload)<SMALL_PACKET_SIZE &&
next_allowed_packet < frame->enqueued_at + overlay_tx[frame->queue].small_packet_grace_interval)
@ -382,136 +347,114 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
continue;
}
/* Note, once we queue a broadcast packet we are committed to sending it out every interface,
even if we hear it from somewhere else in the mean time
/* Note, once we queue a broadcast packet we are currently
* committed to sending it to every destination,
* even if we hear it from somewhere else in the mean time
*/
// ignore payloads that are waiting for ack / nack resends
if (frame->dont_send_until > now)
if (frame->delay_until > now)
goto skip;
if (packet->buffer && packet->destination->encapsulation==ENCAP_SINGLE)
goto skip;
// quickly skip payloads that have no chance of fitting
if (packet->buffer && ob_limit(frame->payload) > ob_remaining(packet->buffer))
goto skip;
if (!frame->destination_resolved){
if (frame->destination_count==0){
frame->next_hop = frame->destination;
if (frame->next_hop){
// Where do we need to route this payload next?
int r = subscriber_is_reachable(frame->next_hop);
int r = frame->next_hop->reachable;
// first, should we try to bounce this payload off the directory service?
if (r==REACHABLE_NONE &&
directory_service &&
frame->next_hop!=directory_service){
frame->next_hop=directory_service;
r=subscriber_is_reachable(directory_service);
r=directory_service->reachable;
}
// do we need to route via a neighbour?
if (r&REACHABLE_INDIRECT){
frame->next_hop = frame->next_hop->next_hop;
r = subscriber_is_reachable(frame->next_hop);
r = frame->next_hop->reachable;
}
if (!(r&REACHABLE_DIRECT)){
goto skip;
}
frame->interface = frame->next_hop->interface;
frame->destinations[frame->destination_count++].destination=add_destination_ref(frame->next_hop->destination);
// if both broadcast and unicast are available, pick on based on interface preference
if ((r&(REACHABLE_UNICAST|REACHABLE_BROADCAST))==(REACHABLE_UNICAST|REACHABLE_BROADCAST)){
if (frame->interface->prefer_unicast){
r=REACHABLE_UNICAST;
// used by tests
if (config.debug.overlayframes)
DEBUGF("Choosing to send via unicast for %s", alloca_tohex_sid(frame->destination->sid));
}else
r=REACHABLE_BROADCAST;
}
if(r&REACHABLE_UNICAST){
frame->recvaddr = frame->next_hop->address;
frame->unicast = 1;
}else
frame->recvaddr = frame->interface->destination.address;
// degrade packet version if required to reach the destination
if (frame->packet_version > frame->next_hop->max_packet_version)
frame->packet_version = frame->next_hop->max_packet_version;
frame->destination_resolved=1;
}else{
}
}
int destination_index=-1;
{
int i;
for (i=frame->destination_count -1;i>=0;i--){
struct network_destination *dest = frame->destinations[i].destination;
if (!dest)
FATALF("Destination %d is NULL", i);
if (!dest->interface)
FATALF("Destination interface %d is NULL", i);
if (dest->interface->state!=INTERFACE_STATE_UP){
// remove this destination
remove_destination(frame, i);
continue;
}
if (frame->destinations[i].delay_until>now)
continue;
if (packet->buffer){
// check if we can stuff into this packet
if (frame->interface_sent_sequence[packet->i]==FRAME_DONT_SEND || frame->interface_dont_send_until[packet->i] >now)
goto skip;
frame->interface = packet->interface;
frame->recvaddr = packet->interface->destination.address;
if (frame->packet_version!=packet->packet_version)
continue;
}else{
// find an interface that we haven't broadcast on yet
frame->interface = NULL;
int i, keep=0;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP ||
frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
link_state_interface_oldest_neighbour(&overlay_interfaces[i])<0)
continue;
keep=1;
if (frame->interface_dont_send_until[i] >now)
continue;
time_ms_t next_allowed = limit_next_allowed(&overlay_interfaces[i].destination.transfer_limit);
if (next_allowed > now)
continue;
frame->interface = &overlay_interfaces[i];
frame->recvaddr = overlay_interfaces[i].destination.address;
// is this packet going our way?
if (dest==packet->destination){
destination_index=i;
frame->destinations[i].sent_sequence = dest->sequence_number;
break;
}
if (!keep){
// huh, we don't need to send it anywhere?
frame = overlay_queue_remove(queue, frame);
}else{
// skip this interface if the stream tx buffer has data
if (dest->interface->socket_type==SOCK_STREAM
&& dest->interface->tx_bytes_pending>0)
continue;
}
if (!frame->interface)
goto skip;
// can we send a packet on this interface now?
if (limit_is_allowed(&dest->transfer_limit))
continue;
// send a packet to this destination
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->packet_version, dest);
destination_index=i;
frame->destinations[i].sent_sequence = dest->sequence_number;
break;
}
}
}
if (!packet->buffer){
if (frame->interface->socket_type==SOCK_STREAM){
// skip this interface if the stream tx buffer has data
if (frame->interface->tx_bytes_pending>0)
goto skip;
}
// can we send a packet on this interface now?
if (limit_is_allowed(&frame->interface->destination.transfer_limit))
goto skip;
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->packet_version, frame->interface, frame->recvaddr);
}else{
// is this packet going our way?
if (frame->interface!=packet->interface ||
frame->interface->destination.encapsulation==ENCAP_SINGLE ||
frame->packet_version!=packet->packet_version ||
memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
goto skip;
}
if (frame->destination_count==0){
frame = overlay_queue_remove(queue, frame);
continue;
}
if (destination_index==-1)
goto skip;
if (frame->send_hook){
// last minute check if we really want to send this frame, or track when we sent it
if (frame->send_hook(frame, packet->seq, frame->send_context)){
@ -526,64 +469,39 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
}else if(((mdp_sequence - frame->mdp_sequence)&0xFFFF) >= 64){
// too late, we've sent too many packets for the next hop to correctly de-duplicate
if (config.debug.overlayframes)
DEBUGF("Retransmition of frame %p mdp seq %d, is too late to be de-duplicated", frame, frame->mdp_sequence);
DEBUGF("Retransmition of frame %p mdp seq %d, is too late to be de-duplicated",
frame, frame->mdp_sequence);
frame = overlay_queue_remove(queue, frame);
continue;
}else{
if (config.debug.overlayframes)
DEBUGF("Retransmitting frame %p mdp seq %d, from packet seq %d in seq %d", frame, frame->mdp_sequence, frame->interface_sent_sequence[packet->i], packet->seq);
}
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
if (overlay_frame_append_payload(&packet->context, packet->destination->encapsulation, frame, packet->buffer)){
// payload was not queued, delay the next attempt slightly
frame->dont_send_until = now + 5;
frame->delay_until = now + 5;
goto skip;
}
frame->interface_sent_sequence[packet->i] = packet->seq;
frame->interface_dont_send_until[packet->i] = now+200;
frame->destinations[destination_index].delay_until = now+200;
if (config.debug.overlayframes){
DEBUGF("Sent payload %p, %d type %x len %d for %s via %s, seq %d",
DEBUGF("Appended payload %p, %d type %x len %d for %s via %s",
frame, frame->mdp_sequence,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN),
frame->interface_sent_sequence[packet->i]);
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
}
if (frame->destination)
frame->destination->last_tx=now;
if (frame->next_hop)
frame->next_hop->last_tx=now;
// mark the payload as sent
if (frame->destination_resolved){
if (frame->resend>0 && frame->packet_version>=1 && frame->next_hop && packet->seq !=-1 && (!frame->unicast)){
frame->dont_send_until = now+200;
frame->destination_resolved = 0;
if (config.debug.overlayframes)
DEBUGF("Holding onto payload for ack/nack resend in %lldms", frame->dont_send_until - now);
goto skip;
}
}else{
if (frame->resend<=0 || frame->packet_version<1 || packet->seq==-1 || frame->unicast){
// dont retransmit if we aren't sending sequence numbers, or we've run out of allowed resends
frame->interface_sent_sequence[packet->i] = FRAME_DONT_SEND;
}
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP &&
link_state_interface_oldest_neighbour(&overlay_interfaces[i])>=0 &&
frame->interface_sent_sequence[i]!=FRAME_DONT_SEND){
goto skip;
}
// dont retransmit if we aren't sending sequence numbers, or we've been asked not to
if (frame->packet_version<1 || frame->resend<=0 ||
frame->enqueued_at + queue->latencyTarget < frame->destinations[destination_index].delay_until){
remove_destination(frame, destination_index);
if (frame->destination_count==0){
frame = overlay_queue_remove(queue, frame);
continue;
}
}
frame = overlay_queue_remove(queue, frame);
continue;
// TODO recalc route on retransmittion??
skip:
// if we can't send the payload now, check when we should try next
@ -612,12 +530,7 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
if (config.debug.packetconstruction)
ob_dump(packet->buffer,"assembled packet");
if (overlay_broadcast_ensemble(packet->interface, &packet->dest, ob_ptr(packet->buffer), ob_position(packet->buffer))){
// sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE);
}
}
overlay_broadcast_ensemble(packet->destination, ob_ptr(packet->buffer), ob_position(packet->buffer));
ob_free(packet->buffer);
RETURN(1);
}
@ -633,62 +546,53 @@ static void overlay_send_packet(struct sched_ent *alarm){
overlay_fill_send_packet(&packet, gettime_ms());
}
int overlay_send_tick_packet(struct overlay_interface *interface){
int overlay_send_tick_packet(struct network_destination *destination){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
packet.seq=-1;
overlay_init_packet(&packet, NULL, 0, 0, interface, interface->destination.address);
overlay_init_packet(&packet, 0, destination);
overlay_fill_send_packet(&packet, gettime_ms());
return 0;
}
// de-queue all packets that have been sent to this subscriber & have arrived.
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq)
int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *destination, uint32_t ack_mask, int ack_seq)
{
int interface_id = interface - overlay_interfaces;
int i;
int i, j;
time_ms_t now = gettime_ms();
for (i=0;i<OQ_MAX;i++){
struct overlay_frame *frame = overlay_tx[i].first;
while(frame){
int frame_seq = frame->interface_sent_sequence[interface_id];
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
for (j=frame->destination_count -1;j>=0;j--){
if (frame->destinations[j].destination==destination){
int frame_seq = frame->destinations[j].sent_sequence;
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
if (acked){
frame->interface_sent_sequence[interface_id] = FRAME_DONT_SEND;
int discard = 1;
if (!frame->destination){
int j;
for(j=0;j<OVERLAY_MAX_INTERFACES;j++){
if (overlay_interfaces[j].state==INTERFACE_STATE_UP &&
frame->interface_sent_sequence[j]!=FRAME_DONT_SEND &&
link_state_interface_oldest_neighbour(&overlay_interfaces[j])>=0){
discard = 0;
break;
}
if (acked){
if (config.debug.overlayframes)
DEBUGF("Packet %p to %s sent by seq %d, acked with seq %d",
frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
remove_destination(frame, j);
}else if (seq_delta < 128 && frame->destination && frame->delay_until>now){
// resend immediately
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->delay_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
if (discard){
if (config.debug.overlayframes)
DEBUGF("Dequeing packet %p to %s sent by seq %d, due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
}
}
if (seq_delta < 128 && frame->destination && frame->dont_send_until>now){
// resend immediately
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->dont_send_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
break;
}
}
frame = frame->next;
if (frame->destination_count==0){
frame = overlay_queue_remove(&overlay_tx[i], frame);
}else
frame = frame->next;
}
}
return 0;

View File

@ -262,7 +262,7 @@ error:
time_ms_t lookup_time=0;
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now)
int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, struct overlay_frame *f, time_ms_t now)
{
IN();
if (!f)
@ -272,7 +272,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
RETURN(0);
int ad_frame_type=ob_get(f->payload);
struct sockaddr_in httpaddr = f->recvaddr;
struct sockaddr_in httpaddr = context->addr;
httpaddr.sin_port = htons(RHIZOME_HTTP_PORT);
int manifest_length;
rhizome_manifest *m=NULL;

View File

@ -29,6 +29,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MSG_TYPE_REQ 1
#define CACHE_BARS 60
#define MAX_OLD_BARS 40
#define BARS_PER_RESPONSE ((int)400/RHIZOME_BAR_BYTES)
#define HEAD_FLAG INT64_MAX
@ -139,7 +140,8 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (state->sync_end < state->highest_seen){
rhizome_sync_request(subscriber, state->sync_end, 1);
}else if(state->sync_start >0){
rhizome_sync_request(subscriber, state->sync_start, 0);
if (state->bar_count < MAX_OLD_BARS)
rhizome_sync_request(subscriber, state->sync_start, 0);
}else if(!state->sync_complete){
state->sync_complete = 1;
if (config.debug.rhizome)
@ -278,6 +280,8 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
added=1;
}
for (i=mid_point -1; i>=0; i--){
if (state->bar_count >= MAX_OLD_BARS)
break;
if (sync_cache_bar(state, bars[i], bar_tokens[i]))
added=1;
}

View File

@ -4,6 +4,7 @@
#include "overlay_packet.h"
#include "str.h"
#include "conf.h"
#include <assert.h>
/*
Link state routing;
@ -17,7 +18,7 @@ Link state routing;
*/
#define INCLUDE_ANYWAY (500)
#define INCLUDE_ANYWAY (200)
#define MAX_LINK_STATES 512
#define FLAG_HAS_INTERFACE (1<<0)
@ -35,7 +36,7 @@ struct link{
struct subscriber *transmitter;
struct link *parent;
struct overlay_interface *interface;
struct network_destination *destination;
struct subscriber *receiver;
// What's the last ack we've heard so we don't process nacks twice.
@ -56,22 +57,26 @@ struct link{
char calculating;
};
struct neighbour_link{
struct neighbour_link *_next;
// statistics of incoming half of network links
struct link_in{
struct link_in *_next;
// which of their interfaces are these stats for?
// which of our interfaces did we hear it on?
overlay_interface *interface;
// which of their interfaces did they send it from?
int neighbour_interface;
// which interface did we hear it on?
struct overlay_interface *interface;
// very simple time based link up/down detection;
// when will we consider the link broken?
time_ms_t link_timeout;
// unicast or broadcast?
char unicast;
int ack_sequence;
uint64_t ack_mask;
int ack_counter;
};
struct neighbour{
@ -83,15 +88,20 @@ struct neighbour{
char path_version;
// when do we assume the link is dead because they stopped hearing us or vice versa?
time_ms_t neighbour_link_timeout;
time_ms_t link_in_timeout;
// if a neighbour is telling the world that they are using us as a next hop, we need to send acks & nacks with high priority
// otherwise we don't care too much about packet loss.
char using_us;
// is this neighbour still sending selfacks?
char legacy_protocol;
// when a neighbour is using us as a next hop *and* they are using us to send packets to one of our neighbours,
// we must forward their broadcasts
time_ms_t routing_through_us;
// which of their mdp packets have we already heard and can be dropped as duplicates?
int mdp_ack_sequence;
uint64_t mdp_ack_mask;
@ -100,16 +110,12 @@ struct neighbour{
time_ms_t last_update;
int last_update_seq;
time_ms_t rtt;
int ack_counter;
// un-balanced tree of known link states
struct link *root;
// list of incoming link stats
struct neighbour_link *links, *best_link;
// is this neighbour still using selfacks?
char legacy_protocol;
struct link_in *links, *best_link;
};
// one struct per subscriber, where we track all routing information, allocated on first use
@ -120,7 +126,7 @@ struct link_state{
int hop_count;
int route_version;
// if a neighbour is free'd this link will point to invalid memory.
// do not trust this pointer unless you have just called find_best_link
// don't use this pointer directly, call find_best_link instead
struct link *link;
char calculating;
@ -141,6 +147,69 @@ static struct sched_ent link_send_alarm={
struct neighbour *neighbours=NULL;
int route_version=0;
struct network_destination * new_destination(struct overlay_interface *interface, char encapsulation){
assert(interface);
struct network_destination *ret = emalloc_zero(sizeof(struct network_destination));
if (ret){
ret->_ref_count=1;
ret->encapsulation = encapsulation;
ret->interface = interface;
// DEBUGF("Create ref %p, %d - %s", ret, ret->_ref_count, ret->interface->name);
}
return ret;
}
struct network_destination * create_unicast_destination(struct sockaddr_in addr, struct overlay_interface *interface){
if (!interface)
interface = overlay_interface_find(addr.sin_addr, 1);
if (!interface){
WHY("I don't know which interface to use");
return NULL;
}
if (interface->state!=INTERFACE_STATE_UP){
WHY("The interface is down.");
return NULL;
}
if (addr.sin_addr.s_addr==0 || addr.sin_port==0){
// WHY("Invalid unicast address");
return NULL;
}
struct network_destination *ret = new_destination(interface, ENCAP_OVERLAY);
if (ret){
ret->address = addr;
ret->unicast = 1;
ret->tick_ms = interface->destination->tick_ms;
}
return ret;
}
struct network_destination * add_destination_ref(struct network_destination *ref){
ref->_ref_count++;
// DEBUGF("Add ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
return ref;
}
void release_destination_ref(struct network_destination *ref){
if (ref->_ref_count<=1){
// DEBUGF("Free ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
free(ref);
}else{
ref->_ref_count--;
// DEBUGF("Drop ref %p, %d - %s", ref, ref->_ref_count, ref->interface->name);
}
}
void set_destination_ref(struct network_destination **ptr, struct network_destination *ref){
if (ref==*ptr)
return;
if (ref)
add_destination_ref(ref);
if (*ptr)
release_destination_ref(*ptr);
*ptr = ref;
}
static int NumberOfSetBits(uint32_t i)
{
i = i - ((i >> 1) & 0x55555555);
@ -188,6 +257,8 @@ static void free_links(struct link *link)
link->_left=NULL;
free_links(link->_right);
link->_right=NULL;
if (link->destination)
release_destination_ref(link->destination);
free(link);
}
@ -272,22 +343,22 @@ static void update_path_score(struct neighbour *neighbour, struct link *link){
link->calculating = 0;
}
static int find_best_link(struct subscriber *subscriber)
static struct link * find_best_link(struct subscriber *subscriber)
{
IN();
if (subscriber->reachable==REACHABLE_SELF)
RETURN(0);
RETURN(NULL);
struct link_state *state = get_link_state(subscriber);
if (state->route_version == route_version)
RETURN(0);
RETURN(state->link);
if (state->calculating)
RETURN(-1);
RETURN(NULL);
state->calculating = 1;
struct neighbour *neighbour = neighbours;
struct overlay_interface *interface = NULL;
struct network_destination *destination = NULL;
int best_hop_count = 99;
int best_drop_rate = 99;
struct link *best_link = NULL;
@ -295,7 +366,7 @@ static int find_best_link(struct subscriber *subscriber)
time_ms_t now = gettime_ms();
while (neighbour){
if (neighbour->neighbour_link_timeout < now)
if (neighbour->link_in_timeout < now)
goto next;
struct link *link = find_link(neighbour, subscriber, 0);
@ -318,7 +389,7 @@ static int find_best_link(struct subscriber *subscriber)
best_hop_count = link->hop_count;
best_drop_rate = link->path_drop_rate;
transmitter = link->transmitter;
interface = link->interface;
destination = link->destination;
best_link = link;
}
}
@ -328,9 +399,7 @@ next:
}
int changed =0;
if (state->next_hop != next_hop || state->transmitter != transmitter || state->link != best_link)
changed = 1;
if (next_hop == subscriber && (interface != subscriber->interface))
if (state->transmitter != transmitter || state->link != best_link)
changed = 1;
state->next_hop = next_hop;
@ -339,46 +408,18 @@ next:
state->route_version = route_version;
state->calculating = 0;
state->link = best_link;
int reachable = subscriber->reachable;
if (next_hop == NULL){
if ((subscriber->reachable&REACHABLE_DIRECT) != REACHABLE_UNICAST)
reachable = REACHABLE_NONE;
} else if (next_hop == subscriber){
// reset the state of any unicast probe's if the interface has changed
if (subscriber->interface != interface){
reachable = 0;
subscriber->last_probe=0;
bzero(&subscriber->address, sizeof subscriber->address);
}
reachable = REACHABLE_BROADCAST | (reachable & REACHABLE_UNICAST);
if (next_hop == subscriber)
next_hop = NULL;
subscriber->interface = interface;
} else {
reachable = REACHABLE_INDIRECT;
}
subscriber->next_hop = next_hop;
set_reachable(subscriber, reachable);
if (set_reachable(subscriber, destination, next_hop))
changed = 1;
if (changed){
if (config.debug.linkstate){
if (reachable & REACHABLE_DIRECT){
DEBUGF("LINK STATE; neighbour %s is reachable on interface %s",
alloca_tohex_sid(subscriber->sid),
interface->name);
} else {
DEBUGF("LINK STATE; next hop for %s is now %d hops, %s via %s",
alloca_tohex_sid(subscriber->sid),
best_hop_count,
next_hop?alloca_tohex_sid(next_hop->sid):"UNREACHABLE",
transmitter?alloca_tohex_sid(transmitter->sid):"NONE");
}
}
monitor_announce_link(best_hop_count, transmitter, subscriber);
state->next_update = now+5;
}
RETURN(0);
RETURN(best_link);
}
static int monitor_announce(struct subscriber *subscriber, void *context){
@ -463,25 +504,28 @@ static int append_link(struct subscriber *subscriber, void *context)
time_ms_t now = gettime_ms();
if (find_best_link(subscriber))
return 0;
if (state->next_update - INCLUDE_ANYWAY <= now){
if (subscriber->reachable==REACHABLE_SELF){
if (subscriber->reachable==REACHABLE_SELF){
if (state->next_update - INCLUDE_ANYWAY <= now){
// Other entries in our keyring are always one hop away from us.
if (append_link_state(payload, 0, my_subscriber, subscriber, -1, 1, -1, 0, 0)){
link_send_alarm.alarm = now+5;
return 1;
}
} else {
struct link *link = state->link;
if (append_link_state(payload, 0, state->transmitter, subscriber, -1, link?link->link_version:-1, -1, 0, link?link->drop_rate:32)){
// include information about this link every 5s
state->next_update = now + 5000;
}
} else {
struct link *best_link = find_best_link(subscriber);
if (state->next_update - INCLUDE_ANYWAY <= now){
if (append_link_state(payload, 0, state->transmitter, subscriber, -1,
best_link?best_link->link_version:-1, -1, 0, best_link?best_link->drop_rate:32)){
link_send_alarm.alarm = now+5;
return 1;
}
// include information about this link every 5s
state->next_update = now + 5000;
}
// include information about this link every 5s
state->next_update = now + 5000;
}
if (state->next_update < link_send_alarm.alarm)
@ -495,9 +539,9 @@ static void free_neighbour(struct neighbour **neighbour_ptr){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; all links from neighbour %s have died", alloca_tohex_sid(n->subscriber->sid));
struct neighbour_link *link = n->links;
struct link_in *link = n->links;
while(link){
struct neighbour_link *l=link;
struct link_in *l=link;
link = l->_next;
free(l);
}
@ -513,9 +557,9 @@ static void clean_neighbours(time_ms_t now)
struct neighbour **n_ptr = &neighbours;
while (*n_ptr){
struct neighbour *n = *n_ptr;
struct neighbour_link **list = &n->links;
struct link_in **list = &n->links;
while(*list){
struct neighbour_link *link = *list;
struct link_in *link = *list;
if (link->interface->state!=INTERFACE_STATE_UP || link->link_timeout < now){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; link expired from neighbour %s on interface %s",
@ -529,7 +573,7 @@ static void clean_neighbours(time_ms_t now)
}
// when all links to a neighbour that we are routing through expire, force a routing calculation update
struct link_state *state = get_link_state(n->subscriber);
if (state->next_hop == n->subscriber && (n->neighbour_link_timeout < now || !n->links) && state->route_version == route_version)
if (state->next_hop == n->subscriber && (n->link_in_timeout < now || !n->links) && state->route_version == route_version)
route_version++;
if (!n->links){
free_neighbour(n_ptr);
@ -539,7 +583,7 @@ static void clean_neighbours(time_ms_t now)
}
}
static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct neighbour_link *link, time_ms_t now){
static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct link_in *link, time_ms_t now){
struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame));
frame->type = OF_TYPE_SELFANNOUNCE_ACK;
frame->ttl = 6;
@ -560,13 +604,21 @@ static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct nei
static int neighbour_find_best_link(struct neighbour *n)
{
// TODO compare other link stats to find the best...
struct neighbour_link *best_link=n->links;
struct link_in *best_link=n->links;
if (best_link){
struct neighbour_link *link=best_link->_next;
struct link_in *link=best_link->_next;
while(link){
if (link->interface != best_link->interface &&
overlay_interface_compare(best_link->interface, link->interface))
best_link = link;
// find the link with the best interface
switch(overlay_interface_compare(best_link->interface, link->interface)){
case -1:
break;
case 0:
if (link->unicast < best_link->unicast)
break;
// fall through
case 1:
best_link = link;
}
link = link->_next;
}
}
@ -615,13 +667,18 @@ static int send_neighbour_link(struct neighbour *n)
frame->payload = ob_new();
frame->send_hook = neighbour_link_sent;
frame->send_context = n->subscriber;
frame->resend=-1;
if (n->subscriber->reachable & REACHABLE_DIRECT && (!(n->subscriber->reachable&REACHABLE_ASSUMED))){
frame->destination_resolved = 1;
frame->interface = n->subscriber->interface;
frame->recvaddr = frame->interface->destination.address;
frame->resend=-1;
}
if (n->subscriber->reachable & REACHABLE_INDIRECT){
frame->destination = n->subscriber;
}else if (n->subscriber->reachable & REACHABLE){
// we don't wont to address the next hop to this neighbour as that will force another reply ack
// but we still want them to receive it.
frame->destinations[frame->destination_count++].destination = add_destination_ref(n->subscriber->destination);
}else
// TODO ack over unicast links too
frame->destinations[frame->destination_count++].destination=add_destination_ref(n->best_link->interface->destination);
ob_limitsize(frame->payload, 400);
overlay_mdp_encode_ports(frame->payload, MDP_PORT_LINKSTATE, MDP_PORT_LINKSTATE);
@ -639,10 +696,12 @@ static int send_neighbour_link(struct neighbour *n)
if (overlay_payload_enqueue(frame))
op_free(frame);
n->best_link->ack_counter = ACK_WINDOW;
n->last_update = now;
}
n->next_neighbour_update = n->last_update + n->best_link->interface->destination.tick_ms;
n->ack_counter = ACK_WINDOW;
n->next_neighbour_update = n->last_update + n->best_link->interface->destination->tick_ms;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Next update for %s in %lldms", alloca_tohex_sid(n->subscriber->sid), n->next_neighbour_update - gettime_ms());
OUT();
return 0;
}
@ -715,18 +774,26 @@ static void update_alarm(time_ms_t limit){
}
}
struct neighbour_link * get_neighbour_link(struct neighbour *neighbour, struct overlay_interface *interface, int sender_interface, char unicast)
struct link_in * get_neighbour_link(struct neighbour *neighbour, struct overlay_interface *interface, int sender_interface, char unicast)
{
struct neighbour_link *link = neighbour->links;
struct link_in *link = neighbour->links;
if (unicast){
if (interface->prefer_unicast)
unicast=1;
else
unicast=-1;
}
while(link){
if (link->interface == interface && link->neighbour_interface == sender_interface && link->unicast == unicast)
if (link->interface == interface
&& link->neighbour_interface == sender_interface
&& link->unicast == unicast)
return link;
link=link->_next;
}
link = emalloc_zero(sizeof(struct neighbour_link));
link = emalloc_zero(sizeof(struct link_in));
link->interface = interface;
link->neighbour_interface = sender_interface;
link->unicast = unicast;
link->neighbour_interface = sender_interface;
link->ack_sequence = -1;
link->ack_mask = 0;
link->_next = neighbour->links;
@ -739,19 +806,32 @@ struct neighbour_link * get_neighbour_link(struct neighbour *neighbour, struct o
return link;
}
int link_state_interface_oldest_neighbour(struct overlay_interface *interface)
int link_add_broadcast_destinations(struct overlay_frame *frame)
{
char added_interface[OVERLAY_MAX_INTERFACES];
bzero(added_interface, sizeof(added_interface));
struct neighbour *neighbour = neighbours;
int packet_version =-1;
while(neighbour){
if (neighbour->best_link && neighbour->best_link->interface == interface &&
(neighbour->subscriber->max_packet_version < packet_version || packet_version == -1)){
packet_version = neighbour->subscriber->max_packet_version;
for(;neighbour;neighbour = neighbour->_next){
if (neighbour->subscriber->reachable&REACHABLE_DIRECT){
struct network_destination *dest = neighbour->subscriber->destination;
// TODO set packet version per destination
if (frame->packet_version > neighbour->subscriber->max_packet_version)
frame->packet_version = neighbour->subscriber->max_packet_version;
if (!dest->unicast){
// make sure we only add broadcast interfaces once
int id = dest->interface - overlay_interfaces;
if (added_interface[id]){
continue;
}
}
frame->destinations[frame->destination_count++].destination=add_destination_ref(dest);
}
neighbour = neighbour->_next;
}
return packet_version;
return 0;
}
// do we need to forward any broadcast packets transmitted by this neighbour?
@ -815,21 +895,26 @@ int link_received_duplicate(struct subscriber *subscriber, struct overlay_interf
return 0;
}
// track stats for receiving packets from this neighbour
int link_received_packet(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int sender_seq, int unicast)
// remote peer has confirmed hearing a recent unicast packet
int link_unicast_ack(struct subscriber *subscriber, struct overlay_interface *interface, struct sockaddr_in addr)
{
// TODO better handling of unicast routes
if (unicast){
if (config.debug.verbose && config.debug.linkstate)
DEBUG("LINK STATE; Ignoring unicast packet");
return 0;
}
// TODO find / create network destination, keep it alive
return 0;
}
struct neighbour *neighbour = get_neighbour(subscriber, 1);
struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast);
// track stats for receiving packets from this neighbour
int link_received_packet(struct decode_context *context, int sender_seq, int unicast)
{
if (unicast || !context->sender)
return 0;
struct neighbour *neighbour = get_neighbour(context->sender, 1);
struct link_in *link=get_neighbour_link(neighbour, context->interface, context->sender_interface, unicast);
time_ms_t now = gettime_ms();
neighbour->ack_counter --;
// TODO track unicast link based on context->addr
link->ack_counter --;
// for now we'll use a simple time based link up/down flag + dropped packet count
if (sender_seq >=0){
@ -838,7 +923,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
if (offset < 64){
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; late seq %d from %s on %s",
sender_seq, alloca_tohex_sid(subscriber->sid), interface->name);
sender_seq, alloca_tohex_sid(context->sender->sid), context->interface->name);
link->ack_mask |= (1<<offset);
}else{
link->ack_mask = (link->ack_mask << 1) | 1;
@ -849,15 +934,15 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
// missed a packet? send a link state soon
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; missed seq %d from %s on %s",
link->ack_sequence, alloca_tohex_sid(subscriber->sid), interface->name);
link->ack_sequence, alloca_tohex_sid(context->sender->sid), context->interface->name);
link->ack_mask = link->ack_mask << 1;
neighbour->ack_counter --;
link->ack_counter --;
// if we need to nack promptly
if (neighbour->using_us){
if (neighbour->using_us && link==neighbour->best_link){
neighbour->next_neighbour_update = now + 10;
if (neighbour->ack_counter <=0){
if (link->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
@ -873,10 +958,10 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
if (neighbour->next_neighbour_update > now + 10);
neighbour->next_neighbour_update = now + 10;
}
link->link_timeout = now + (interface->destination.tick_ms *5);
link->link_timeout = now + (context->interface->destination->tick_ms *5);
// force an update soon when we need to promptly ack packets
if (neighbour->using_us > now && neighbour->ack_counter <=0){
if (neighbour->using_us > now && link == neighbour->best_link && link->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
@ -886,7 +971,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
}
// parse incoming link details
int link_receive(overlay_mdp_frame *mdp)
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
{
IN();
struct overlay_buffer *payload = ob_static(mdp->out.payload, mdp->out.payload_length);
@ -897,6 +982,7 @@ int link_receive(overlay_mdp_frame *mdp)
struct decode_context context;
bzero(&context, sizeof(context));
context.interface = frame->interface;
time_ms_t now = gettime_ms();
char changed = 0;
@ -1002,24 +1088,32 @@ int link_receive(overlay_mdp_frame *mdp)
continue;
if (transmitter == my_subscriber && receiver == sender && interface_id != -1){
// TODO get matching neighbour link and combine scores
// they can hear us? we can route through them!
if (flags&FLAG_UNICAST){
if (!link->destination){
// TODO create unicast destination!
}
//link->destination = out->destination;
}else{
set_destination_ref(&link->destination, interface->destination);
}
version = link->link_version;
if (neighbour->neighbour_link_timeout < now || version<0){
if (neighbour->link_in_timeout < now || version<0){
changed = 1;
version++;
}
neighbour->neighbour_link_timeout = now + interface->destination.tick_ms * 5;
neighbour->link_in_timeout = now + interface->destination->tick_ms * 5;
if (drop_rate != link->drop_rate || transmitter != link->transmitter)
version++;
// process acks / nacks
if (ack_seq!=-1){
overlay_queue_ack(sender, interface, ack_mask, ack_seq);
// TODO unicast
overlay_queue_ack(sender, interface->destination, ack_mask, ack_seq);
// did they miss our last ack?
if (neighbour->last_update_seq!=-1){
@ -1037,13 +1131,14 @@ int link_receive(overlay_mdp_frame *mdp)
}
link->last_ack_seq = ack_seq;
}else{
set_destination_ref(&link->destination, NULL);
}
if (link->transmitter != transmitter || link->link_version != version){
changed = 1;
link->transmitter = transmitter;
link->link_version = version & 0xFF;
link->interface = interface;
link->drop_rate = drop_rate;
// TODO other link attributes...
}
@ -1093,6 +1188,7 @@ int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now)
ob_get_ui32(frame->payload);
ob_get_ui32(frame->payload);
int iface=ob_get(frame->payload);
overlay_interface *interface = &overlay_interfaces[iface];
// record that we have a possible link to this neighbour
struct neighbour *neighbour = get_neighbour(frame->source, 1);
@ -1104,24 +1200,24 @@ int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now)
if (config.debug.linkstate)
DEBUGF("LINK STATE; new legacy neighbour %s", alloca_tohex_sid(frame->source->sid));
}
if (neighbour->neighbour_link_timeout < now)
if (neighbour->link_in_timeout < now)
changed = 1;
if (link->transmitter != my_subscriber)
changed = 1;
link->transmitter = my_subscriber;
link->link_version = 1;
link->interface = &overlay_interfaces[iface];
link->destination = interface->destination;
// give this link a high cost, we aren't going to route through it anyway...
link->drop_rate = 32;
// track the incoming link so we remember to send broadcasts
struct neighbour_link *nl = get_neighbour_link(neighbour, frame->interface, iface, 0);
nl->link_timeout = now + (frame->interface->destination.tick_ms *5);
struct link_in *nl = get_neighbour_link(neighbour, frame->interface, iface, 0);
nl->link_timeout = now + (link->destination->tick_ms *5);
neighbour->legacy_protocol = 1;
neighbour->neighbour_link_timeout = now + link->interface->destination.tick_ms * 5;
neighbour->link_in_timeout = now + link->destination->tick_ms * 5;
if (changed){
route_version++;

View File

@ -383,8 +383,34 @@ struct slip_decode_state{
struct overlay_interface;
// where should packets be sent to?
struct network_destination {
int _ref_count;
// which interface are we actually sending packets out of
struct overlay_interface *interface;
// The IPv4 destination address, this may be the interface broadcast address.
struct sockaddr_in address;
// should outgoing packets be marked as unicast?
char unicast;
char packet_version;
// should we aggregate packets, or send one at a time
char encapsulation;
// time last packet was sent
time_ms_t last_tx;
// sequence number of last packet sent to this destination.
// Used to allow NACKs that can request retransmission of recent packets.
int sequence_number;
// rate limit for outgoing packets
struct limit_state transfer_limit;
/* Number of milli-seconds per tick for this interface, which is basically related to the
the typical TX range divided by the maximum expected speed of nodes in the network.
This means that short-range communications has a higher bandwidth requirement than
@ -397,29 +423,14 @@ struct network_destination {
These figures will be refined over time, and we will allow people to set them per-interface.
*/
unsigned tick_ms; /* milliseconds per tick */
// do we aggregate packets, or send one at a time
int encapsulation;
// time last packet was sent
time_ms_t last_tx;
// sequence number of last packet sent to this destination.
// Used to allow NACKs that can request retransmission of recent packets.
int sequence_number;
// rate limit for outgoing packets
struct limit_state transfer_limit;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
potentially two quite different values. */
int mtu;
// The IPv4 destination address, for an interface this will be the broadcast address.
struct sockaddr_in address;
};
struct network_destination * new_destination(struct overlay_interface *interface, char encapsulation);
struct network_destination * create_unicast_destination(struct sockaddr_in addr, struct overlay_interface *interface);
struct network_destination * add_destination_ref(struct network_destination *ref);
void release_destination_ref(struct network_destination *ref);
void set_destination_ref(struct network_destination **ptr, struct network_destination *ref);
typedef struct overlay_interface {
struct sched_ent alarm;
@ -439,21 +450,25 @@ typedef struct overlay_interface {
int socket_type;
char send_broadcasts;
char prefer_unicast;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
potentially two quite different values. */
int mtu;
// can we use this interface for routes to addresses in other subnets?
int default_route;
// should we log more debug info on this interace? eg hex dumps of packets
char debug;
char local_echo;
// can we assume there will only be two devices on this interface?
char point_to_point;
struct subscriber *other_device;
unsigned int uartbps; // set serial port speed (which might be different from link speed)
int ctsrts; // enabled hardware flow control if non-zero
struct network_destination destination;
struct network_destination *destination;
// can we assume that we will only receive packets from one device?
char point_to_point;
struct subscriber *other_device;
// the actual address of the interface.
struct sockaddr_in address;
struct in_addr netmask;
@ -524,11 +539,10 @@ int overlay_frame_resolve_addresses(struct overlay_frame *f);
time_ms_t overlay_time_until_next_tick();
int overlay_frame_append_payload(struct decode_context *context, overlay_interface *interface,
int overlay_frame_append_payload(struct decode_context *context, int encapsulation,
struct overlay_frame *p, struct overlay_buffer *b);
int overlay_packet_init_header(int packet_version, int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, int seq);
int overlay_interface_args(const char *arg);
void overlay_rhizome_advertise(struct sched_ent *alarm);
@ -547,10 +561,10 @@ int overlayServerMode();
int overlay_payload_enqueue(struct overlay_frame *p);
int overlay_queue_remaining(int queue);
int overlay_queue_schedule_next(time_ms_t next_allowed_packet);
int overlay_send_tick_packet(struct overlay_interface *interface);
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq);
int overlay_send_tick_packet(struct network_destination *destination);
int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *destination, uint32_t ack_mask, int ack_seq);
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, time_ms_t now);
int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, struct overlay_frame *f, time_ms_t now);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int rhizome_saw_voice_traffic();
int overlay_saw_mdp_containing_frame(struct overlay_frame *f, time_ms_t now);
@ -663,10 +677,8 @@ overlay_interface * overlay_interface_get_default();
overlay_interface * overlay_interface_find(struct in_addr addr, int return_default);
overlay_interface * overlay_interface_find_name(const char *name);
int overlay_interface_compare(overlay_interface *one, overlay_interface *two);
int
overlay_broadcast_ensemble(overlay_interface *interface,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len);
int overlay_broadcast_ensemble(struct network_destination *destination,
unsigned char *bytes,int len);
int directory_registration();
int directory_service_init();
@ -764,7 +776,7 @@ void server_config_reload(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_frame *mdp);
int overlay_send_probe(struct subscriber *peer, struct sockaddr_in addr, overlay_interface *interface, int queue);
int overlay_send_probe(struct subscriber *peer, struct network_destination *destination, int queue);
int overlay_send_stun_request(struct subscriber *server, struct subscriber *request);
void fd_periodicstats(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
@ -780,7 +792,7 @@ void rhizome_server_poll(struct sched_ent *alarm);
int overlay_mdp_service_stun_req(overlay_mdp_frame *mdp);
int overlay_mdp_service_stun(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(struct overlay_frame *frame, overlay_mdp_frame *mdp);
time_ms_t limit_next_allowed(struct limit_state *state);
int limit_is_allowed(struct limit_state *state);
@ -832,15 +844,16 @@ extern char crash_handler_clue[1024];
int link_received_duplicate(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int previous_seq, int unicast);
int link_received_packet(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int sender_seq, int unicode);
int link_receive(overlay_mdp_frame *mdp);
int link_received_packet(struct decode_context *context, int sender_seq, int unicast);
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp);
void link_explained(struct subscriber *subscriber);
void link_interface_down(struct overlay_interface *interface);
int link_state_announce_links();
int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now);
int link_state_interface_oldest_neighbour(struct overlay_interface *interface);
int link_state_ack_soon(struct subscriber *sender);
int link_state_should_forward_broadcast(struct subscriber *transmitter);
int link_unicast_ack(struct subscriber *subscriber, struct overlay_interface *interface, struct sockaddr_in addr);
int link_add_broadcast_destinations(struct overlay_frame *frame);
int generate_nonce(unsigned char *nonce,int bytes);

View File

@ -46,17 +46,17 @@ has_link() {
set_instance $1
executeOk_servald route print
tfw_log "Looking for link from $1 to $2 ($4)"
if ! grep "^${4}:\(BROADCAST \|UNICAST \)\{1,\}:dummy.*:0*\$" $_tfw_tmp/stdout; then
if ! grep "^${4}:\(BROADCAST\|UNICAST\):dummy.*:0*\$" $_tfw_tmp/stdout; then
tfw_log "Link not found"
# tfw_log "^${4}:\(BROADCAST \|UNICAST \)\{1,\}:dummy.*:0*\$"
# tfw_log "^${4}:\(BROADCAST\|UNICAST\):dummy.*:0*\$"
# tfw_cat --stdout --stderr
return 1
fi
[ $4 = $5 ] && return 0;
tfw_log "Path from $1 to $3 should be via $2 ($5, $4)"
if ! grep "^${5}:INDIRECT ::${4}\$" $_tfw_tmp/stdout; then
if ! grep "^${5}:INDIRECT::${4}\$" $_tfw_tmp/stdout; then
tfw_log "No path found"
# tfw_log "^${5}:INDIRECT ::${4}\$"
# tfw_log "^${5}:INDIRECT::${4}\$"
# tfw_cat --stdout --stderr
return 1
fi
@ -117,8 +117,8 @@ setup_single_link() {
foreach_instance +A +B start_routing_instance
}
test_single_link() {
wait_until path_exists +A +B
wait_until path_exists +B +A
wait_until --timeout=10 path_exists +A +B
wait_until --timeout=5 path_exists +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
@ -141,14 +141,14 @@ test_multiple_ids() {
executeOk_servald mdp ping --timeout=3 $SIDB2 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB1:BROADCAST UNICAST :dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDB2:INDIRECT ::$SIDB1"
assertStdoutGrep --matches=1 "^$SIDB1:BROADCAST:dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDB2:INDIRECT::$SIDB1"
set_instance +B
executeOk_servald mdp ping --timeout=3 $SIDA2 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA1:BROADCAST UNICAST :dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDA2:INDIRECT ::$SIDA1"
assertStdoutGrep --matches=1 "^$SIDA1:BROADCAST:dummy.*:0*"
assertStdoutGrep --matches=1 "^$SIDA2:INDIRECT::$SIDA1"
}
doc_single_mdp="Use single MDP per packet encapsulation"
@ -259,7 +259,7 @@ test_scan() {
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST :"
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
}
scan_completed() {
@ -304,7 +304,7 @@ test_broadcast_only() {
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST :"
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST:"
}
doc_prefer_unicast="Prefer unicast packets"
@ -326,8 +326,7 @@ test_prefer_unicast() {
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:BROADCAST UNICAST :"
assertGrep "$instance_servald_log" 'Choosing to send via unicast'
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
}
doc_multihop_linear="Start 4 instances in a linear arrangement"
@ -372,7 +371,7 @@ instance_offline() {
for ((N=1; 1; ++N)); do
local sidvar=SID${I#+}$N
[ -n "${!sidvar}" ] || break
if ! grep "LINK STATE.*${!sidvar}.*UNREACHABLE" $instance_servald_log; then
if ! grep "NOT REACHABLE sid=${!sidvar}" $instance_servald_log; then
return 1
fi
done
@ -431,7 +430,7 @@ setup_multi_interface() {
multi_has_link() {
executeOk_servald route print
grep "^$1:BROADCAST UNICAST :dummyeth" $_tfw_tmp/stdout || return 1
grep "^$1:BROADCAST:dummyeth" $_tfw_tmp/stdout || return 1
return 0
}
@ -603,7 +602,7 @@ test_circle() {
executeOk_servald mdp ping --timeout=3 $SIDC 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT:"
stop_servald_server +B
foreach_instance +A +C \
wait_until --timeout=10 instance_offline +B
@ -613,7 +612,7 @@ test_circle() {
executeOk_servald mdp ping --timeout=3 $SIDC 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT:"
}
setup_crowded_mess() {
@ -639,7 +638,7 @@ test_crowded_mess() {
executeOk_servald mdp ping --timeout=3 $SIDH 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDH:INDIRECT :"
assertStdoutGrep --matches=1 "^$SIDH:INDIRECT:"
}
runTests "$@"