Detect unicast links

This commit is contained in:
Jeremy Lakeman 2013-08-09 12:18:14 +09:30
parent 81afc42d8b
commit 55657623aa
5 changed files with 164 additions and 39 deletions

View File

@ -308,8 +308,10 @@ int parseEnvelopeHeader(struct decode_context *context, struct overlay_interface
context->point_to_point_device = context->interface->other_device = context->sender;
}
DEBUGF("Received packet seq %d from %s on %s",
sender_seq, alloca_tohex_sid(context->sender->sid), interface->name);
if (config.debug.overlayframes)
DEBUGF("Received %s packet seq %d from %s on %s",
packet_flags & PACKET_UNICAST?"unicast":"broadcast",
sender_seq, alloca_tohex_sid(context->sender->sid), interface->name);
}
link_received_packet(context, sender_seq, packet_flags & PACKET_UNICAST);
@ -426,7 +428,8 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
if (payload_len > ob_remaining(b)){
unsigned char *current = ob_ptr(b)+ob_position(b);
dump("Payload Header", header_start, current - header_start);
if (config.debug.overlayframes)
dump("Payload Header", header_start, current - header_start);
ret = WHYF("Invalid payload length (%d)", payload_len);
goto end;
}

View File

@ -246,7 +246,10 @@ overlay_init_packet(struct outgoing_packet *packet, int packet_version,
packet->packet_version = packet_version;
packet->context.packet_version = packet_version;
packet->destination = destination;
packet->seq = destination->sequence_number = (destination->sequence_number + 1) & 0xFFFF;
if (destination->sequence_number<0)
packet->seq=-1;
else
packet->seq = destination->sequence_number = (destination->sequence_number + 1) & 0xFFFF;
ob_limitsize(packet->buffer, destination->interface->mtu);
@ -549,7 +552,6 @@ static void overlay_send_packet(struct sched_ent *alarm){
int overlay_send_tick_packet(struct network_destination *destination){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
packet.seq=-1;
overlay_init_packet(&packet, 0, destination);
overlay_fill_send_packet(&packet, gettime_ms());

View File

@ -79,6 +79,12 @@ struct link_in{
int ack_counter;
};
struct link_out{
struct link_out *_next;
time_ms_t timeout;
struct network_destination *destination;
};
struct neighbour{
struct neighbour *_next;
@ -116,6 +122,9 @@ struct neighbour{
// list of incoming link stats
struct link_in *links, *best_link;
// list of outgoing links
struct link_out *out_links;
};
// one struct per subscriber, where we track all routing information, allocated on first use
@ -180,6 +189,7 @@ struct network_destination * create_unicast_destination(struct sockaddr_in addr,
ret->address = addr;
ret->unicast = 1;
ret->tick_ms = interface->destination->tick_ms;
ret->sequence_number = -1;
}
return ret;
}
@ -200,14 +210,15 @@ void release_destination_ref(struct network_destination *ref){
}
}
void set_destination_ref(struct network_destination **ptr, struct network_destination *ref){
int set_destination_ref(struct network_destination **ptr, struct network_destination *ref){
if (ref==*ptr)
return;
return 0;
if (ref)
add_destination_ref(ref);
if (*ptr)
release_destination_ref(*ptr);
*ptr = ref;
return 1;
}
static int NumberOfSetBits(uint32_t i)
@ -343,6 +354,7 @@ static void update_path_score(struct neighbour *neighbour, struct link *link){
link->calculating = 0;
}
// pick the best path to this network destination
static struct link * find_best_link(struct subscriber *subscriber)
{
IN();
@ -546,6 +558,14 @@ static void free_neighbour(struct neighbour **neighbour_ptr){
free(l);
}
struct link_out *out = n->out_links;
while (out){
struct link_out *l=out;
out = l->_next;
release_destination_ref(out->destination);
free(l);
}
free_links(n->root);
n->root=NULL;
*neighbour_ptr = n->_next;
@ -557,6 +577,7 @@ static void clean_neighbours(time_ms_t now)
struct neighbour **n_ptr = &neighbours;
while (*n_ptr){
struct neighbour *n = *n_ptr;
struct link_in **list = &n->links;
while(*list){
struct link_in *link = *list;
@ -571,11 +592,27 @@ static void clean_neighbours(time_ms_t now)
list = &link->_next;
}
}
struct link_out **out = &n->out_links;
while(*out){
struct link_out *link = *out;
if (link->destination->interface->state!=INTERFACE_STATE_UP || link->timeout < now){
*out = link->_next;
release_destination_ref(link->destination);
free(link);
}else{
out = &link->_next;
}
}
// when all links to a neighbour that we are routing through expire, force a routing calculation update
struct link_state *state = get_link_state(n->subscriber);
if (state->next_hop == n->subscriber && (n->link_in_timeout < now || !n->links) && state->route_version == route_version)
if (state->next_hop == n->subscriber &&
(n->link_in_timeout < now || !n->links || !n->out_links) &&
state->route_version == route_version)
route_version++;
if (!n->links){
if (!n->links || !n->out_links){
free_neighbour(n_ptr);
}else{
n_ptr = &n->_next;
@ -626,10 +663,17 @@ static int neighbour_find_best_link(struct neighbour *n)
if (n->best_link != best_link){
n->best_link = best_link;
n->next_neighbour_update = gettime_ms()+5;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link?best_link->interface->name:"NONE");
if (config.debug.linkstate && config.debug.verbose){
if (best_link){
DEBUGF("LINK STATE; best link from neighbour %s is %s on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link->unicast?"unicast":"broadcast",
best_link->interface->name);
}else{
DEBUGF("LINK STATE; no best link from neighbour %s",
alloca_tohex_sid(n->subscriber->sid));
}
}
}
return 0;
@ -671,13 +715,22 @@ static int send_neighbour_link(struct neighbour *n)
if (n->subscriber->reachable & REACHABLE_INDIRECT){
frame->destination = n->subscriber;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Sending link state ack indirectly");
}else if (n->subscriber->reachable & REACHABLE){
// we don't wont to address the next hop to this neighbour as that will force another reply ack
// but we still want them to receive it.
frame->destinations[frame->destination_count++].destination = add_destination_ref(n->subscriber->destination);
}else
// TODO ack over unicast links too
frame->destinations[frame->destination_count++].destination=add_destination_ref(n->best_link->interface->destination);
}else{
// no routing decision yet? send this packet to all probable destinations.
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Sending link state ack to all possibilities");
struct link_out *out = n->out_links;
while(out){
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
out = out->_next;
}
}
ob_limitsize(frame->payload, 400);
overlay_mdp_encode_ports(frame->payload, MDP_PORT_LINKSTATE, MDP_PORT_LINKSTATE);
@ -722,6 +775,17 @@ static int link_send_neighbours()
if (n->next_neighbour_update < link_send_alarm.alarm)
link_send_alarm.alarm = n->next_neighbour_update;
struct link_out *out = n->out_links;
while(out){
if (out->destination->unicast){
if (out->destination->last_tx + out->destination->tick_ms < now)
overlay_send_tick_packet(out->destination);
if (out->destination->last_tx + out->destination->tick_ms < link_send_alarm.alarm)
link_send_alarm.alarm = out->destination->last_tx + out->destination->tick_ms;
}
out=out->_next;
}
n = n->_next;
}
return 0;
@ -798,7 +862,8 @@ struct link_in * get_neighbour_link(struct neighbour *neighbour, struct overlay_
link->ack_mask = 0;
link->_next = neighbour->links;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; new possible link from neighbour %s on interface %s/%d",
DEBUGF("LINK STATE; new possible %s link from neighbour %s on interface %s/%d",
unicast?"unicast":"broadcast",
alloca_tohex_sid(neighbour->subscriber->sid),
interface->name,
sender_interface);
@ -902,18 +967,59 @@ int link_unicast_ack(struct subscriber *subscriber, struct overlay_interface *in
return 0;
}
static struct link_out *create_out_link(struct neighbour *neighbour, overlay_interface *interface, struct sockaddr_in addr, char unicast){
struct link_out *ret=emalloc_zero(sizeof(struct link_out));
if (ret){
ret->_next=neighbour->out_links;
neighbour->out_links=ret;
if (unicast)
ret->destination = create_unicast_destination(addr, interface);
else
ret->destination = add_destination_ref(interface->destination);
ret->timeout = gettime_ms()+ret->destination->tick_ms*2;
}
return ret;
}
static struct link_out *find_out_link(struct neighbour *neighbour, overlay_interface *interface, char unicast){
struct link_out *ret = neighbour->out_links;
while(ret){
if (ret->destination->interface==interface
&& ret->destination->unicast==unicast)
return ret;
ret=ret->_next;
}
return NULL;
}
// track stats for receiving packets from this neighbour
int link_received_packet(struct decode_context *context, int sender_seq, int unicast)
int link_received_packet(struct decode_context *context, int sender_seq, char unicast)
{
if (unicast || !context->sender)
if (!context->sender)
return 0;
struct neighbour *neighbour = get_neighbour(context->sender, 1);
// get stats about incoming packets
struct link_in *link=get_neighbour_link(neighbour, context->interface, context->sender_interface, unicast);
time_ms_t now = gettime_ms();
// TODO track unicast link based on context->addr
if (!neighbour->out_links){
// if this packet arrived in an IPv4 packet, assume we need to send them unicast packets
if (context->addr.sin_family==AF_INET && context->addr.sin_port!=0 && context->addr.sin_addr.s_addr!=0){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Assuming reachable via unicast");
create_out_link(neighbour, context->interface, context->addr, 1);
}
// if this packet arrived from the same IPv4 subnet, or a different type of network, assume they can hear our broadcasts
if (context->addr.sin_family!=AF_INET ||
(context->addr.sin_addr.s_addr & context->interface->netmask.s_addr)
== (context->interface->address.sin_addr.s_addr& context->interface->netmask.s_addr)){
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("Assuming reachable via broadcast");
create_out_link(neighbour, context->interface, context->addr, 0);
}
}
link->ack_counter --;
// for now we'll use a simple time based link up/down flag + dropped packet count
@ -1045,7 +1151,8 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
continue;
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; record - %s, %s, %d, %d, %x, %d",
DEBUGF("LINK STATE; record - %d, %s, %s, %d, %d, %x, %d",
flags,
receiver?alloca_tohex_sid(receiver->sid):"NULL",
transmitter?alloca_tohex_sid(transmitter->sid):"NULL",
interface_id,
@ -1062,9 +1169,10 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
continue;
}
struct network_destination *destination=NULL;
if (receiver == sender){
// ignore other incoming links to our neighbour
// TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding?
if (transmitter!=my_subscriber || interface_id==-1)
continue;
@ -1073,6 +1181,15 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
if (interface->state != INTERFACE_STATE_UP)
continue;
struct link_out *out = find_out_link(neighbour, interface, flags&FLAG_UNICAST?1:0);
if (!out)
continue;
// start sending sequence numbers when our neighbour has acked a packet
if (out->destination->sequence_number<0)
out->destination->sequence_number=0;
out->timeout=now + out->destination->tick_ms * 5;
destination = out->destination;
}else if(transmitter == my_subscriber){
// if our neighbour starts using us to reach this receiver, we have to treat the link in our routing table as if it just died.
transmitter = NULL;
@ -1087,26 +1204,24 @@ int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp)
if (!link)
continue;
if (transmitter == my_subscriber && receiver == sender && interface_id != -1){
if (transmitter == my_subscriber && receiver == sender && interface_id != -1 && destination){
// they can hear us? we can route through them!
if (flags&FLAG_UNICAST){
if (!link->destination){
// TODO create unicast destination!
}
//link->destination = out->destination;
}else{
set_destination_ref(&link->destination, interface->destination);
}
version = link->link_version;
// which network destination can they hear us from?
if (set_destination_ref(&link->destination, destination)){
changed = 1;
version++;
}
if (neighbour->link_in_timeout < now || version<0){
changed = 1;
version++;
}
neighbour->link_in_timeout = now + interface->destination->tick_ms * 5;
if (drop_rate != link->drop_rate || transmitter != link->transmitter)
version++;

View File

@ -429,7 +429,7 @@ struct network_destination * new_destination(struct overlay_interface *interface
struct network_destination * create_unicast_destination(struct sockaddr_in addr, struct overlay_interface *interface);
struct network_destination * add_destination_ref(struct network_destination *ref);
void release_destination_ref(struct network_destination *ref);
void set_destination_ref(struct network_destination **ptr, struct network_destination *ref);
int set_destination_ref(struct network_destination **ptr, struct network_destination *ref);
typedef struct overlay_interface {
struct sched_ent alarm;
@ -844,7 +844,7 @@ extern char crash_handler_clue[1024];
int link_received_duplicate(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int previous_seq, int unicast);
int link_received_packet(struct decode_context *context, int sender_seq, int unicast);
int link_received_packet(struct decode_context *context, int sender_seq, char unicast);
int link_receive(struct overlay_frame *frame, overlay_mdp_frame *mdp);
void link_explained(struct subscriber *subscriber);
void link_interface_down(struct overlay_interface *interface);

View File

@ -237,7 +237,7 @@ test_multiple_nodes() {
tfw_cat --stdout --stderr
}
doc_scan="Simulate isolated clients"
doc_scan="Network scan with isolated clients"
setup_scan() {
setup_servald
assert_no_servald_processes
@ -280,11 +280,16 @@ setup_single_filter() {
foreach_instance +A +B start_routing_instance
}
test_single_filter() {
wait_until path_exists +A +B
wait_until path_exists +B +A
wait_until --timeout=10 path_exists +A +B
wait_until --timeout=5 path_exists +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
set_instance +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:BROADCAST:"
}
doc_broadcast_only="Broadcast packets only"