Add packet retransmissions due to missed ack's

This commit is contained in:
Jeremy Lakeman 2013-05-15 11:33:43 +09:30
parent b10746b3a5
commit e5856225cf
9 changed files with 220 additions and 122 deletions

View File

@ -181,10 +181,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MAX_AUDIO_BYTES 1024
#define MDP_AWAITREPLY 9999
/* max number of recent samples to cram into a VoMP frame as well as the current
frame of audio (preemptive audio retransmission) */
#define VOMP_MAX_RECENT_SAMPLES 2
// codec's with well defined parameters
#define VOMP_CODEC_16SIGNED 0x01
#define VOMP_CODEC_ULAW 0x02

View File

@ -765,8 +765,6 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
if (frame->queue==0)
frame->queue = OQ_ORDINARY;
frame->send_copies = mdp->out.send_copies;
if (overlay_payload_enqueue(frame))
op_free(frame);
RETURN(0);

View File

@ -32,8 +32,7 @@ struct overlay_frame {
unsigned char ttl;
unsigned char queue;
// temporary hack to improve reliability before implementing per-packet nack's
int send_copies;
char resend;
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent
@ -53,6 +52,8 @@ struct overlay_frame {
struct sockaddr_in recvaddr;
overlay_interface *interface;
char unicast;
int sent_seq;
time_ms_t dont_send_until;
/* Actual payload */
struct overlay_buffer *payload;

View File

@ -34,7 +34,7 @@ struct sockaddr_in loopback;
int overlay_packet_init_header(int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, char seq){
char unicast, char interface, int seq){
if (encapsulation !=ENCAP_OVERLAY && encapsulation !=ENCAP_SINGLE)
return WHY("Invalid packet encapsulation");
@ -447,7 +447,9 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
// process payloads that are for me or everyone
if (header_valid&HEADER_PROCESS)
process_incoming_frame(now, interface, &f, &context);
if (f.next_hop == my_subscriber || f.destination == my_subscriber)
link_state_ack_soon(context.sender);
}
if (f.payload){

View File

@ -39,6 +39,7 @@ overlay_txqueue overlay_tx[OQ_MAX];
struct outgoing_packet{
overlay_interface *interface;
int seq;
int i;
struct subscriber *unicast_subscriber;
struct sockaddr_in dest;
@ -169,7 +170,8 @@ int overlay_payload_enqueue(struct overlay_frame *p)
overlay_send_probe(p->destination, p->destination->address, p->destination->interface, OQ_MESH_MANAGEMENT);
overlay_txqueue *queue = &overlay_tx[p->queue];
p->sent_seq =-1;
if (config.debug.packettx)
DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)",
p->destination?alloca_tohex(p->destination->sid, 7): alloca_tohex(p->broadcast_id.id,BROADCAST_LEN),
@ -184,13 +186,12 @@ int overlay_payload_enqueue(struct overlay_frame *p)
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
if (p->send_copies<=0)
p->send_copies=1;
else if(p->send_copies>5)
return WHY("Too many copies requested");
if (!p->destination_resolved){
if (!p->destination){
if (p->destination){
// allow the packet to be resent
if (p->resend == 0)
p->resend = 3;
}else{
int i;
int drop=1;
@ -240,15 +241,15 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
packet->buffer=ob_new();
int seq=-1;
packet->seq=-1;
if (unicast)
packet->unicast_subscriber = destination;
else
seq = interface->sequence_number = (interface->sequence_number + 1)&0xFF;
packet->seq = interface->sequence_number = (interface->sequence_number + 1)&0xFF;
ob_limitsize(packet->buffer, packet->interface->mtu);
overlay_packet_init_header(ENCAP_OVERLAY, &packet->context, packet->buffer,
destination, unicast, packet->i, seq);
destination, unicast, packet->i, packet->seq);
packet->header_length = ob_position(packet->buffer);
}
@ -309,6 +310,9 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
return 0;
}
if (next_allowed_packet < frame->dont_send_until)
next_allowed_packet = frame->dont_send_until;
overlay_queue_schedule_next(next_allowed_packet);
return 0;
@ -332,6 +336,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
even if we hear it from somewhere else in the mean time
*/
// ignore payloads that are waiting for ack / nack resends
if (frame->dont_send_until > now)
goto skip;
// quickly skip payloads that have no chance of fitting
if (packet->buffer && ob_limit(frame->payload) > ob_remaining(packet->buffer))
goto skip;
@ -378,8 +386,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
if(r&REACHABLE_UNICAST){
frame->recvaddr = frame->next_hop->address;
frame->unicast = 1;
// ignore resend logic for unicast packets, where wifi gives better resilience
frame->send_copies=1;
}else
frame->recvaddr = frame->interface->broadcast_address;
@ -467,12 +473,14 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// payload was not queued
goto skip;
}
frame->sent_seq = packet->seq;
sent:
sent:
if (config.debug.overlayframes){
DEBUGF("Sent payload type %x len %d for %s via %s", frame->type, ob_position(frame->payload),
DEBUGF("Sent payload type %x len %d for %s via %s, seq %d", frame->type, ob_position(frame->payload),
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN));
frame->next_hop?alloca_tohex_sid(frame->next_hop->sid):alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN),
frame->sent_seq);
}
if (frame->destination)
@ -484,9 +492,14 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
int keep_payload = 0;
if (frame->destination_resolved){
frame->send_copies --;
if (frame->send_copies>0)
keep_payload=1;
frame->resend --;
if (frame->resend>0 && frame->next_hop && packet->seq!=-1 && (!frame->unicast)){
frame->dont_send_until = now+200;
frame->destination_resolved = 0;
keep_payload = 1;
if (config.debug.overlayframes)
DEBUGF("Holding onto payload for ack/nack resend in %lldms", frame->dont_send_until - now);
}
}else{
int i;
frame->broadcast_sent_via[packet->i]=1;
@ -565,12 +578,33 @@ int overlay_send_tick_packet(struct overlay_interface *interface){
return 0;
}
int overlay_queue_nack(struct subscriber *neighbour, struct overlay_interface *interface, int sequence)
// de-queue all packets that have been sent to this subscriber & have arrived.
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq)
{
return 0;
}
int i;
for (i=0;i<OQ_MAX;i++){
struct overlay_frame *frame = overlay_tx[i].first;
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, int sequence)
{
while(frame){
if (frame->next_hop == neighbour && frame->interface == interface && frame->sent_seq != -1){
int seq_delta = (ack_seq - frame->sent_seq)&0xFF;
if (seq_delta <= 32 && (seq_delta==0 || ack_mask&(1<<(seq_delta-1)))){
if (config.debug.overlayframes)
DEBUGF("Dequeing packet to %s sent by seq %d, due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq);
frame = overlay_queue_remove(&overlay_tx[i], frame);
continue;
}
if (seq_delta < 128){
// resend, and re-resolve destination
if (config.debug.overlayframes)
DEBUGF("Requeue packet to %s sent by seq %d due to ack of seq %d", alloca_tohex_sid(neighbour->sid), frame->sent_seq, ack_seq);
frame->dont_send_until = gettime_ms();
frame->resend ++;
overlay_calc_queue_time(&overlay_tx[i], frame);
}
}
frame = frame->next;
}
}
return 0;
}

View File

@ -87,6 +87,7 @@ struct neighbour{
// next link update
time_ms_t next_neighbour_update;
time_ms_t last_update;
time_ms_t rtt;
int ack_counter;
// un-balanced tree of known link states
@ -156,6 +157,8 @@ static struct neighbour *get_neighbour(struct subscriber *subscriber, char creat
n = emalloc_zero(sizeof(struct neighbour));
n->subscriber = subscriber;
n->_next = neighbours;
// TODO measure min/max rtt
n->rtt = 120;
neighbours = n;
if (config.debug.linkstate)
DEBUGF("LINK STATE; new neighbour %s", alloca_tohex_sid(n->subscriber->sid));
@ -184,6 +187,7 @@ static struct link *find_link(struct neighbour *neighbour, struct subscriber *re
link->receiver = receiver;
link->path_version = neighbour->path_version -1;
link->last_ack_seq = -1;
link->link_version = -1;
}
break;
}
@ -343,7 +347,7 @@ next:
if (changed){
if (config.debug.linkstate){
if (next_hop == subscriber){
if (reachable & REACHABLE_DIRECT){
DEBUGF("LINK STATE; neighbour %s is reachable on interface %s",
alloca_tohex_sid(subscriber->sid),
interface->name);
@ -535,53 +539,90 @@ static int send_legacy_self_announce_ack(struct neighbour *neighbour, struct nei
return 0;
}
static int link_send_neighbours(struct overlay_buffer *payload)
static int neighbour_find_best_link(struct neighbour *n)
{
// TODO compare other link stats to find the best...
struct neighbour_link *best_link=n->links;
if (best_link){
struct neighbour_link *link=best_link->_next;
while(link){
if (link->interface != best_link->interface &&
overlay_interface_compare(best_link->interface, link->interface))
best_link = link;
link = link->_next;
}
}
if (n->best_link != best_link){
n->best_link = best_link;
n->next_neighbour_update = gettime_ms()+10;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link?best_link->interface->name:"NONE");
}
return 0;
}
static int send_neighbour_link(struct neighbour *n){
IN();
if (!n->best_link)
RETURN(-1);
time_ms_t now = gettime_ms();
if (n->legacy_protocol){
// send a self announce ack instead.
send_legacy_self_announce_ack(n, n->best_link, now);
n->last_update = now;
} else {
struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA;
frame->source=my_subscriber;
frame->ttl=1;
frame->queue=OQ_MESH_MANAGEMENT;
frame->payload = ob_new();
if (n->subscriber->reachable & REACHABLE_DIRECT && (!(n->subscriber->reachable&REACHABLE_ASSUMED))){
frame->destination_resolved = 1;
frame->interface = n->subscriber->interface;
frame->recvaddr = frame->interface->broadcast_address;
frame->resend=-1;
}
ob_limitsize(frame->payload, 400);
overlay_mdp_encode_ports(frame->payload, MDP_PORT_LINKSTATE, MDP_PORT_LINKSTATE);
char flags=0;
if (n->best_link->unicast)
flags|=FLAG_UNICAST;
else
flags|=FLAG_BROADCAST;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; Sending ack to %s for seq %d", alloca_tohex_sid(n->subscriber->sid), n->best_link->ack_sequence);
append_link_state(frame->payload, flags, n->subscriber, my_subscriber, n->best_link->neighbour_interface, 1, n->best_link->ack_sequence, n->best_link->ack_mask, -1);
if (overlay_payload_enqueue(frame))
op_free(frame);
else
n->last_update = now;
}
n->next_neighbour_update = n->last_update + n->best_link->interface->tick_ms;
n->ack_counter = ACK_WINDOW;
OUT();
return 0;
}
static int link_send_neighbours()
{
time_ms_t now = gettime_ms();
clean_neighbours(now);
struct neighbour *n = neighbours;
while (n){
// TODO compare other link stats to find the best...
struct neighbour_link *best_link=n->links;
if (best_link){
struct neighbour_link *link=best_link->_next;
while(link){
if (link->interface != best_link->interface &&
overlay_interface_compare(best_link->interface, link->interface))
best_link = link;
link = link->_next;
}
}
neighbour_find_best_link(n);
if (n->best_link != best_link){
n->best_link = best_link;
n->next_neighbour_update = now;
if (config.debug.linkstate && config.debug.verbose)
DEBUGF("LINK STATE; best link from neighbour %s is now on interface %s",
alloca_tohex_sid(n->subscriber->sid),
best_link?best_link->interface->name:"NONE");
}
if (n->next_neighbour_update - INCLUDE_ANYWAY <= now){
if (n->legacy_protocol){
// send a self announce ack instead.
send_legacy_self_announce_ack(n, best_link, now);
} else {
char flags=0;
if (best_link->unicast)
flags|=FLAG_UNICAST;
else
flags|=FLAG_BROADCAST;
if (append_link_state(payload, flags, n->subscriber, my_subscriber, best_link->neighbour_interface, 1, best_link->ack_sequence, best_link->ack_mask, -1)){
link_send_alarm.alarm = now;
return 1;
}
}
n->last_update = now;
n->next_neighbour_update = now + best_link->interface->tick_ms;
n->ack_counter = ACK_WINDOW;
if (n->next_neighbour_update <= now){
send_neighbour_link(n);
}
if (n->next_neighbour_update < link_send_alarm.alarm)
@ -597,7 +638,10 @@ static void link_send(struct sched_ent *alarm)
{
time_ms_t now = gettime_ms();
alarm->alarm=now + 10000;
alarm->alarm=now + 60000;
// TODO use a separate alarm
link_send_neighbours();
struct overlay_frame *frame=emalloc_zero(sizeof(struct overlay_frame));
frame->type=OF_TYPE_DATA;
@ -611,9 +655,7 @@ static void link_send(struct sched_ent *alarm)
ob_checkpoint(frame->payload);
int pos = ob_position(frame->payload);
if (link_send_neighbours(frame->payload)==0){
enum_subscribers(NULL, append_link, frame->payload);
}
enum_subscribers(NULL, append_link, frame->payload);
ob_rewind(frame->payload);
@ -622,8 +664,10 @@ static void link_send(struct sched_ent *alarm)
else if (overlay_payload_enqueue(frame))
op_free(frame);
alarm->deadline = alarm->alarm;
schedule(alarm);
if (neighbours){
alarm->deadline = alarm->alarm;
schedule(alarm);
}
}
static void update_alarm(time_ms_t limit){
@ -671,6 +715,19 @@ int link_state_interface_has_neighbour(struct overlay_interface *interface)
return 0;
}
// when we receive a packet from a neighbour with ourselves as the next hop, make sure we send an ack soon(ish)
int link_state_ack_soon(struct subscriber *subscriber){
IN();
struct neighbour *neighbour = get_neighbour(subscriber, 1);
time_ms_t now = gettime_ms();
if (neighbour->next_neighbour_update > now + 80){
neighbour->next_neighbour_update = now + 80;
}
update_alarm(neighbour->next_neighbour_update);
OUT();
return 0;
}
// track stats for receiving packets from this neighbour
int link_received_packet(struct subscriber *subscriber, struct overlay_interface *interface, int sender_interface, int sender_seq, int unicast)
{
@ -681,12 +738,10 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
struct neighbour *neighbour = get_neighbour(subscriber, 1);
struct neighbour_link *link=get_neighbour_link(neighbour, interface, sender_interface, unicast);
time_ms_t now = gettime_ms();
time_ms_t next_update = neighbour->next_neighbour_update;
neighbour->ack_counter --;
// for now we'll use a simple time based link up/down flag + dropped packet count
if (sender_seq >=0){
if (link->ack_sequence != -1){
int offset = (link->ack_sequence - 1 - sender_seq)&0xFF;
@ -706,31 +761,39 @@ int link_received_packet(struct subscriber *subscriber, struct overlay_interface
DEBUGF("LINK STATE; missed seq %d from %s on %s",
link->ack_sequence, alloca_tohex_sid(subscriber->sid), interface->name);
link->ack_mask = link->ack_mask << 1;
next_update = now+100;
neighbour->ack_counter --;
neighbour->next_neighbour_update = now + 10;
if (neighbour->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
}
}
}else
link->ack_sequence = sender_seq;
}
// force an update soon when we need to ack packets
if (neighbour->ack_counter <=0)
next_update = now+10;
// force an update when we start hearing a new neighbour link
if (link->link_timeout < now)
next_update = now;
if (next_update < neighbour->next_neighbour_update){
neighbour->next_neighbour_update = next_update;
if (link->link_timeout < now){
if (neighbour->next_neighbour_update > now + 10);
neighbour->next_neighbour_update = now + 10;
}
update_alarm(neighbour->next_neighbour_update);
link->link_timeout = now + (interface->tick_ms *5);
// force an update soon when we need to ack packets
if (neighbour->ack_counter <=0){
neighbour_find_best_link(neighbour);
send_neighbour_link(neighbour);
}
update_alarm(neighbour->next_neighbour_update);
return 0;
}
// parse incoming link details
int link_receive(overlay_mdp_frame *mdp)
{
IN();
struct overlay_buffer *payload = ob_static(mdp->out.payload, mdp->out.payload_length);
ob_limitsize(payload, mdp->out.payload_length);
@ -800,7 +863,7 @@ int link_receive(overlay_mdp_frame *mdp)
continue;
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; record - %s, %s, %d, %d, %d, %d",
DEBUGF("LINK STATE; record - %s, %s, %d, %d, %x, %d",
receiver?alloca_tohex_sid(receiver->sid):"NULL",
transmitter?alloca_tohex_sid(transmitter->sid):"NULL",
interface_id,
@ -832,39 +895,27 @@ int link_receive(overlay_mdp_frame *mdp)
// they can hear us? we can route through them!
if (neighbour->neighbour_link_timeout < now)
version = link->link_version;
if (neighbour->neighbour_link_timeout < now || version<0){
changed = 1;
version++;
}
neighbour->neighbour_link_timeout = now + interface->tick_ms * 5;
version = link->link_version;
if (drop_rate != link->drop_rate || transmitter != link->transmitter)
version++;
// process new nacks
if (ack_seq != link->last_ack_seq){
int i;
for (i=0;i<32;i++){
int nack_seq = (ack_seq -1 -i)&0xFF;
if (nack_seq == link->last_ack_seq)
break;
// process acks / nacks
overlay_queue_ack(sender, interface, ack_mask, ack_seq);
if (!(ack_mask & (1<<i))){
overlay_queue_nack(sender, interface, nack_seq);
if (config.debug.verbose && config.debug.linkstate)
DEBUGF("LINK STATE; neighbour %s missed seq %d from %s",
alloca_tohex_sid(sender->sid), nack_seq, interface->name);
}
}
}
// process ack
overlay_queue_ack(sender, interface, ack_seq);
link->last_ack_seq = ack_seq;
}
if (link->transmitter != transmitter || link->link_version != version){
changed = 1;
link->transmitter = transmitter;
link->link_version = version;
link->link_version = version & 0xFF;
link->interface = interface;
link->drop_rate = drop_rate;
// TODO other link attributes...
@ -884,7 +935,7 @@ int link_receive(overlay_mdp_frame *mdp)
schedule(&link_send_alarm);
}
}
OUT();
return 0;
}

View File

@ -517,7 +517,7 @@ int single_packet_encapsulation(struct overlay_buffer *b, struct overlay_frame *
int overlay_packet_init_header(int encapsulation,
struct decode_context *context, struct overlay_buffer *buff,
struct subscriber *destination,
char unicast, char interface, char seq);
char unicast, char interface, int seq);
int overlay_frame_build_header(struct decode_context *context, struct overlay_buffer *buff,
int queue, int type, int modifiers, int ttl,
struct broadcast *broadcast, struct subscriber *next_hop,
@ -540,8 +540,7 @@ int overlay_payload_enqueue(struct overlay_frame *p);
int overlay_queue_remaining(int queue);
int overlay_queue_schedule_next(time_ms_t next_allowed_packet);
int overlay_send_tick_packet(struct overlay_interface *interface);
int overlay_queue_nack(struct subscriber *neighbour, struct overlay_interface *interface, int sequence);
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, int sequence);
int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq);
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, time_ms_t now);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
@ -573,8 +572,6 @@ typedef struct overlay_mdp_data_frame {
sockaddr_mdp src;
sockaddr_mdp dst;
uint16_t payload_length;
// temporary hack to improve reliability before implementing per-packet nack's
int send_copies;
int queue;
int ttl;
unsigned char payload[MDP_MTU-100];
@ -839,6 +836,7 @@ void link_interface_down(struct overlay_interface *interface);
int link_state_announce_links();
int link_state_legacy_ack(struct overlay_frame *frame, time_ms_t now);
int link_state_interface_has_neighbour(struct overlay_interface *interface);
int link_state_ack_soon(struct subscriber *sender);
int generate_nonce(unsigned char *nonce,int bytes);

View File

@ -363,6 +363,27 @@ test_multi_interface() {
wait_until multi_has_link $SIDA
}
doc_ping_unreliable="Ping over an unreliable link"
setup_ping_unreliable() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B add_interface 1
foreach_instance +A +B \
executeOk_servald config \
set interfaces.1.drop_broadcasts 20 \
set debug.overlayframes 1 \
set debug.rejecteddata 1
foreach_instance +A +B start_routing_instance
}
test_ping_unreliable() {
wait_until path_exists +A +B
wait_until path_exists +B +A
set_instance +A
executeOk_servald mdp ping $SIDB 40
tfw_cat --stdout --stderr
}
doc_unreliable_links="Choose a longer, better path over an unreliable link"
setup_unreliable_links() {
setup_servald

3
vomp.c
View File

@ -559,9 +559,6 @@ int vomp_received_audio(struct vomp_call_state *call, int audio_codec, int time,
bcopy(audio,&mdp.out.payload[(*len)],audio_length);
(*len)+=audio_length;
// send the payload more than once to add resilience to dropped packets
// TODO remove once network links have built in retries
mdp.out.send_copies=VOMP_MAX_RECENT_SAMPLES;
mdp.out.queue=OQ_ISOCHRONOUS_VOICE;
overlay_mdp_dispatch(&mdp,0,NULL,0);