Report packet acks, make routing decisions based on dropped packets

This commit is contained in:
Jeremy Lakeman 2013-05-07 13:29:42 +09:30
parent 7fc5c0d6c0
commit c0b31a2774
5 changed files with 189 additions and 44 deletions

View File

@ -434,7 +434,7 @@ STRING(256, file, "", str_nonempty,, "Path of interfa
ATOM(struct in_addr, dummy_address, hton_in_addr(INADDR_LOOPBACK), in_addr,, "Dummy interface address") ATOM(struct in_addr, dummy_address, hton_in_addr(INADDR_LOOPBACK), in_addr,, "Dummy interface address")
ATOM(struct in_addr, dummy_netmask, hton_in_addr(0xFFFFFF00), in_addr,, "Dummy interface netmask") ATOM(struct in_addr, dummy_netmask, hton_in_addr(0xFFFFFF00), in_addr,, "Dummy interface netmask")
ATOM(uint16_t, port, PORT_DNA, uint16_nonzero,, "Port number for network interface") ATOM(uint16_t, port, PORT_DNA, uint16_nonzero,, "Port number for network interface")
ATOM(bool_t, drop_broadcasts, 0, boolean,, "If true, drop all incoming broadcast packets") ATOM(uint16_t, drop_broadcasts, 0, uint16_nonzero,, "Percentage of incoming broadcast packets that should be dropped for testing purposes")
ATOM(bool_t, drop_unicasts, 0, boolean,, "If true, drop all incoming unicast packets") ATOM(bool_t, drop_unicasts, 0, boolean,, "If true, drop all incoming unicast packets")
ATOM(short, type, OVERLAY_INTERFACE_WIFI, interface_type,, "Type of network interface") ATOM(short, type, OVERLAY_INTERFACE_WIFI, interface_type,, "Type of network interface")
ATOM(int32_t, packet_interval, -1, int32_nonneg,, "Minimum interval between packets in microseconds") ATOM(int32_t, packet_interval, -1, int32_nonneg,, "Minimum interval between packets in microseconds")

View File

@ -593,6 +593,21 @@ struct file_packet{
unsigned char payload[1400]; unsigned char payload[1400];
}; };
static int should_drop(struct overlay_interface *interface, struct sockaddr_in addr){
if (memcmp(&addr, &interface->address, sizeof(addr))==0){
return interface->drop_unicasts;
}
if (memcmp(&addr, &interface->broadcast_address, sizeof(addr))==0){
if (interface->drop_broadcasts == 0)
return 0;
if (interface->drop_broadcasts >= 100)
return 1;
if (rand()%100 >= interface->drop_broadcasts)
return 0;
}
return 1;
}
static void interface_read_file(struct overlay_interface *interface) static void interface_read_file(struct overlay_interface *interface)
{ {
IN(); IN();
@ -630,9 +645,7 @@ static void interface_read_file(struct overlay_interface *interface)
if (config.debug.packetrx) if (config.debug.packetrx)
DEBUG_packet_visualise("Read from dummy interface", packet.payload, packet.payload_length); DEBUG_packet_visualise("Read from dummy interface", packet.payload, packet.payload_length);
if (((!interface->drop_unicasts) && memcmp(&packet.dst_addr, &interface->address, sizeof(packet.dst_addr))==0) || if (!should_drop(interface, packet.dst_addr)){
((!interface->drop_broadcasts) &&
memcmp(&packet.dst_addr, &interface->broadcast_address, sizeof(packet.dst_addr))==0)){
if (packetOkOverlay(interface, packet.payload, packet.payload_length, -1, if (packetOkOverlay(interface, packet.payload, packet.payload_length, -1,
(struct sockaddr*)&packet.src_addr, sizeof(packet.src_addr))<0) { (struct sockaddr*)&packet.src_addr, sizeof(packet.src_addr))<0) {

View File

@ -18,15 +18,16 @@ Link state routing;
*/ */
#define INCLUDE_ANYWAY (500) #define INCLUDE_ANYWAY (500)
#define LINK_EXPIRY (5000)
#define LINK_NEIGHBOUR_INTERVAL (1000)
#define LINK_INTERVAL (5000)
#define MAX_LINK_STATES 512 #define MAX_LINK_STATES 512
#define FLAG_HAS_INTERFACE (1<<0) #define FLAG_HAS_INTERFACE (1<<0)
#define FLAG_NO_PATH (1<<1) #define FLAG_NO_PATH (1<<1)
#define FLAG_BROADCAST (1<<2) #define FLAG_BROADCAST (1<<2)
#define FLAG_UNICAST (1<<3) #define FLAG_UNICAST (1<<3)
#define FLAG_HAS_ACK (1<<4)
#define FLAG_HAS_DROP_RATE (1<<5)
#define ACK_WINDOW (16)
struct link{ struct link{
struct link *_left; struct link *_left;
@ -42,9 +43,11 @@ struct link{
// link quality stats; // link quality stats;
char link_version; char link_version;
char drop_rate;
// calculated path score; // calculated path score;
int hop_count; int hop_count;
int path_drop_rate;
// loop prevention; // loop prevention;
char calculating; char calculating;
@ -61,8 +64,10 @@ struct neighbour_link{
// very simple time based link up/down detection; // very simple time based link up/down detection;
// when will we consider the link broken? // when will we consider the link broken?
time_ms_t link_timeout; time_ms_t link_timeout;
char unicast; char unicast;
int sequence; int ack_sequence;
uint32_t ack_mask;
}; };
struct neighbour{ struct neighbour{
@ -78,15 +83,13 @@ struct neighbour{
// next link update // next link update
time_ms_t next_neighbour_update; time_ms_t next_neighbour_update;
int ack_counter;
// un-balanced tree of known link states // un-balanced tree of known link states
struct link *root; struct link *root;
// list of incoming link stats // list of incoming link stats
struct neighbour_link *links, *best_link; struct neighbour_link *links, *best_link;
// whenever we pick a different link or the stats change in a way everyone needs to know ASAP, update the version
char version;
}; };
// one struct per subscriber, where we track all routing information, allocated on first use // one struct per subscriber, where we track all routing information, allocated on first use
@ -96,6 +99,9 @@ struct link_state{
struct subscriber *transmitter; struct subscriber *transmitter;
int hop_count; int hop_count;
int route_version; int route_version;
// if a neighbour is free'd this link will point to invalid memory.
// do not trust this pointer unless you have just called find_best_link
struct link *link;
char calculating; char calculating;
// when do we need to send a new link state message. // when do we need to send a new link state message.
@ -115,6 +121,13 @@ static struct sched_ent link_send_alarm={
struct neighbour *neighbours=NULL; struct neighbour *neighbours=NULL;
int route_version=0; int route_version=0;
static int NumberOfSetBits(uint32_t i)
{
i = i - ((i >> 1) & 0x55555555);
i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
return (((i + (i >> 4)) & 0x0F0F0F0F) * 0x01010101) >> 24;
}
static struct link_state *get_link_state(struct subscriber *subscriber) static struct link_state *get_link_state(struct subscriber *subscriber)
{ {
if (!subscriber->link_state){ if (!subscriber->link_state){
@ -198,20 +211,28 @@ static void update_path_score(struct neighbour *neighbour, struct link *link){
link->calculating = 1; link->calculating = 1;
int hop_count = -1; int hop_count = -1;
int drop_rate = 0;
if (link->transmitter == my_subscriber){ if (link->transmitter == my_subscriber){
if (link->receiver==neighbour->subscriber) if (link->receiver==neighbour->subscriber){
hop_count = 1; hop_count = 1;
}
}else{ }else{
struct link *parent = get_parent(neighbour, link); struct link *parent = get_parent(neighbour, link);
if (parent && (!parent->calculating)){ if (parent && (!parent->calculating)){
update_path_score(neighbour, parent); update_path_score(neighbour, parent);
// TODO more interesting path cost metrics... // TODO more interesting path cost metrics...
if (parent->hop_count>0) if (parent->hop_count>0){
hop_count = parent->hop_count+1; hop_count = parent->hop_count+1;
drop_rate = parent->path_drop_rate;
}
} }
} }
// ignore occasional dropped packets due to collisions
if (link->drop_rate>2)
drop_rate += link->drop_rate;
if (config.debug.verbose && config.debug.linkstate && hop_count != link->hop_count) if (config.debug.verbose && config.debug.linkstate && hop_count != link->hop_count)
DEBUGF("LINK STATE; path score to %s via %s version %d = %d", DEBUGF("LINK STATE; path score to %s via %s version %d = %d",
alloca_tohex_sid(link->receiver->sid), alloca_tohex_sid(link->receiver->sid),
@ -221,6 +242,7 @@ static void update_path_score(struct neighbour *neighbour, struct link *link){
link->hop_count = hop_count; link->hop_count = hop_count;
link->path_version = neighbour->path_version; link->path_version = neighbour->path_version;
link->path_drop_rate = drop_rate;
link->calculating = 0; link->calculating = 0;
} }
@ -240,6 +262,8 @@ static int find_best_link(struct subscriber *subscriber)
struct neighbour *neighbour = neighbours; struct neighbour *neighbour = neighbours;
struct overlay_interface *interface = NULL; struct overlay_interface *interface = NULL;
int best_hop_count = 99; int best_hop_count = 99;
int best_drop_rate = 99;
struct link *best_link = NULL;
struct subscriber *next_hop = NULL, *transmitter=NULL; struct subscriber *next_hop = NULL, *transmitter=NULL;
time_ms_t now = gettime_ms(); time_ms_t now = gettime_ms();
@ -259,11 +283,17 @@ static int find_best_link(struct subscriber *subscriber)
} }
update_path_score(neighbour, link); update_path_score(neighbour, link);
if (link->hop_count>0 && link->hop_count < best_hop_count){
next_hop = neighbour->subscriber; if (link->hop_count>0){
best_hop_count = link->hop_count; if (link->path_drop_rate < best_drop_rate ||
transmitter = link->transmitter; (link->path_drop_rate == best_drop_rate && link->hop_count < best_hop_count)){
interface = link->interface; next_hop = neighbour->subscriber;
best_hop_count = link->hop_count;
best_drop_rate = link->path_drop_rate;
transmitter = link->transmitter;
interface = link->interface;
best_link = link;
}
} }
next: next:
@ -271,7 +301,7 @@ next:
} }
int changed =0; int changed =0;
if (state->next_hop != next_hop || state->transmitter != transmitter) if (state->next_hop != next_hop || state->transmitter != transmitter || state->link != best_link)
changed = 1; changed = 1;
if (next_hop == subscriber && (interface != subscriber->interface)) if (next_hop == subscriber && (interface != subscriber->interface))
changed = 1; changed = 1;
@ -281,6 +311,7 @@ next:
state->hop_count = best_hop_count; state->hop_count = best_hop_count;
state->route_version = route_version; state->route_version = route_version;
state->calculating = 0; state->calculating = 0;
state->link = best_link;
int reachable = subscriber->reachable; int reachable = subscriber->reachable;
if (next_hop == NULL){ if (next_hop == NULL){
@ -322,11 +353,19 @@ next:
return 0; return 0;
} }
static int append_link_state(struct overlay_buffer *payload, char flags, struct subscriber *transmitter, struct subscriber *receiver, int interface, int version){ static int append_link_state(struct overlay_buffer *payload, char flags,
struct subscriber *transmitter, struct subscriber *receiver,
int interface, int version, int ack_sequence, uint32_t ack_mask,
int drop_rate)
{
if (interface!=-1) if (interface!=-1)
flags|=FLAG_HAS_INTERFACE; flags|=FLAG_HAS_INTERFACE;
if (!transmitter) if (!transmitter)
flags|=FLAG_NO_PATH; flags|=FLAG_NO_PATH;
if (ack_sequence!=-1)
flags|=FLAG_HAS_ACK;
if (drop_rate!=-1)
flags|=FLAG_HAS_DROP_RATE;
int length_pos = ob_position(payload); int length_pos = ob_position(payload);
if (ob_append_byte(payload, 0)) if (ob_append_byte(payload, 0))
@ -349,6 +388,18 @@ static int append_link_state(struct overlay_buffer *payload, char flags, struct
if (ob_append_byte(payload, interface)) if (ob_append_byte(payload, interface))
return -1; return -1;
if (ack_sequence!=-1){
if (ob_append_byte(payload, ack_sequence))
return -1;
if (ob_append_ui32(payload, ack_mask))
return -1;
}
if (drop_rate!=-1)
if (ob_append_byte(payload, drop_rate))
return -1;
// TODO insert future fields here // TODO insert future fields here
@ -371,23 +422,25 @@ static int append_link(struct subscriber *subscriber, void *context)
time_ms_t now = gettime_ms(); time_ms_t now = gettime_ms();
find_best_link(subscriber); if (find_best_link(subscriber))
return 0;
if (state->next_update - INCLUDE_ANYWAY <= now){ if (state->next_update - INCLUDE_ANYWAY <= now){
if (subscriber->reachable==REACHABLE_SELF){ if (subscriber->reachable==REACHABLE_SELF){
// Other entries in our keyring are always one hop away from us. // Other entries in our keyring are always one hop away from us.
if (append_link_state(payload, 0, my_subscriber, subscriber, -1, 0)){ if (append_link_state(payload, 0, my_subscriber, subscriber, -1, 1, -1, 0, 0)){
link_send_alarm.alarm = now; link_send_alarm.alarm = now;
return 1; return 1;
} }
} else { } else {
struct link *link = state->link;
if (append_link_state(payload, 0, state->transmitter, subscriber, -1, 0)){ if (append_link_state(payload, 0, state->transmitter, subscriber, -1, link?link->link_version:-1, -1, 0, link?link->drop_rate:32)){
link_send_alarm.alarm = now; link_send_alarm.alarm = now;
return 1; return 1;
} }
} }
state->next_update = now + LINK_INTERVAL; // include information about this link every 5s
state->next_update = now + 5000;
} }
if (state->next_update < link_send_alarm.alarm) if (state->next_update < link_send_alarm.alarm)
@ -463,7 +516,6 @@ static int link_send_neighbours(struct overlay_buffer *payload)
if (n->best_link != best_link){ if (n->best_link != best_link){
n->best_link = best_link; n->best_link = best_link;
n->version ++;
n->next_neighbour_update = now; n->next_neighbour_update = now;
if (config.debug.linkstate && config.debug.verbose) if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s", DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s",
@ -478,11 +530,12 @@ static int link_send_neighbours(struct overlay_buffer *payload)
flags|=FLAG_BROADCAST; flags|=FLAG_BROADCAST;
if (n->next_neighbour_update - INCLUDE_ANYWAY <= now){ if (n->next_neighbour_update - INCLUDE_ANYWAY <= now){
if (append_link_state(payload, flags, n->subscriber, my_subscriber, best_link->neighbour_interface, n->version)){ if (append_link_state(payload, flags, n->subscriber, my_subscriber, best_link->neighbour_interface, 1, best_link->ack_sequence, best_link->ack_mask, -1)){
link_send_alarm.alarm = now; link_send_alarm.alarm = now;
return 1; return 1;
} }
n->next_neighbour_update = now + best_link->interface->tick_ms; n->next_neighbour_update = now + best_link->interface->tick_ms;
n->ack_counter = ACK_WINDOW;
} }
if (n->next_neighbour_update < link_send_alarm.alarm) if (n->next_neighbour_update < link_send_alarm.alarm)
@ -496,7 +549,7 @@ static int link_send_neighbours(struct overlay_buffer *payload)
// send link details // send link details
static void link_send(struct sched_ent *alarm) static void link_send(struct sched_ent *alarm)
{ {
alarm->alarm=gettime_ms() + LINK_INTERVAL; alarm->alarm=gettime_ms() + 10000;
struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame)); struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA; frame->type=OF_TYPE_DATA;
@ -521,6 +574,7 @@ static void link_send(struct sched_ent *alarm)
else if (overlay_payload_enqueue(frame)) else if (overlay_payload_enqueue(frame))
op_free(frame); op_free(frame);
alarm->deadline = alarm->alarm;
schedule(alarm); schedule(alarm);
} }
@ -545,7 +599,8 @@ struct neighbour_link * get_neighbour_link(struct neighbour *neighbour, struct o
link->interface = interface; link->interface = interface;
link->neighbour_interface = sender_interface; link->neighbour_interface = sender_interface;
link->unicast = unicast; link->unicast = unicast;
link->sequence = -1; link->ack_sequence = -1;
link->ack_mask = 0;
link->_next = neighbour->links; link->_next = neighbour->links;
if (config.debug.linkstate && config.debug.verbose) if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; new possible link from neighbour %s on interface %s/%d", DEBUGF("LINK STATE; new possible link from neighbour %s on interface %s/%d",
@ -566,22 +621,39 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
struct neighbour *neighbour = get_neighbour(subscriber, 1); struct neighbour *neighbour = get_neighbour(subscriber, 1);
struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast); struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast);
time_ms_t now = gettime_ms(); time_ms_t now = gettime_ms();
time_ms_t next_update = neighbour->next_neighbour_update;
neighbour->ack_counter --;
// for now we'll use a simple time based link up/down flag // for now we'll use a simple time based link up/down flag
// force an update when we start hearing a new neighbour link
if (link->link_timeout < now){
neighbour->next_neighbour_update = now;
neighbour->version++;
update_alarm(now);
}
if (sender_seq >=0){ if (sender_seq >=0){
if (link->sequence !=-1 && sender_seq != ((link->sequence+1)&0xFF)){ if (link->ack_sequence != -1){
DEBUGF("LINK STATE %s; Sequence jumped from %d to %d", link->ack_mask = (link->ack_mask << 1) | 1;
interface->name,link->sequence, sender_seq); while(1){
} link->ack_sequence = (link->ack_sequence+1)&0xFF;
link->sequence = sender_seq; if (link->ack_sequence == sender_seq)
break;
// missed a packet? send a link state soon
link->ack_mask = link->ack_mask << 1;
next_update = now+100;
}
}else
link->ack_sequence = sender_seq;
} }
link->link_timeout = now + (interface->tick_ms *4);
// force an update soon when we need to ack packets
if (neighbour->ack_counter <=0)
next_update = now+10;
// force an update when we start hearing a new neighbour link
if (link->link_timeout < now)
next_update = now;
if (next_update < neighbour->next_neighbour_update){
neighbour->next_neighbour_update = next_update;
}
update_alarm(neighbour->next_neighbour_update);
link->link_timeout = now + (interface->tick_ms *5);
return 0; return 0;
} }
@ -630,6 +702,23 @@ int link_receive(overlay_mdp_frame *mdp)
continue; continue;
} }
int ack_seq = -1;
uint32_t ack_mask = 0;
int drop_rate = -1;
if (flags & FLAG_HAS_ACK){
ack_seq = ob_get(payload);
ack_mask = ob_get_ui32(payload);
drop_rate = 15 - NumberOfSetBits((ack_mask & 0x7FFF));
}
if (flags & FLAG_HAS_DROP_RATE){
drop_rate = ob_get(payload);
if (drop_rate <0)
break;
}
// jump to the position of the next record, even if there's more data we don't understand // jump to the position of the next record, even if there's more data we don't understand
payload->position = start_pos + length; payload->position = start_pos + length;
@ -646,6 +735,10 @@ int link_receive(overlay_mdp_frame *mdp)
// TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding? // TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding?
if (transmitter == my_subscriber && interface_id != -1){ if (transmitter == my_subscriber && interface_id != -1){
// TODO get matching neighbour link and combine scores
// TODO use ack_sequence && ack_mask to control (re)sending packets
// they can hear us? we can route through them! // they can hear us? we can route through them!
interface = &overlay_interfaces[interface_id]; interface = &overlay_interfaces[interface_id];
if (interface->state != INTERFACE_STATE_UP) if (interface->state != INTERFACE_STATE_UP)
@ -654,18 +747,28 @@ int link_receive(overlay_mdp_frame *mdp)
if (neighbour->neighbour_link_timeout < now) if (neighbour->neighbour_link_timeout < now)
changed = 1; changed = 1;
neighbour->neighbour_link_timeout = now + LINK_INTERVAL; neighbour->neighbour_link_timeout = now + interface->tick_ms * 5;
}else }else
continue; continue;
}else if(transmitter == my_subscriber) }else if(transmitter == my_subscriber)
transmitter = NULL; transmitter = NULL;
struct link *link = find_link(neighbour, receiver, transmitter?1:0); struct link *link = find_link(neighbour, receiver, transmitter?1:0);
if (link && transmitter == my_subscriber){
// TODO combine our link stats with theirs
version = link->link_version;
if (drop_rate!=link->drop_rate)
version++;
}
if (link && (link->transmitter != transmitter || link->link_version != version)){ if (link && (link->transmitter != transmitter || link->link_version != version)){
changed = 1; changed = 1;
link->transmitter = transmitter; link->transmitter = transmitter;
link->link_version = version; link->link_version = version;
link->interface = interface; link->interface = interface;
link->drop_rate = drop_rate;
// TODO other link attributes... // TODO other link attributes...
} }
} }

View File

@ -393,7 +393,7 @@ typedef struct overlay_interface {
struct slip_decode_state slip_decode_state; struct slip_decode_state slip_decode_state;
// copy of ifconfig flags // copy of ifconfig flags
char drop_broadcasts; uint16_t drop_broadcasts;
char drop_unicasts; char drop_unicasts;
int port; int port;
int type; int type;

View File

@ -189,7 +189,7 @@ setup_scan() {
set interfaces.1.dummy_address 127.0.1.11 set interfaces.1.dummy_address 127.0.1.11
foreach_instance +A +B \ foreach_instance +A +B \
executeOk_servald config \ executeOk_servald config \
set interfaces.1.drop_broadcasts 1 set interfaces.1.drop_broadcasts 100
foreach_instance +A +B start_routing_instance foreach_instance +A +B start_routing_instance
} }
test_scan() { test_scan() {
@ -340,6 +340,35 @@ test_multi_interface() {
wait_until multi_has_link $SIDA wait_until multi_has_link $SIDA
} }
doc_unreliable_links="Choose a longer, perfect path over an unreliable link"
setup_unreliable_links() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B +C create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +B +C add_interface 2
foreach_instance +A +C add_interface 3
set_instance +A
executeOk_servald config \
set interfaces.3.drop_broadcasts 70
set_instance +C
executeOk_servald config \
set interfaces.3.drop_broadcasts 70
foreach_instance +A +B +C start_routing_instance
}
test_unreliable_links() {
foreach_instance +A +B +C \
wait_until has_seen_instances +A +B +C
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDC 5
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
set_instance +C
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:INDIRECT :"
}
setup_circle() { setup_circle() {
setup_servald setup_servald
assert_no_servald_processes assert_no_servald_processes