Improve reliability

- don't drop packets that haven't been resolved yet on incoming acks
- statistically drop unicast packets during tests
This commit is contained in:
Jeremy Lakeman 2013-08-12 14:21:31 +09:30
parent 9a50d8a9ef
commit a516af616e
8 changed files with 119 additions and 86 deletions

View File

@ -435,8 +435,9 @@ STRING(256, file, "", str_nonempty,, "Path of interfa
ATOM(struct in_addr, dummy_address, hton_in_addr(INADDR_LOOPBACK), in_addr,, "Dummy interface address")
ATOM(struct in_addr, dummy_netmask, hton_in_addr(0xFFFFFF00), in_addr,, "Dummy interface netmask")
ATOM(uint16_t, port, PORT_DNA, uint16_nonzero,, "Port number for network interface")
ATOM(uint16_t, drop_broadcasts, 0, uint16_nonzero,, "Percentage of incoming broadcast packets that should be dropped for testing purposes")
ATOM(bool_t, drop_broadcasts, 0, boolean,, "If true, drop all incoming broadcast packets")
ATOM(bool_t, drop_unicasts, 0, boolean,, "If true, drop all incoming unicast packets")
ATOM(uint16_t, drop_packets, 0, uint16_nonzero,, "Percentage of incoming packets that should be dropped for testing purposes")
ATOM(short, type, OVERLAY_INTERFACE_WIFI, interface_type,, "Type of network interface")
ATOM(int32_t, packet_interval, -1, int32_nonneg,, "Minimum interval between packets in microseconds")
ATOM(int32_t, mdp_tick_ms, -1, int32_nonneg,, "Override MDP tick interval for this interface")

View File

@ -367,6 +367,7 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
// copy ifconfig values
interface->drop_broadcasts = ifconfig->drop_broadcasts;
interface->drop_unicasts = ifconfig->drop_unicasts;
interface->drop_packets = ifconfig->drop_packets;
interface->port = ifconfig->port;
interface->type = ifconfig->type;
interface->send_broadcasts = ifconfig->send_broadcasts;
@ -590,17 +591,22 @@ struct file_packet{
};
static int should_drop(struct overlay_interface *interface, struct sockaddr_in addr){
if (interface->drop_packets>=100)
return 1;
if (memcmp(&addr, &interface->address, sizeof(addr))==0){
return interface->drop_unicasts;
}
if (memcmp(&addr, &interface->destination->address, sizeof(addr))==0){
if (interface->drop_broadcasts == 0)
return 0;
if (interface->drop_broadcasts >= 100)
if (interface->drop_unicasts)
return 1;
if (rand()%100 >= interface->drop_broadcasts)
return 0;
}
}else if (memcmp(&addr, &interface->destination->address, sizeof(addr))==0){
if (interface->drop_broadcasts)
return 1;
}else
return 1;
if (interface->drop_packets <= 0)
return 0;
if (rand()%100 >= interface->drop_packets)
return 0;
return 1;
}

View File

@ -324,7 +324,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// TODO stop when the packet is nearly full?
while(frame){
if (frame->enqueued_at + queue->latencyTarget < now){
if (config.debug.rejecteddata)
if (config.debug.overlayframes)
DEBUGF("Dropping frame type %x for %s due to expiry timeout",
frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All");
frame = overlay_queue_remove(queue, frame);
@ -380,7 +380,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// is this packet going our way?
if (dest==packet->destination){
destination_index=i;
frame->destinations[i].sent_sequence = dest->sequence_number;
break;
}
}else{
@ -439,6 +438,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
goto skip;
}
frame->destinations[destination_index].sent_sequence = frame->destinations[destination_index].destination->sequence_number;
frame->destinations[destination_index].delay_until = now+200;
frame->transmit_count++;
@ -524,34 +524,40 @@ int overlay_queue_ack(struct subscriber *neighbour, struct network_destination *
struct overlay_frame *frame = overlay_tx[i].first;
while(frame){
for (j=frame->destination_count -1;j>=0;j--){
if (frame->destinations[j].destination==destination){
int frame_seq = frame->destinations[j].sent_sequence;
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
if (acked){
if (config.debug.overlayframes)
DEBUGF("Packet %p to %s sent by seq %d, acked with seq %d",
frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
remove_destination(frame, j);
}else if (seq_delta < 128 && frame->destination && frame->delay_until>now){
// resend immediately
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->delay_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
for (j=frame->destination_count -1;j>=0;j--)
if (frame->destinations[j].destination==destination)
break;
if (j>=0){
int frame_seq = frame->destinations[j].sent_sequence;
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
if (acked){
if (config.debug.overlayframes)
DEBUGF("Packet %p to %s sent by seq %d, acked with seq %d",
frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
// drop packets that don't need to be retransmitted
if (frame->destination || frame->destination_count<=1){
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
}
remove_destination(frame, j);
}else if (seq_delta < 128 && frame->destination && frame->delay_until>now){
// retransmit asap
if (config.debug.overlayframes)
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->delay_until = now;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
}
if (frame->destination_count==0){
frame = overlay_queue_remove(&overlay_tx[i], frame);
}else
frame = frame->next;
frame = frame->next;
}
}
return 0;

View File

@ -890,7 +890,8 @@ int link_add_destinations(struct overlay_frame *frame)
if (n){
struct link_out *out = n->out_links;
while(out){
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
if (frame->destination_count < MAX_PACKET_DESTINATIONS)
frame->destinations[frame->destination_count++].destination = add_destination_ref(out->destination);
out = out->_next;
}
}

View File

@ -443,7 +443,8 @@ typedef struct overlay_interface {
struct slip_decode_state slip_decode_state;
// copy of ifconfig flags
uint16_t drop_broadcasts;
uint16_t drop_packets;
char drop_broadcasts;
char drop_unicasts;
int port;
int type;

View File

@ -37,13 +37,13 @@ teardown() {
}
is_published() {
tfw_log "grep \"PUBLISHED.*$1\" $LOGA"
grep "PUBLISHED.*$1" $LOGA || return 1
tfw_log "grep \"PUBLISHED.*$1\" $instance_servald_log"
grep "PUBLISHED.*$1" $instance_servald_log || return 1
return 0
}
sent_directory_request() {
grep "Sending directory registration" $1 || return 1
grep "Sending directory registration" $instance_servald_log || return 1
return 0
}
@ -68,9 +68,9 @@ setup_publish() {
doc_publish="Publish and retrieve a directory entry"
test_publish() {
wait_until sent_directory_request $LOGB
wait_until sent_directory_request $LOGC
wait_until sent_directory_request $LOGD
foreach_instance +B +C +D
wait_until sent_directory_request
set_instance +A
wait_until is_published $SIDB
wait_until is_published $SIDC
wait_until is_published $SIDD
@ -115,32 +115,31 @@ setup_routing() {
foreach_instance +A +B +C \
executeOk_servald config \
set interfaces.0.file dummy1 \
set interfaces.0.mdp_tick_ms 0 \
set interfaces.0.send_broadcasts 0 \
set interfaces.0.default_route 1 \
set interfaces.0.dummy_netmask 255.255.255.0
set interfaces.0.dummy_netmask 255.255.255.0
set_instance +A
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set dna.helper.executable "$servald_build_root/directory_service" \
set debug.dnahelper on
set_instance +B
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
set_instance +C
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
foreach_instance +B +C \
executeOk_servald config \
set interfaces.0.dummy_address 10.0.${instance_number}.1 \
set directory.service $SIDA \
set hosts.$SIDA.address 10.0.1.1
foreach_instance +A +B +C start_routing_instance
wait_until grep "DNAHELPER got STARTED ACK" $LOGA
set_instance +A
wait_until grep "DNAHELPER got STARTED ACK" $instance_servald_log
}
doc_routing="Ping via relay node"
test_routing() {
foreach_instance +B +C \
wait_until sent_directory_request $instance_servald_log
executeOk_servald scan 10.0.1.1
foreach_instance +B +C \
wait_until sent_directory_request
set_instance +A
wait_until is_published $SIDB
wait_until is_published $SIDC
set_instance +B

View File

@ -237,7 +237,7 @@ setup_FileTransferUnreliableBigMDP() {
executeOk_servald config \
set rhizome.http.enable 0 \
set interfaces.1.file dummy \
set interfaces.1.drop_broadcasts 20
set interfaces.1.drop_packets 10
setup_bigfile_common
}
test_FileTransferUnreliableBigMDP() {

View File

@ -221,6 +221,9 @@ test_multiple_nodes() {
wait_until path_exists +A +B
wait_until path_exists +A +C
wait_until path_exists +A +D
wait_until path_exists +B +A
wait_until path_exists +C +A
wait_until path_exists +D +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
@ -238,7 +241,7 @@ setup_scan() {
foreach_instance +A +B +C add_interface 1
foreach_instance +A +B +C \
executeOk_servald config \
set interfaces.1.drop_broadcasts 100
set interfaces.1.drop_broadcasts on
foreach_instance +A +B +C start_routing_instance
}
test_scan() {
@ -259,6 +262,22 @@ test_scan() {
tfw_cat --stdout --stderr
}
doc_scan_one="Network scan a single address"
setup_scan_one() {
setup_scan
}
test_scan_one() {
set_instance +A
executeOk_servald scan 127.0.1.2
wait_until scan_completed
wait_until --timeout=10 has_seen_instances +B
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDB:UNICAST:"
assertStdoutGrep --matches=0 "^$SIDC:"
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
}
scan_completed() {
grep "Scan completed" $instance_servald_log || return 1
return 0
@ -272,7 +291,7 @@ setup_single_filter() {
foreach_instance +A +B add_interface 1
set_instance +B
executeOk_servald config \
set interfaces.1.drop_broadcasts 100
set interfaces.1.drop_broadcasts on
foreach_instance +A +B start_routing_instance
}
test_single_filter() {
@ -366,11 +385,11 @@ setup_unicast_route() {
foreach_instance +C +D add_interface 3
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts 100
set interfaces.1.drop_broadcasts on
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts 100 \
set interfaces.3.drop_broadcasts 100
set interfaces.2.drop_broadcasts on \
set interfaces.3.drop_broadcasts on
foreach_instance +A +B +C +D start_routing_instance
}
test_unicast_route() {
@ -477,7 +496,7 @@ setup_ping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 40
set interfaces.1.drop_packets 40
foreach_instance +A +B start_routing_instance
}
test_ping_unreliable() {
@ -496,11 +515,11 @@ setup_ping_unreliable2() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 40
set interfaces.1.drop_packets 40
foreach_instance +B +C add_interface 2
foreach_instance +B +C \
executeOk_servald config \
set interfaces.2.drop_broadcasts 40
set interfaces.2.drop_packets 40
foreach_instance +A +B +C start_routing_instance
}
test_ping_unreliable2() {
@ -519,7 +538,7 @@ setup_brping_unreliable() {
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20
set interfaces.1.drop_packets 20
foreach_instance +A +B start_routing_instance
}
test_brping_unreliable() {
@ -540,16 +559,16 @@ setup_unreliable_links() {
foreach_instance +A +C add_interface 3
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.3.drop_broadcasts 70
set interfaces.1.drop_packets 5 \
set interfaces.3.drop_packets 70
set_instance +B
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.2.drop_broadcasts 5
set interfaces.1.drop_packets 5 \
set interfaces.2.drop_packets 5
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts 5 \
set interfaces.3.drop_broadcasts 70
set interfaces.2.drop_packets 5 \
set interfaces.3.drop_packets 70
foreach_instance +A +B +C start_routing_instance
}
test_unreliable_links() {
@ -575,24 +594,24 @@ setup_unreliable_links2() {
foreach_instance +C +D add_interface 6
set_instance +A
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.2.drop_broadcasts 40 \
set interfaces.3.drop_broadcasts 90
set interfaces.1.drop_packets 5 \
set interfaces.2.drop_packets 40 \
set interfaces.3.drop_packets 90
set_instance +B
executeOk_servald config \
set interfaces.1.drop_broadcasts 5 \
set interfaces.4.drop_broadcasts 5 \
set interfaces.5.drop_broadcasts 40
set interfaces.1.drop_packets 5 \
set interfaces.4.drop_packets 5 \
set interfaces.5.drop_packets 40
set_instance +C
executeOk_servald config \
set interfaces.2.drop_broadcasts 40 \
set interfaces.4.drop_broadcasts 5 \
set interfaces.6.drop_broadcasts 5
set interfaces.2.drop_packets 40 \
set interfaces.4.drop_packets 5 \
set interfaces.6.drop_packets 5
set_instance +D
executeOk_servald config \
set interfaces.3.drop_broadcasts 90 \
set interfaces.5.drop_broadcasts 40 \
set interfaces.6.drop_broadcasts 5
set interfaces.3.drop_packets 90 \
set interfaces.5.drop_packets 40 \
set interfaces.6.drop_packets 5
foreach_instance +A +B +C +D start_routing_instance
}
test_unreliable_links2() {