diff --git a/overlay_link.c b/overlay_link.c index fd04affa..4d65bf9c 100644 --- a/overlay_link.c +++ b/overlay_link.c @@ -116,12 +116,18 @@ int set_reachable(struct subscriber *subscriber, int reachable){ case REACHABLE_BROADCAST: DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid)); break; + case REACHABLE_BROADCAST|REACHABLE_UNICAST: + DEBUGF("REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid)); + break; case REACHABLE_UNICAST|REACHABLE_ASSUMED: DEBUGF("ASSUMED REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid)); break; case REACHABLE_BROADCAST|REACHABLE_ASSUMED: DEBUGF("ASSUMED REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid)); break; + case REACHABLE_BROADCAST|REACHABLE_UNICAST|REACHABLE_ASSUMED: + DEBUGF("ASSUMED REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid)); + break; } } diff --git a/overlay_packet.h b/overlay_packet.h index ed2c3157..4626af49 100644 --- a/overlay_packet.h +++ b/overlay_packet.h @@ -23,6 +23,9 @@ #include "overlay_address.h" #include "serval.h" +#define FRAME_NOT_SENT -1 +#define FRAME_DONT_SEND -2 + struct overlay_frame { struct overlay_frame *prev; struct overlay_frame *next; @@ -36,10 +39,10 @@ struct overlay_frame { void *send_context; int (*send_hook)(struct overlay_frame *, int seq, void *context); - /* Mark which interfaces the frame has been sent on, - so that we can ensure that broadcast frames get sent - exactly once on each interface */ - unsigned char broadcast_sent_via[OVERLAY_MAX_INTERFACES]; + /* What sequence number have we used to send this packet on this interface. + */ + int interface_sent_sequence[OVERLAY_MAX_INTERFACES]; + time_ms_t interface_dont_send_until[OVERLAY_MAX_INTERFACES]; struct broadcast broadcast_id; // null if destination is broadcast @@ -54,7 +57,6 @@ struct overlay_frame { struct sockaddr_in recvaddr; overlay_interface *interface; char unicast; - int sent_seq; time_ms_t dont_send_until; /* Actual payload */ diff --git a/overlay_payload.c b/overlay_payload.c index e196fa8e..58286c61 100644 --- a/overlay_payload.c +++ b/overlay_payload.c @@ -104,8 +104,9 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa if ((!p->destination) && !is_all_matching(p->broadcast_id.id,BROADCAST_LEN,0)){ broadcast = &p->broadcast_id; } + int i = interface - overlay_interfaces; if (overlay_frame_build_header(context, b, - p->queue, p->type, p->modifiers, p->ttl, p->sent_seq, + p->queue, p->type, p->modifiers, p->ttl, p->interface_sent_sequence[i], broadcast, p->next_hop, p->destination, p->source)) goto cleanup; diff --git a/overlay_queue.c b/overlay_queue.c index 3f295439..5d0e4c0b 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -170,7 +170,6 @@ int overlay_payload_enqueue(struct overlay_frame *p) overlay_send_probe(p->destination, p->destination->address, p->destination->interface, OQ_MESH_MANAGEMENT); overlay_txqueue *queue = &overlay_tx[p->queue]; - p->sent_seq =-1; if (config.debug.packettx) DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)", @@ -185,15 +184,22 @@ int overlay_payload_enqueue(struct overlay_frame *p) if (queue->length>=queue->maxLength) return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength); - - if (!p->destination_resolved){ + { + int i; + for(i=0;iinterface_sent_sequence[i]=FRAME_DONT_SEND; + } + + if (p->destination_resolved){ + p->interface_sent_sequence[p->interface - overlay_interfaces]=FRAME_NOT_SENT; + }else{ if (p->destination){ // allow the packet to be resent if (p->resend == 0) p->resend = 3; }else{ int i; - int drop=1; + int interface_copies = 0; // hook to allow for flooding via olsr olsr_send(p); @@ -203,17 +209,20 @@ int overlay_payload_enqueue(struct overlay_frame *p) if (overlay_interfaces[i].state==INTERFACE_STATE_UP && overlay_interfaces[i].send_broadcasts && link_state_interface_has_neighbour(&overlay_interfaces[i])){ - p->broadcast_sent_via[i]=0; - drop=0; - }else - p->broadcast_sent_via[i]=1; + p->interface_sent_sequence[i]=FRAME_NOT_SENT; + interface_copies++; + } } // just drop it now - if (drop){ + if (interface_copies == 0){ WARN("No broadcast interfaces to send with"); return -1; } + + // allow the packet to be resent + if (p->resend == 0) + p->resend = 3 * interface_copies; } } @@ -230,7 +239,6 @@ int overlay_payload_enqueue(struct overlay_frame *p) rhizome_saw_voice_traffic(); overlay_calc_queue_time(queue, p); - return 0; } @@ -289,20 +297,23 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){ }while(0); time_ms_t next_allowed_packet=0; - if (frame->interface){ + if (frame->destination_resolved && frame->interface){ // don't include interfaces which are currently transmitting using a serial buffer if (frame->interface->tx_bytes_pending>0) return 0; next_allowed_packet = limit_next_allowed(&frame->interface->transfer_limit); - }else{ + }else if(!frame->destination){ // check all interfaces int i; for(i=0;iinterface_sent_sequence[i]==FRAME_DONT_SEND || !link_state_interface_has_neighbour(&overlay_interfaces[i])) continue; time_ms_t next_packet = limit_next_allowed(&overlay_interfaces[i].transfer_limit); + if (next_packet < frame->interface_dont_send_until[i]) + next_packet = frame->interface_dont_send_until[i]; if (next_allowed_packet==0||next_packet < next_allowed_packet) next_allowed_packet = next_packet; } @@ -394,9 +405,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim if (packet->buffer){ // check if we can stuff into this packet - if (frame->broadcast_sent_via[packet->i]){ + if (frame->interface_sent_sequence[packet->i]==FRAME_DONT_SEND || frame->interface_dont_send_until[packet->i] >now) goto skip; - } frame->interface = packet->interface; frame->recvaddr = packet->interface->broadcast_address; @@ -407,10 +417,12 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim for(i=0;ibroadcast_sent_via[i] || + frame->interface_sent_sequence[i]==FRAME_DONT_SEND || !link_state_interface_has_neighbour(&overlay_interfaces[i])) continue; keep=1; + if (frame->interface_dont_send_until[i] >now) + continue; time_ms_t next_allowed = limit_next_allowed(&overlay_interfaces[i].transfer_limit); if (next_allowed > now) continue; @@ -482,16 +494,21 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim // payload was not queued goto skip; } - if (frame->sent_seq!=-1 && config.debug.overlayframes) - DEBUGF("Retransmitted frame from seq %d in seq %d", frame->sent_seq, packet->seq); - frame->sent_seq = packet->seq; - + sent: + if (frame->interface_sent_sequence[packet->i]>=0 && config.debug.overlayframes) + DEBUGF("Retransmitted frame %p from seq %d in seq %d", frame, frame->interface_sent_sequence[packet->i], packet->seq); + + frame->interface_sent_sequence[packet->i] = packet->seq; + frame->interface_dont_send_until[packet->i] = now+200; + if (config.debug.overlayframes){ - DEBUGF("Sent payload type %x len %d for %s via %s, seq %d", frame->type, ob_position(frame->payload), + DEBUGF("Sent payload %p type %x len %d for %s via %s, seq %d", + frame, + frame->type, ob_position(frame->payload), frame->destination?alloca_tohex_sid(frame->destination->sid):"All", frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN), - frame->sent_seq); + frame->interface_sent_sequence[packet->i]); } if (frame->destination) @@ -502,8 +519,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim // mark the payload as sent int keep_payload = 0; + frame->resend --; if (frame->destination_resolved){ - frame->resend --; if (frame->resend>0 && frame->next_hop && packet->seq!=-1 && (!frame->unicast)){ frame->dont_send_until = now+200; frame->destination_resolved = 0; @@ -512,18 +529,17 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim DEBUGF("Holding onto payload for ack/nack resend in %lldms", frame->dont_send_until - now); } }else{ + if (frame->resend<=0 || packet->seq==-1 || frame->unicast){ + // dont retransmit if we aren't sending sequence numbers, or we've run out of allowed resends + frame->interface_sent_sequence[packet->i] = FRAME_DONT_SEND; + } int i; - frame->broadcast_sent_via[packet->i]=1; - - // check if there is still a broadcast to be sent - for(i=0;ibroadcast_sent_via[i]){ - keep_payload=1; - break; - } + link_state_interface_has_neighbour(&overlay_interfaces[i]) && + frame->interface_sent_sequence[i]!=FRAME_DONT_SEND){ + keep_payload = 1; + break; } } } @@ -592,24 +608,45 @@ int overlay_send_tick_packet(struct overlay_interface *interface){ // de-queue all packets that have been sent to this subscriber & have arrived. int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq) { + int interface_id = interface - overlay_interfaces; int i; + time_ms_t now = gettime_ms(); for (i=0;inext_hop == neighbour && frame->interface == interface && frame->sent_seq != -1){ - int seq_delta = (ack_seq - frame->sent_seq)&0xFF; - if (seq_delta <= 32 && (seq_delta==0 || ack_mask&(1<<(seq_delta-1)))){ - if (config.debug.overlayframes) - DEBUGF("Dequeing packet to %s sent by seq %d, due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq); - frame = overlay_queue_remove(&overlay_tx[i], frame); - continue; + int frame_seq = frame->interface_sent_sequence[interface_id]; + if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){ + int seq_delta = (ack_seq - frame_seq)&0xFF; + char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0; + + if (acked){ + frame->interface_sent_sequence[interface_id] = FRAME_DONT_SEND; + int discard = 1; + if (!frame->destination){ + int j; + for(j=0;jinterface_sent_sequence[j]!=FRAME_DONT_SEND){ + discard = 0; + break; + } + } + } + if (discard){ + if (config.debug.overlayframes) + DEBUGF("Dequeing packet %p to %s sent by seq %d, due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq); + frame = overlay_queue_remove(&overlay_tx[i], frame); + continue; + } } - if (seq_delta < 128){ - // resend, and re-resolve destination + + if (seq_delta < 128 && frame->destination && frame->dont_send_until>now){ + // resend immediately if (config.debug.overlayframes) - DEBUGF("Requeue packet to %s sent by seq %d due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq); - frame->dont_send_until = gettime_ms(); + DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq); + frame->dont_send_until = now; + // dont count the next retransmission against the time based retries frame->resend ++; overlay_calc_queue_time(&overlay_tx[i], frame); } diff --git a/route_link.c b/route_link.c index 32c8feaa..b12c500e 100644 --- a/route_link.c +++ b/route_link.c @@ -85,7 +85,7 @@ struct neighbour{ time_ms_t neighbour_link_timeout; // if a neighbour is telling the world that they are using us as a next hop, we need to send acks & nacks with high priority // otherwise we don't care too much about packet loss. - time_ms_t using_us_timeout; + char using_us; // next link update time_ms_t next_neighbour_update; @@ -332,7 +332,7 @@ next: int reachable = subscriber->reachable; if (next_hop == NULL){ - if (!(subscriber->reachable & REACHABLE_ASSUMED)) + if (subscriber->reachable&REACHABLE_BROADCAST && !(subscriber->reachable & REACHABLE_ASSUMED)) reachable = REACHABLE_NONE; } else if (next_hop == subscriber){ // reset the state of any unicast probe's if the interface has changed @@ -744,7 +744,7 @@ int link_state_ack_soon(struct subscriber *subscriber){ RETURN(0); time_ms_t now = gettime_ms(); - if (neighbour->using_us_timeout > now && neighbour->next_neighbour_update > now + 80){ + if (neighbour->using_us && neighbour->next_neighbour_update > now + 80){ neighbour->next_neighbour_update = now + 80; } update_alarm(neighbour->next_neighbour_update); @@ -807,7 +807,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface neighbour->ack_counter --; // if we need to nack promptly - if (neighbour->using_us_timeout > now){ + if (neighbour->using_us){ neighbour->next_neighbour_update = now + 10; if (neighbour->ack_counter <=0){ @@ -829,7 +829,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface link->link_timeout = now + (interface->tick_ms *5); // force an update soon when we need to promptly ack packets - if (neighbour->using_us_timeout > now && neighbour->ack_counter <=0){ + if (neighbour->using_us > now && neighbour->ack_counter <=0){ neighbour_find_best_link(neighbour); send_neighbour_link(neighbour); } @@ -920,27 +920,35 @@ int link_receive(overlay_mdp_frame *mdp) ack_mask, drop_rate); + if (transmitter == my_subscriber && receiver->reachable!=REACHABLE_SELF){ + // if I am in your routing graph to reach another node, even if I'm not your immediate neighbour + // I *MUST* forward your broadcasts to this node, otherwise I can drop them + + } + if (receiver == my_subscriber){ - // track if our neighbour is using our network link, if they are we need to ack / nack promptly - if (transmitter == sender) - neighbour->using_us_timeout = now + 500; - else - neighbour->using_us_timeout = 0; + // track if our neighbour is using us as an immediate neighbour, if they are we need to ack / nack promptly + neighbour->using_us = (transmitter==sender?1:0); // for routing, we can completely ignore any links that our neighbour is using to route through us. + // we can always send packets to ourself :) continue; } - // ignore other incoming links to our neighbour - // TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding? if (receiver == sender){ + // ignore other incoming links to our neighbour + // TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding? if (transmitter!=my_subscriber || interface_id==-1) continue; interface = &overlay_interfaces[interface_id]; + // ignore any links claiming to be from an interface we aren't using if (interface->state != INTERFACE_STATE_UP) continue; - }else if(transmitter == my_subscriber) + + // if our neighbour starts using us to reach this receiver, we have to treat the link the same as if it just died. + }else if(transmitter == my_subscriber){ transmitter = NULL; + } struct link *link = find_link(neighbour, receiver, transmitter?1:0); if (!link) diff --git a/tests/routing b/tests/routing index 32095d29..abc733ab 100755 --- a/tests/routing +++ b/tests/routing @@ -385,6 +385,27 @@ test_ping_unreliable() { tfw_cat --stdout --stderr } +doc_brping_unreliable="Broadcast ping over a 1-hop unreliable link" +setup_brping_unreliable() { + setup_servald + assert_no_servald_processes + foreach_instance +A +B create_single_identity + foreach_instance +A +B add_interface 1 + foreach_instance +A +B \ + executeOk_servald config \ + set interfaces.1.drop_broadcasts 20 \ + set debug.overlayframes 1 \ + set debug.rejecteddata 1 + foreach_instance +A +B start_routing_instance +} +test_brping_unreliable() { + wait_until path_exists +A +B + wait_until path_exists +B +A + set_instance +A + executeOk_servald mdp ping --interval=0.020 --timeout=2 broadcast 500 + tfw_cat --stdout --stderr +} + doc_unreliable_links="Prefer a longer, better path vs an unreliable link" setup_unreliable_links() { setup_servald @@ -413,11 +434,8 @@ test_unreliable_links() { set_instance +A executeOk_servald mdp ping --interval=0.100 --timeout=3 $SIDC 30 tfw_cat --stdout --stderr - executeOk_servald route print - assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :" - set_instance +C - executeOk_servald route print - assertStdoutGrep --matches=1 "^$SIDA:INDIRECT :" + wait_until path_exists +A +B +C + wait_until path_exists +C +B +A } doc_unreliable_links2="Choose the best multihop path with some unreliable links"