Add support for retransmitting broadcast mdp packets

This commit is contained in:
Jeremy Lakeman 2013-05-20 13:23:35 +09:30
parent a213872f09
commit 02be4c2979
6 changed files with 140 additions and 68 deletions

View File

@ -116,12 +116,18 @@ int set_reachable(struct subscriber *subscriber, int reachable){
case REACHABLE_BROADCAST:
DEBUGF("REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST:
DEBUGF("REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
case REACHABLE_BROADCAST|REACHABLE_UNICAST|REACHABLE_ASSUMED:
DEBUGF("ASSUMED REACHABLE VIA BROADCAST & UNICAST sid=%s", alloca_tohex_sid(subscriber->sid));
break;
}
}

View File

@ -23,6 +23,9 @@
#include "overlay_address.h"
#include "serval.h"
#define FRAME_NOT_SENT -1
#define FRAME_DONT_SEND -2
struct overlay_frame {
struct overlay_frame *prev;
struct overlay_frame *next;
@ -36,10 +39,10 @@ struct overlay_frame {
void *send_context;
int (*send_hook)(struct overlay_frame *, int seq, void *context);
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent
exactly once on each interface */
unsigned char broadcast_sent_via[OVERLAY_MAX_INTERFACES];
/* What sequence number have we used to send this packet on this interface.
*/
int interface_sent_sequence[OVERLAY_MAX_INTERFACES];
time_ms_t interface_dont_send_until[OVERLAY_MAX_INTERFACES];
struct broadcast broadcast_id;
// null if destination is broadcast
@ -54,7 +57,6 @@ struct overlay_frame {
struct sockaddr_in recvaddr;
overlay_interface *interface;
char unicast;
int sent_seq;
time_ms_t dont_send_until;
/* Actual payload */

View File

@ -104,8 +104,9 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
if ((!p->destination) && !is_all_matching(p->broadcast_id.id,BROADCAST_LEN,0)){
broadcast = &p->broadcast_id;
}
int i = interface - overlay_interfaces;
if (overlay_frame_build_header(context, b,
p->queue, p->type, p->modifiers, p->ttl, p->sent_seq,
p->queue, p->type, p->modifiers, p->ttl, p->interface_sent_sequence[i],
broadcast, p->next_hop,
p->destination, p->source))
goto cleanup;

View File

@ -170,7 +170,6 @@ int overlay_payload_enqueue(struct overlay_frame *p)
overlay_send_probe(p->destination, p->destination->address, p->destination->interface, OQ_MESH_MANAGEMENT);
overlay_txqueue *queue = &overlay_tx[p->queue];
p->sent_seq =-1;
if (config.debug.packettx)
DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)",
@ -185,15 +184,22 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
if (!p->destination_resolved){
{
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
p->interface_sent_sequence[i]=FRAME_DONT_SEND;
}
if (p->destination_resolved){
p->interface_sent_sequence[p->interface - overlay_interfaces]=FRAME_NOT_SENT;
}else{
if (p->destination){
// allow the packet to be resent
if (p->resend == 0)
p->resend = 3;
}else{
int i;
int drop=1;
int interface_copies = 0;
// hook to allow for flooding via olsr
olsr_send(p);
@ -203,17 +209,20 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& overlay_interfaces[i].send_broadcasts
&& link_state_interface_has_neighbour(&overlay_interfaces[i])){
p->broadcast_sent_via[i]=0;
drop=0;
}else
p->broadcast_sent_via[i]=1;
p->interface_sent_sequence[i]=FRAME_NOT_SENT;
interface_copies++;
}
}
// just drop it now
if (drop){
if (interface_copies == 0){
WARN("No broadcast interfaces to send with");
return -1;
}
// allow the packet to be resent
if (p->resend == 0)
p->resend = 3 * interface_copies;
}
}
@ -230,7 +239,6 @@ int overlay_payload_enqueue(struct overlay_frame *p)
rhizome_saw_voice_traffic();
overlay_calc_queue_time(queue, p);
return 0;
}
@ -289,20 +297,23 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
}while(0);
time_ms_t next_allowed_packet=0;
if (frame->interface){
if (frame->destination_resolved && frame->interface){
// don't include interfaces which are currently transmitting using a serial buffer
if (frame->interface->tx_bytes_pending>0)
return 0;
next_allowed_packet = limit_next_allowed(&frame->interface->transfer_limit);
}else{
}else if(!frame->destination){
// check all interfaces
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP ||
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP ||
frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
!link_state_interface_has_neighbour(&overlay_interfaces[i]))
continue;
time_ms_t next_packet = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_packet < frame->interface_dont_send_until[i])
next_packet = frame->interface_dont_send_until[i];
if (next_allowed_packet==0||next_packet < next_allowed_packet)
next_allowed_packet = next_packet;
}
@ -394,9 +405,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if (packet->buffer){
// check if we can stuff into this packet
if (frame->broadcast_sent_via[packet->i]){
if (frame->interface_sent_sequence[packet->i]==FRAME_DONT_SEND || frame->interface_dont_send_until[packet->i] >now)
goto skip;
}
frame->interface = packet->interface;
frame->recvaddr = packet->interface->broadcast_address;
@ -407,10 +417,12 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP ||
frame->broadcast_sent_via[i] ||
frame->interface_sent_sequence[i]==FRAME_DONT_SEND ||
!link_state_interface_has_neighbour(&overlay_interfaces[i]))
continue;
keep=1;
if (frame->interface_dont_send_until[i] >now)
continue;
time_ms_t next_allowed = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_allowed > now)
continue;
@ -482,16 +494,21 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// payload was not queued
goto skip;
}
if (frame->sent_seq!=-1 && config.debug.overlayframes)
DEBUGF("Retransmitted frame from seq %d in seq %d", frame->sent_seq, packet->seq);
frame->sent_seq = packet->seq;
sent:
if (frame->interface_sent_sequence[packet->i]>=0 && config.debug.overlayframes)
DEBUGF("Retransmitted frame %p from seq %d in seq %d", frame, frame->interface_sent_sequence[packet->i], packet->seq);
frame->interface_sent_sequence[packet->i] = packet->seq;
frame->interface_dont_send_until[packet->i] = now+200;
if (config.debug.overlayframes){
DEBUGF("Sent payload type %x len %d for %s via %s, seq %d", frame->type, ob_position(frame->payload),
DEBUGF("Sent payload %p type %x len %d for %s via %s, seq %d",
frame,
frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN),
frame->sent_seq);
frame->interface_sent_sequence[packet->i]);
}
if (frame->destination)
@ -502,8 +519,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// mark the payload as sent
int keep_payload = 0;
frame->resend --;
if (frame->destination_resolved){
frame->resend --;
if (frame->resend>0 && frame->next_hop && packet->seq!=-1 && (!frame->unicast)){
frame->dont_send_until = now+200;
frame->destination_resolved = 0;
@ -512,18 +529,17 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
DEBUGF("Holding onto payload for ack/nack resend in %lldms", frame->dont_send_until - now);
}
}else{
if (frame->resend<=0 || packet->seq==-1 || frame->unicast){
// dont retransmit if we aren't sending sequence numbers, or we've run out of allowed resends
frame->interface_sent_sequence[packet->i] = FRAME_DONT_SEND;
}
int i;
frame->broadcast_sent_via[packet->i]=1;
// check if there is still a broadcast to be sent
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP &&
link_state_interface_has_neighbour(&overlay_interfaces[i])){
if (!frame->broadcast_sent_via[i]){
keep_payload=1;
break;
}
link_state_interface_has_neighbour(&overlay_interfaces[i]) &&
frame->interface_sent_sequence[i]!=FRAME_DONT_SEND){
keep_payload = 1;
break;
}
}
}
@ -592,24 +608,45 @@ int overlay_send_tick_packet(struct overlay_interface *interface){
// de-queue all packets that have been sent to this subscriber & have arrived.
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq)
{
int interface_id = interface - overlay_interfaces;
int i;
time_ms_t now = gettime_ms();
for (i=0;i<OQ_MAX;i++){
struct overlay_frame *frame = overlay_tx[i].first;
while(frame){
if (frame->next_hop == neighbour && frame->interface == interface && frame->sent_seq != -1){
int seq_delta = (ack_seq - frame->sent_seq)&0xFF;
if (seq_delta <= 32 && (seq_delta==0 || ack_mask&(1<<(seq_delta-1)))){
if (config.debug.overlayframes)
DEBUGF("Dequeing packet to %s sent by seq %d, due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq);
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
int frame_seq = frame->interface_sent_sequence[interface_id];
if (frame_seq >=0 && (frame->next_hop == neighbour || !frame->destination)){
int seq_delta = (ack_seq - frame_seq)&0xFF;
char acked = (seq_delta==0 || (seq_delta <= 32 && ack_mask&(1<<(seq_delta-1))))?1:0;
if (acked){
frame->interface_sent_sequence[interface_id] = FRAME_DONT_SEND;
int discard = 1;
if (!frame->destination){
int j;
for(j=0;j<OVERLAY_MAX_INTERFACES;j++){
if (overlay_interfaces[j].state==INTERFACE_STATE_UP &&
frame->interface_sent_sequence[j]!=FRAME_DONT_SEND){
discard = 0;
break;
}
}
}
if (discard){
if (config.debug.overlayframes)
DEBUGF("Dequeing packet %p to %s sent by seq %d, due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
}
}
if (seq_delta < 128){
// resend, and re-resolve destination
if (seq_delta < 128 && frame->destination && frame->dont_send_until>now){
// resend immediately
if (config.debug.overlayframes)
DEBUGF("Requeue packet to %s sent by seq %d due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq);
frame->dont_send_until = gettime_ms();
DEBUGF("Requeue packet %p to %s sent by seq %d due to ack of seq %d", frame, alloca_tohex_sid(neighbour->sid), frame_seq, ack_seq);
frame->dont_send_until = now;
// dont count the next retransmission against the time based retries
frame->resend ++;
overlay_calc_queue_time(&overlay_tx[i], frame);
}

View File

@ -85,7 +85,7 @@ struct neighbour{
time_ms_t neighbour_link_timeout;
// if a neighbour is telling the world that they are using us as a next hop, we need to send acks & nacks with high priority
// otherwise we don't care too much about packet loss.
time_ms_t using_us_timeout;
char using_us;
// next link update
time_ms_t next_neighbour_update;
@ -332,7 +332,7 @@ next:
int reachable = subscriber->reachable;
if (next_hop == NULL){
if (!(subscriber->reachable & REACHABLE_ASSUMED))
if (subscriber->reachable&REACHABLE_BROADCAST && !(subscriber->reachable & REACHABLE_ASSUMED))
reachable = REACHABLE_NONE;
} else if (next_hop == subscriber){
// reset the state of any unicast probe's if the interface has changed
@ -744,7 +744,7 @@ int link_state_ack_soon(struct subscriber *subscriber){
RETURN(0);
time_ms_t now = gettime_ms();
if (neighbour->using_us_timeout > now && neighbour->next_neighbour_update > now + 80){
if (neighbour->using_us && neighbour->next_neighbour_update > now + 80){
neighbour->next_neighbour_update = now + 80;
}
update_alarm(neighbour->next_neighbour_update);
@ -807,7 +807,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
neighbour->ack_counter --;
// if we need to nack promptly
if (neighbour->using_us_timeout > now){
if (neighbour->using_us){
neighbour->next_neighbour_update = now + 10;
if (neighbour->ack_counter <=0){
@ -829,7 +829,7 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
link->link_timeout = now + (interface->tick_ms *5);
// force an update soon when we need to promptly ack packets
if (neighbour->using_us_timeout > now && neighbour->ack_counter <=0){
if (neighbour->using_us > now && neighbour->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
@ -920,27 +920,35 @@ int link_receive(overlay_mdp_frame *mdp)
ack_mask,
drop_rate);
if (transmitter == my_subscriber && receiver->reachable!=REACHABLE_SELF){
// if I am in your routing graph to reach another node, even if I'm not your immediate neighbour
// I *MUST* forward your broadcasts to this node, otherwise I can drop them
}
if (receiver == my_subscriber){
// track if our neighbour is using our network link, if they are we need to ack / nack promptly
if (transmitter == sender)
neighbour->using_us_timeout = now + 500;
else
neighbour->using_us_timeout = 0;
// track if our neighbour is using us as an immediate neighbour, if they are we need to ack / nack promptly
neighbour->using_us = (transmitter==sender?1:0);
// for routing, we can completely ignore any links that our neighbour is using to route through us.
// we can always send packets to ourself :)
continue;
}
// ignore other incoming links to our neighbour
// TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding?
if (receiver == sender){
// ignore other incoming links to our neighbour
// TODO build a map of everyone in our 2 hop neighbourhood to control broadcast flooding?
if (transmitter!=my_subscriber || interface_id==-1)
continue;
interface = &overlay_interfaces[interface_id];
// ignore any links claiming to be from an interface we aren't using
if (interface->state != INTERFACE_STATE_UP)
continue;
}else if(transmitter == my_subscriber)
// if our neighbour starts using us to reach this receiver, we have to treat the link the same as if it just died.
}else if(transmitter == my_subscriber){
transmitter = NULL;
}
struct link *link = find_link(neighbour, receiver, transmitter?1:0);
if (!link)

View File

@ -385,6 +385,27 @@ test_ping_unreliable() {
tfw_cat --stdout --stderr
}
doc_brping_unreliable="Broadcast ping over a 1-hop unreliable link"
setup_brping_unreliable() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20 \
set debug.overlayframes 1 \
set debug.rejecteddata 1
foreach_instance +A +B start_routing_instance
}
test_brping_unreliable() {
wait_until path_exists +A +B
wait_until path_exists +B +A
set_instance +A
executeOk_servald mdp ping --interval=0.020 --timeout=2 broadcast 500
tfw_cat --stdout --stderr
}
doc_unreliable_links="Prefer a longer, better path vs an unreliable link"
setup_unreliable_links() {
setup_servald
@ -413,11 +434,8 @@ test_unreliable_links() {
set_instance +A
executeOk_servald mdp ping --interval=0.100 --timeout=3 $SIDC 30
tfw_cat --stdout --stderr
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDC:INDIRECT :"
set_instance +C
executeOk_servald route print
assertStdoutGrep --matches=1 "^$SIDA:INDIRECT :"
wait_until path_exists +A +B +C
wait_until path_exists +C +B +A
}
doc_unreliable_links2="Choose the best multihop path with some unreliable links"