From 36cee7e9bcd8968c9b2ef9ce2c20c8be1d797fd0 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 20 Nov 2012 16:41:06 +1030 Subject: [PATCH] Move payload queueing functions to separate .c file --- overlay.c | 25 +-- overlay_interface.c | 400 +--------------------------------- overlay_payload.c | 103 --------- overlay_queue.c | 519 ++++++++++++++++++++++++++++++++++++++++++++ serval.h | 38 +--- sourcefiles.mk | 1 + 6 files changed, 540 insertions(+), 546 deletions(-) create mode 100644 overlay_queue.c diff --git a/overlay.c b/overlay.c index 3af375ba..52bdbd6c 100644 --- a/overlay.c +++ b/overlay.c @@ -74,8 +74,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. int overlayMode=0; -overlay_txqueue overlay_tx[OQ_MAX]; - keyring_file *keyring=NULL; int overlayServerMode() @@ -94,28 +92,7 @@ int overlayServerMode() /* put initial identity in if we don't have any visible */ keyring_seed(keyring); - /* Set default congestion levels for queues */ - int i; - for(i=0;i= 80) + return WHY("quantity string >=80 characters"); + long long result; + if (str_to_ll_scaled(q, 10, &result, NULL)) + return result; + return WHYF("Illegal quantity: %s", alloca_str_toprint(q)); +} + int overlay_interface_arg(char *arg) { /* Parse an interface argument, of the form: @@ -694,7 +689,7 @@ void overlay_dummy_poll(struct sched_ent *alarm) return ; } -static int +int overlay_broadcast_ensemble(int interface_number, struct sockaddr_in *recipientaddr, unsigned char *bytes,int len) @@ -924,377 +919,6 @@ void overlay_interface_discover(struct sched_ent *alarm){ return; } -/* remove and free a payload from the queue */ -static struct overlay_frame * -overlay_queue_remove(overlay_txqueue *queue, struct overlay_frame *frame){ - struct overlay_frame *prev = frame->prev; - struct overlay_frame *next = frame->next; - if (prev) - prev->next = next; - else if(frame == queue->first) - queue->first = next; - - if (next) - next->prev = prev; - else if(frame == queue->last) - queue->last = prev; - - queue->length--; - - op_free(frame); - - return next; -} - -#if 0 /* unused */ -static int -overlay_queue_dump(overlay_txqueue *q) -{ - strbuf b = strbuf_alloca(8192); - struct overlay_frame *f; - strbuf_sprintf(b,"overlay_txqueue @ 0x%p\n",q); - strbuf_sprintf(b," length=%d\n",q->length); - strbuf_sprintf(b," maxLenght=%d\n",q->maxLength); - strbuf_sprintf(b," latencyTarget=%d milli-seconds\n",q->latencyTarget); - strbuf_sprintf(b," first=%p\n",q->first); - f=q->first; - while(f) { - strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n", - f,f->next,f->prev); - if (f==f->next) { - strbuf_sprintf(b," LOOP!\n"); break; - } - f=f->next; - } - strbuf_sprintf(b," last=%p\n",q->last); - f=q->last; - while(f) { - strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n", - f,f->next,f->prev); - if (f==f->prev) { - strbuf_sprintf(b," LOOP!\n"); break; - } - f=f->prev; - } - DEBUG(strbuf_str(b)); - return 0; -} -#endif // 0 - -static void -overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, int tick){ - packet->interface = interface; - packet->i = (interface - overlay_interfaces); - packet->dest=interface->broadcast_address; - packet->buffer=ob_new(); - packet->add_advertisements=1; - ob_limitsize(packet->buffer, packet->interface->mtu); - ob_append_bytes(packet->buffer,magic_header,4); - - overlay_address_clear(); - - if (tick){ - /* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID - (the first SID we host becomes our own identity, saving a little bit of data here). - */ - overlay_add_selfannouncement(packet->i, packet->buffer); - }else{ - // add a badly formatted dummy self announce payload to tell people we sent this. - ob_append_byte(packet->buffer, OF_TYPE_SELFANNOUNCE); - ob_append_byte(packet->buffer, 1); - ob_append_rfs(packet->buffer, SID_SIZE + 2); - - /* from me, to me, via me - (it's shorter than an actual broadcast, - and receivers wont try to process it - since its not going to have a payload body anyway) */ - overlay_address_append_self(interface, packet->buffer); - overlay_address_set_sender(my_subscriber); - ob_append_byte(packet->buffer, OA_CODE_PREVIOUS); - ob_append_byte(packet->buffer, OA_CODE_PREVIOUS); - - ob_patch_rfs(packet->buffer, COMPUTE_RFS_LENGTH); - } -} - -// update the alarm time and return 1 if changed -static int -overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){ - int ret=0; - time_ms_t send_time; - - // ignore packet if the destination is currently unreachable - if (frame->destination && subscriber_is_reachable(frame->destination)==REACHABLE_NONE) - return 0; - - // when is the next packet from this queue due? - send_time=queue->first->enqueued_at + queue->transmit_delay; - if (next_packet.alarm==0 || send_time < next_packet.alarm){ - next_packet.alarm=send_time; - ret = 1; - } - - // how long can we wait if the server is busy? - send_time += queue->grace_period; - if (next_packet.deadline==0 || send_time < next_packet.deadline){ - next_packet.deadline=send_time; - ret = 1; - } - if (!next_packet.function){ - next_packet.function=overlay_send_packet; - send_packet.name="overlay_send_packet"; - next_packet.stats=&send_packet; - } - return ret; -} - -static void -overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, time_ms_t now){ - struct overlay_frame *frame = queue->first; - - // TODO stop when the packet is nearly full? - - while(frame){ - if (frame->enqueued_at + queue->latencyTarget < now){ - DEBUGF("Dropping frame type %x for %s due to expiry timeout", - frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All"); - frame = overlay_queue_remove(queue, frame); - continue; - } - /* Note, once we queue a broadcast packet we are committed to sending it out every interface, - even if we hear it from somewhere else in the mean time - */ - - struct subscriber *next_hop = frame->destination; - - if (next_hop){ - switch(subscriber_is_reachable(next_hop)){ - case REACHABLE_NONE: - goto skip; - - case REACHABLE_INDIRECT: - next_hop=next_hop->next_hop; - frame->sendBroadcast=0; - break; - - case REACHABLE_DEFAULT_ROUTE: - next_hop=directory_service; - frame->sendBroadcast=0; - break; - - case REACHABLE_DIRECT: - case REACHABLE_UNICAST: - frame->sendBroadcast=0; - break; - - case REACHABLE_BROADCAST: - if (!frame->sendBroadcast){ - if (frame->ttl>2) - frame->ttl=2; - frame->sendBroadcast=1; - if (is_all_matching(frame->broadcast_id.id, BROADCAST_LEN, 0)){ - overlay_broadcast_generate_address(&frame->broadcast_id); - // mark it as already seen so we don't immediately retransmit it - overlay_broadcast_drop_check(&frame->broadcast_id); - } - int i; - for(i=0;ibroadcast_sent_via[i]=0; - } - break; - } - } - - if (!packet->buffer){ - // use the interface of the first payload we find - if (frame->sendBroadcast){ - // find an interface that we haven't broadcast on yet - int i; - for(i=0;ibroadcast_sent_via[i]){ - overlay_init_packet(packet, &overlay_interfaces[i], 0); - break; - } - } - - if (!packet->buffer){ - // oh dear, why is this broadcast still in the queue? - frame = overlay_queue_remove(queue, frame); - continue; - } - }else{ - overlay_init_packet(packet, next_hop->interface, 0); - if (next_hop->reachable==REACHABLE_UNICAST){ - packet->unicast_subscriber = next_hop; - packet->dest = next_hop->address; - packet->unicast=1; - } - } - - }else{ - // make sure this payload can be sent via this interface - if (frame->sendBroadcast){ - if (frame->broadcast_sent_via[packet->i]){ - goto skip; - } - }else{ - if(packet->interface != next_hop->interface) - goto skip; - if (next_hop->reachable==REACHABLE_DIRECT && packet->unicast) - goto skip; - if (next_hop->reachable==REACHABLE_UNICAST && - ((!packet->unicast) || - packet->dest.sin_addr.s_addr != next_hop->address.sin_addr.s_addr)) - goto skip; - } - } - - if (debug&DEBUG_OVERLAYFRAMES){ - DEBUGF("Sending payload type %x len %d for %s via %s", frame->type, frame->payload->position, - frame->destination?alloca_tohex_sid(frame->destination->sid):"All", - frame->sendBroadcast?alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN):alloca_tohex_sid(next_hop->sid)); - } - - if (overlay_frame_append_payload(packet->interface, frame, next_hop, packet->buffer)) - // payload was not queued - goto skip; - - // don't send rhizome adverts if the packet contains a voice payload - if (frame->queue==OQ_ISOCHRONOUS_VOICE) - packet->add_advertisements=0; - - // mark the payload as sent - int keep_payload = 0; - - if (frame->sendBroadcast){ - int i; - frame->broadcast_sent_via[packet->i]=1; - - // check if there is still a broadcast to be sent - for(i=0;ibroadcast_sent_via[i]){ - keep_payload=1; - break; - } - } - }else{ - frame->send_copies --; - // ignore resend logic for unicast packets, where wifi gives better resilience - if (frame->send_copies>0 && !packet->unicast) - keep_payload=1; - } - - if (!keep_payload){ - frame = overlay_queue_remove(queue, frame); - continue; - } - - skip: - // if we can't send the payload now, check when we should try - overlay_calc_queue_time(queue, frame); - frame = frame->next; - } -} - -// fill a packet from our outgoing queues and send it -static int -overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) { - int i; - IN(); - // while we're looking at queues, work out when to schedule another packet - unschedule(&next_packet); - next_packet.alarm=0; - next_packet.deadline=0; - - for (i=0;ibuffer){ - // send the packet - if (packet->buffer->position>=HEADERFIELDS_LEN){ - // stuff rhizome announcements at the last moment - if (packet->add_advertisements) - overlay_rhizome_add_advertisements(packet->i,packet->buffer); - - if (debug&DEBUG_PACKETCONSTRUCTION) - dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->position); - - if (overlay_broadcast_ensemble(packet->i, &packet->dest, packet->buffer->bytes, packet->buffer->position)){ - // sendto failed. We probably don't have a valid route - if (packet->unicast_subscriber){ - set_reachable(packet->unicast_subscriber, REACHABLE_NONE); - } - } - } - ob_free(packet->buffer); - overlay_address_clear(); - RETURN(1); - } - RETURN(0); -} - -// when the queue timer elapses, send a packet -void overlay_send_packet(struct sched_ent *alarm){ - struct outgoing_packet packet; - bzero(&packet, sizeof(struct outgoing_packet)); - - overlay_fill_send_packet(&packet, gettime_ms()); -} - -// update time for next alarm and reschedule -void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame){ - if (overlay_calc_queue_time(queue, frame)){ - unschedule(&next_packet); - schedule(&next_packet); - } -} - -static int -overlay_tick_interface(int i, time_ms_t now) { - struct outgoing_packet packet; - IN(); - - /* An interface with no speed budget is for listening only, so doesn't get ticked */ - if (overlay_interfaces[i].bits_per_second<1 - || overlay_interfaces[i].state!=INTERFACE_STATE_UP) { - RETURN(0); - } - - if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Ticking interface #%d",i); - - // initialise the packet buffer - bzero(&packet, sizeof(struct outgoing_packet)); - overlay_init_packet(&packet, &overlay_interfaces[i], 1); - - /* Add advertisements for ROUTES */ - overlay_route_add_advertisements(packet.interface, packet.buffer); - - /* Stuff more payloads from queues and send it */ - overlay_fill_send_packet(&packet, now); - RETURN(0); -} - -static long long -parse_quantity(char *q) -{ - if (strlen(q) >= 80) - return WHY("quantity string >=80 characters"); - long long result; - if (str_to_ll_scaled(q, 10, &result, NULL)) - return result; - return WHYF("Illegal quantity: %s", alloca_str_toprint(q)); -} - static void logServalPacket(int level, struct __sourceloc __whence, const char *message, const unsigned char *packet, size_t len) { struct mallocbuf mb = STRUCT_MALLOCBUF_NULL; diff --git a/overlay_payload.c b/overlay_payload.c index 2dab22d3..4adc3eb7 100644 --- a/overlay_payload.c +++ b/overlay_payload.c @@ -156,26 +156,6 @@ cleanup: return -1; } -int dump_queue(char *msg,int q) -{ - overlay_txqueue *qq=&overlay_tx[q]; - DEBUGF("Contents of TX queue #%d (%s):",q,msg); - DEBUGF(" length=%d, maxLength=%d",qq->length,qq->maxLength); - struct overlay_frame *f=qq->first,*l=qq->last; - DEBUGF(" head of queue = %p, tail of queue = %p", f, l); - struct overlay_frame *n=f; - int count=0; - while(n) { - DEBUGF(" queue entry #%d : prev=%p, next=%p", count,n->prev,n->next); - if (n==n->next) { - WHY(" ERROR: loop in queue"); - return -1; - } - n=n->next; - } - return 0; -} - int dump_payload(struct overlay_frame *p, char *message) { DEBUGF( "+++++\nFrame from %s to %s of type 0x%02x %s:", @@ -187,89 +167,6 @@ int dump_payload(struct overlay_frame *p, char *message) return 0; } -int overlay_payload_enqueue(struct overlay_frame *p) -{ - /* Add payload p to queue q. - - Queues get scanned from first to last, so we should append new entries - on the end of the queue. - - Complain if there are too many frames in the queue. - */ - - if (!p) return WHY("Cannot queue NULL"); - - if (p->destination){ - int r = subscriber_is_reachable(p->destination); - if (r == REACHABLE_SELF || r == REACHABLE_NONE) - return WHYF("Destination %s is unreachable (%d)", alloca_tohex_sid(p->destination->sid), r); - } - - if (p->queue>=OQ_MAX) - return WHY("Invalid queue specified"); - - overlay_txqueue *queue = &overlay_tx[p->queue]; - - if (debug&DEBUG_PACKETTX) - DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)", - p->destination?alloca_tohex(p->destination->sid, 7): alloca_tohex(p->broadcast_id.id,BROADCAST_LEN), - p->queue, queue->length); - - if (p->payload && p->payload->position > p->payload->sizeLimit){ - // HACK, maybe should be done in each caller - // set the size of the payload based on the position written - p->payload->sizeLimit=p->payload->position; - } - - if (queue->length>=queue->maxLength) - return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength); - - if (p->send_copies<=0) - p->send_copies=1; - else if(p->send_copies>5) - return WHY("Too many copies requested"); - - if (!p->destination){ - int i; - int drop=1; - - // hook to allow for flooding via olsr - olsr_send(p); - - // make sure there is an interface up that allows broadcasts - for(i=0;ibroadcast_sent_via[i]=0; - drop=0; - }else - p->broadcast_sent_via[i]=1; - } - - // just drop it now - if (drop) - return -1; - - p->sendBroadcast=1; - } - - struct overlay_frame *l=queue->last; - if (l) l->next=p; - p->prev=l; - p->next=NULL; - p->enqueued_at=gettime_ms(); - - queue->last=p; - if (!queue->first) queue->first=p; - queue->length++; - - overlay_update_queue_schedule(queue, p); - - if (0) dump_queue("after",p->queue); - - return 0; -} - int op_free(struct overlay_frame *p) { if (!p) return WHY("Asked to free NULL"); diff --git a/overlay_queue.c b/overlay_queue.c new file mode 100644 index 00000000..a7ee8f2a --- /dev/null +++ b/overlay_queue.c @@ -0,0 +1,519 @@ +#include "serval.h" +#include "overlay_buffer.h" +#include "overlay_packet.h" +#include "str.h" + +typedef struct overlay_txqueue { + struct overlay_frame *first; + struct overlay_frame *last; + int length; /* # frames in queue */ + int maxLength; /* max # frames in queue before we consider ourselves congested */ + + /* wait until first->enqueued_at+transmit_delay before trying to force the transmission of a packet */ + int transmit_delay; + + /* if servald is busy, wait this long before trying to force the transmission of a packet */ + int grace_period; + + /* Latency target in ms for this traffic class. + Frames older than the latency target will get dropped. */ + int latencyTarget; + + /* XXX Need to initialise these: + Real-time queue for voice (<200ms ?) + Real-time queue for video (<200ms ?) (lower priority than voice) + Ordinary service queue (<3 sec ?) + Rhizome opportunistic queue (infinity) + + (Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information) + */ +} overlay_txqueue; + +overlay_txqueue overlay_tx[OQ_MAX]; + +unsigned char magic_header[]={/* Magic */ 'O',0x10, + /* Version */ 0x00,0x01}; + +struct outgoing_packet{ + overlay_interface *interface; + int i; + struct subscriber *unicast_subscriber; + int unicast; + int add_advertisements; + struct sockaddr_in dest; + struct overlay_buffer *buffer; +}; + +struct sched_ent next_packet; +struct profile_total send_packet; + +static void overlay_send_packet(struct sched_ent *alarm); +static void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame); + +int overlay_queue_init(){ + /* Set default congestion levels for queues */ + int i; + for(i=0;iprev; + struct overlay_frame *next = frame->next; + if (prev) + prev->next = next; + else if(frame == queue->first) + queue->first = next; + + if (next) + next->prev = prev; + else if(frame == queue->last) + queue->last = prev; + + queue->length--; + + op_free(frame); + + return next; +} + +static int +overlay_queue_dump(overlay_txqueue *q) +{ + strbuf b = strbuf_alloca(8192); + struct overlay_frame *f; + strbuf_sprintf(b,"overlay_txqueue @ 0x%p\n",q); + strbuf_sprintf(b," length=%d\n",q->length); + strbuf_sprintf(b," maxLenght=%d\n",q->maxLength); + strbuf_sprintf(b," latencyTarget=%d milli-seconds\n",q->latencyTarget); + strbuf_sprintf(b," first=%p\n",q->first); + f=q->first; + while(f) { + strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n", + f,f->next,f->prev); + if (f==f->next) { + strbuf_sprintf(b," LOOP!\n"); break; + } + f=f->next; + } + strbuf_sprintf(b," last=%p\n",q->last); + f=q->last; + while(f) { + strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n", + f,f->next,f->prev); + if (f==f->prev) { + strbuf_sprintf(b," LOOP!\n"); break; + } + f=f->prev; + } + DEBUG(strbuf_str(b)); + return 0; +} + +int overlay_payload_enqueue(struct overlay_frame *p) +{ + /* Add payload p to queue q. + + Queues get scanned from first to last, so we should append new entries + on the end of the queue. + + Complain if there are too many frames in the queue. + */ + + if (!p) return WHY("Cannot queue NULL"); + + if (p->destination){ + int r = subscriber_is_reachable(p->destination); + if (r == REACHABLE_SELF || r == REACHABLE_NONE) + return WHYF("Destination %s is unreachable (%d)", alloca_tohex_sid(p->destination->sid), r); + } + + if (p->queue>=OQ_MAX) + return WHY("Invalid queue specified"); + + overlay_txqueue *queue = &overlay_tx[p->queue]; + + if (debug&DEBUG_PACKETTX) + DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)", + p->destination?alloca_tohex(p->destination->sid, 7): alloca_tohex(p->broadcast_id.id,BROADCAST_LEN), + p->queue, queue->length); + + if (p->payload && p->payload->position > p->payload->sizeLimit){ + // HACK, maybe should be done in each caller + // set the size of the payload based on the position written + p->payload->sizeLimit=p->payload->position; + } + + if (queue->length>=queue->maxLength) + return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength); + + if (p->send_copies<=0) + p->send_copies=1; + else if(p->send_copies>5) + return WHY("Too many copies requested"); + + if (!p->destination){ + int i; + int drop=1; + + // hook to allow for flooding via olsr + olsr_send(p); + + // make sure there is an interface up that allows broadcasts + for(i=0;ibroadcast_sent_via[i]=0; + drop=0; + }else + p->broadcast_sent_via[i]=1; + } + + // just drop it now + if (drop) + return -1; + + p->sendBroadcast=1; + } + + struct overlay_frame *l=queue->last; + if (l) l->next=p; + p->prev=l; + p->next=NULL; + p->enqueued_at=gettime_ms(); + + queue->last=p; + if (!queue->first) queue->first=p; + queue->length++; + + overlay_update_queue_schedule(queue, p); + + if (0) overlay_queue_dump(queue); + + return 0; +} + +static void +overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, int tick){ + packet->interface = interface; + packet->i = (interface - overlay_interfaces); + packet->dest=interface->broadcast_address; + packet->buffer=ob_new(); + packet->add_advertisements=1; + ob_limitsize(packet->buffer, packet->interface->mtu); + ob_append_bytes(packet->buffer,magic_header,4); + + overlay_address_clear(); + + if (tick){ + /* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID + (the first SID we host becomes our own identity, saving a little bit of data here). + */ + overlay_add_selfannouncement(packet->i, packet->buffer); + + /* Add advertisements for ROUTES */ + overlay_route_add_advertisements(packet->interface, packet->buffer); + + }else{ + // add a badly formatted dummy self announce payload to tell people we sent this. + ob_append_byte(packet->buffer, OF_TYPE_SELFANNOUNCE); + ob_append_byte(packet->buffer, 1); + ob_append_rfs(packet->buffer, SID_SIZE + 2); + + /* from me, to me, via me + (it's shorter than an actual broadcast, + and receivers wont try to process it + since its not going to have a payload body anyway) */ + overlay_address_append_self(interface, packet->buffer); + overlay_address_set_sender(my_subscriber); + ob_append_byte(packet->buffer, OA_CODE_PREVIOUS); + ob_append_byte(packet->buffer, OA_CODE_PREVIOUS); + + ob_patch_rfs(packet->buffer, COMPUTE_RFS_LENGTH); + } +} + +// update the alarm time and return 1 if changed +static int +overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){ + int ret=0; + time_ms_t send_time; + + // ignore packet if the destination is currently unreachable + if (frame->destination && subscriber_is_reachable(frame->destination)==REACHABLE_NONE) + return 0; + + // when is the next packet from this queue due? + send_time=queue->first->enqueued_at + queue->transmit_delay; + if (next_packet.alarm==0 || send_time < next_packet.alarm){ + next_packet.alarm=send_time; + ret = 1; + } + + // how long can we wait if the server is busy? + send_time += queue->grace_period; + if (next_packet.deadline==0 || send_time < next_packet.deadline){ + next_packet.deadline=send_time; + ret = 1; + } + if (!next_packet.function){ + next_packet.function=overlay_send_packet; + send_packet.name="overlay_send_packet"; + next_packet.stats=&send_packet; + } + return ret; +} + +static void +overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, time_ms_t now){ + struct overlay_frame *frame = queue->first; + + // TODO stop when the packet is nearly full? + + while(frame){ + if (frame->enqueued_at + queue->latencyTarget < now){ + DEBUGF("Dropping frame type %x for %s due to expiry timeout", + frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All"); + frame = overlay_queue_remove(queue, frame); + continue; + } + /* Note, once we queue a broadcast packet we are committed to sending it out every interface, + even if we hear it from somewhere else in the mean time + */ + + struct subscriber *next_hop = frame->destination; + + if (next_hop){ + switch(subscriber_is_reachable(next_hop)){ + case REACHABLE_NONE: + goto skip; + + case REACHABLE_INDIRECT: + next_hop=next_hop->next_hop; + frame->sendBroadcast=0; + break; + + case REACHABLE_DEFAULT_ROUTE: + next_hop=directory_service; + frame->sendBroadcast=0; + break; + + case REACHABLE_DIRECT: + case REACHABLE_UNICAST: + frame->sendBroadcast=0; + break; + + case REACHABLE_BROADCAST: + if (!frame->sendBroadcast){ + if (frame->ttl>2) + frame->ttl=2; + frame->sendBroadcast=1; + if (is_all_matching(frame->broadcast_id.id, BROADCAST_LEN, 0)){ + overlay_broadcast_generate_address(&frame->broadcast_id); + // mark it as already seen so we don't immediately retransmit it + overlay_broadcast_drop_check(&frame->broadcast_id); + } + int i; + for(i=0;ibroadcast_sent_via[i]=0; + } + break; + } + } + + if (!packet->buffer){ + // use the interface of the first payload we find + if (frame->sendBroadcast){ + // find an interface that we haven't broadcast on yet + int i; + for(i=0;ibroadcast_sent_via[i]){ + overlay_init_packet(packet, &overlay_interfaces[i], 0); + break; + } + } + + if (!packet->buffer){ + // oh dear, why is this broadcast still in the queue? + frame = overlay_queue_remove(queue, frame); + continue; + } + }else{ + overlay_init_packet(packet, next_hop->interface, 0); + if (next_hop->reachable==REACHABLE_UNICAST){ + packet->unicast_subscriber = next_hop; + packet->dest = next_hop->address; + packet->unicast=1; + } + } + + }else{ + // make sure this payload can be sent via this interface + if (frame->sendBroadcast){ + if (frame->broadcast_sent_via[packet->i]){ + goto skip; + } + }else{ + if(packet->interface != next_hop->interface) + goto skip; + if (next_hop->reachable==REACHABLE_DIRECT && packet->unicast) + goto skip; + if (next_hop->reachable==REACHABLE_UNICAST && + ((!packet->unicast) || + packet->dest.sin_addr.s_addr != next_hop->address.sin_addr.s_addr)) + goto skip; + } + } + + if (debug&DEBUG_OVERLAYFRAMES){ + DEBUGF("Sending payload type %x len %d for %s via %s", frame->type, frame->payload->position, + frame->destination?alloca_tohex_sid(frame->destination->sid):"All", + frame->sendBroadcast?alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN):alloca_tohex_sid(next_hop->sid)); + } + + if (overlay_frame_append_payload(packet->interface, frame, next_hop, packet->buffer)) + // payload was not queued + goto skip; + + // don't send rhizome adverts if the packet contains a voice payload + if (frame->queue==OQ_ISOCHRONOUS_VOICE) + packet->add_advertisements=0; + + // mark the payload as sent + int keep_payload = 0; + + if (frame->sendBroadcast){ + int i; + frame->broadcast_sent_via[packet->i]=1; + + // check if there is still a broadcast to be sent + for(i=0;ibroadcast_sent_via[i]){ + keep_payload=1; + break; + } + } + }else{ + frame->send_copies --; + // ignore resend logic for unicast packets, where wifi gives better resilience + if (frame->send_copies>0 && !packet->unicast) + keep_payload=1; + } + + if (!keep_payload){ + frame = overlay_queue_remove(queue, frame); + continue; + } + + skip: + // if we can't send the payload now, check when we should try + overlay_calc_queue_time(queue, frame); + frame = frame->next; + } +} + +// fill a packet from our outgoing queues and send it +static int +overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) { + int i; + IN(); + // while we're looking at queues, work out when to schedule another packet + unschedule(&next_packet); + next_packet.alarm=0; + next_packet.deadline=0; + + for (i=0;ibuffer){ + // send the packet + if (packet->buffer->position>=HEADERFIELDS_LEN){ + // stuff rhizome announcements at the last moment + if (packet->add_advertisements) + overlay_rhizome_add_advertisements(packet->i,packet->buffer); + + if (debug&DEBUG_PACKETCONSTRUCTION) + dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->position); + + if (overlay_broadcast_ensemble(packet->i, &packet->dest, packet->buffer->bytes, packet->buffer->position)){ + // sendto failed. We probably don't have a valid route + if (packet->unicast_subscriber){ + set_reachable(packet->unicast_subscriber, REACHABLE_NONE); + } + } + } + ob_free(packet->buffer); + overlay_address_clear(); + RETURN(1); + } + RETURN(0); +} + +// when the queue timer elapses, send a packet +static void overlay_send_packet(struct sched_ent *alarm){ + struct outgoing_packet packet; + bzero(&packet, sizeof(struct outgoing_packet)); + + overlay_fill_send_packet(&packet, gettime_ms()); +} + +// update time for next alarm and reschedule +static void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame){ + if (overlay_calc_queue_time(queue, frame)){ + unschedule(&next_packet); + schedule(&next_packet); + } +} + +int +overlay_tick_interface(int i, time_ms_t now) { + struct outgoing_packet packet; + IN(); + + /* An interface with no speed budget is for listening only, so doesn't get ticked */ + if (overlay_interfaces[i].bits_per_second<1 + || overlay_interfaces[i].state!=INTERFACE_STATE_UP) { + RETURN(0); + } + + if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Ticking interface #%d",i); + + // initialise the packet buffer + bzero(&packet, sizeof(struct outgoing_packet)); + overlay_init_packet(&packet, &overlay_interfaces[i], 1); + + /* Stuff more payloads from queues and send it */ + overlay_fill_send_packet(&packet, now); + RETURN(0); +} \ No newline at end of file diff --git a/serval.h b/serval.h index 04487af1..9f67b60c 100644 --- a/serval.h +++ b/serval.h @@ -329,6 +329,7 @@ struct sched_ent{ }; struct overlay_buffer; +struct overlay_frame; #define STRUCT_SCHED_ENT_UNUSED ((struct sched_ent){NULL, NULL, NULL, NULL, {-1, 0, 0}, 0LL, 0LL, NULL, -1}) @@ -398,35 +399,6 @@ extern overlay_interface overlay_interfaces[OVERLAY_MAX_INTERFACES]; extern int overlay_last_interface_number; // used to remember where a packet came from extern unsigned int overlay_sequence_number; -typedef struct overlay_txqueue { - struct overlay_frame *first; - struct overlay_frame *last; - int length; /* # frames in queue */ - int maxLength; /* max # frames in queue before we consider ourselves congested */ - - /* wait until first->enqueued_at+transmit_delay before trying to force the transmission of a packet */ - int transmit_delay; - - /* if servald is busy, wait this long before trying to force the transmission of a packet */ - int grace_period; - - /* Latency target in ms for this traffic class. - Frames older than the latency target will get dropped. */ - int latencyTarget; - - /* XXX Need to initialise these: - Real-time queue for voice (<200ms ?) - Real-time queue for video (<200ms ?) (lower priority than voice) - Ordinary service queue (<3 sec ?) - Rhizome opportunistic queue (infinity) - - (Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information) - */ -} overlay_txqueue; - - -extern overlay_txqueue overlay_tx[OQ_MAX]; - ssize_t recvwithttl(int sock, unsigned char *buffer, size_t bufferlen, int *ttl, struct sockaddr *recvaddr, socklen_t *recvaddrlen); // is the SID entirely 0xFF? @@ -478,8 +450,6 @@ int overlay_frame_append_payload(overlay_interface *interface, struct overlay_fr int overlay_interface_args(const char *arg); int overlay_rhizome_add_advertisements(int interface_number,struct overlay_buffer *e); int overlay_add_local_identity(unsigned char *s); -void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame); -void overlay_send_packet(struct sched_ent *alarm); extern int overlay_interface_count; @@ -669,6 +639,9 @@ int overlay_interface_register(char *name, struct in_addr mask); overlay_interface * overlay_interface_find(struct in_addr addr); overlay_interface * overlay_interface_find_name(const char *name); +int overlay_broadcast_ensemble(int interface_number, + struct sockaddr_in *recipientaddr, + unsigned char *bytes,int len); int directory_registration(); int directory_service_init(); @@ -762,6 +735,9 @@ void overlay_mdp_poll(struct sched_ent *alarm); void fd_periodicstats(struct sched_ent *alarm); void rhizome_check_connections(struct sched_ent *alarm); +int overlay_tick_interface(int i, time_ms_t now); +int overlay_queue_init(); + void monitor_client_poll(struct sched_ent *alarm); void monitor_poll(struct sched_ent *alarm); void rhizome_client_poll(struct sched_ent *alarm); diff --git a/sourcefiles.mk b/sourcefiles.mk index 4cc6281c..99bc56e1 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -26,6 +26,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \ $(SERVAL_BASE)overlay_address.c \ $(SERVAL_BASE)overlay_buffer.c \ $(SERVAL_BASE)overlay_interface.c \ + $(SERVAL_BASE)overlay_queue.c \ $(SERVAL_BASE)overlay_mdp.c \ $(SERVAL_BASE)overlay_olsr.c \ $(SERVAL_BASE)overlay_packetformats.c \