From 9a78e16625a17575096b8d971422cc3fe62a61cc Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 19 Dec 2012 12:36:28 +1030 Subject: [PATCH] Use payload queue for periodic route advertisements --- overlay_advertise.c | 58 ++++++++++++++++++++++++++------------------- overlay_interface.c | 11 ++++----- overlay_payload.c | 6 ++++- overlay_queue.c | 40 ++++--------------------------- serval.h | 4 +++- 5 files changed, 50 insertions(+), 69 deletions(-) diff --git a/overlay_advertise.c b/overlay_advertise.c index 0085e66a..4c6b12cd 100644 --- a/overlay_advertise.c +++ b/overlay_advertise.c @@ -60,10 +60,13 @@ int overlay_route_please_advertise(overlay_node *n) else return 1; } -struct subscriber *next_advertisement=NULL; +struct advertisement_state{ + struct overlay_buffer *payload; + struct subscriber *next_advertisement; +}; int add_advertisement(struct subscriber *subscriber, void *context){ - struct overlay_buffer *e=context; + struct advertisement_state *state=context; if (subscriber->node){ overlay_node *n=subscriber->node; @@ -73,24 +76,23 @@ int add_advertisement(struct subscriber *subscriber, void *context){ // never send the full sid in an advertisement subscriber->send_full=0; - if (overlay_address_append(NULL,e,subscriber) || - ob_append_byte(e,n->best_link_score -1) || - ob_append_byte(e,n->observations[n->best_observation].gateways_en_route +1)){ + if (overlay_address_append(NULL,state->payload,subscriber) || + ob_append_byte(state->payload,n->best_link_score -1) || + ob_append_byte(state->payload,n->observations[n->best_observation].gateways_en_route +1)){ // stop if we run out of space, remember where we should start next time. - next_advertisement=subscriber; - ob_rewind(e); + state->next_advertisement=subscriber; + ob_rewind(state->payload); return 1; } - ob_checkpoint(e); + ob_checkpoint(state->payload); } } return 0; } -int overlay_route_add_advertisements(struct decode_context *context, overlay_interface *interface, - struct overlay_buffer *e) +int overlay_route_queue_advertisements(overlay_interface *interface) { /* Construct a route advertisement frame and append it to e. @@ -120,12 +122,19 @@ int overlay_route_add_advertisements(struct decode_context *context, overlay_int if (!my_subscriber) return WHY("Cannot advertise because I don't know who I am"); + struct overlay_frame *frame=malloc(sizeof(struct overlay_frame)); + bzero(frame,sizeof(struct overlay_frame)); + frame->type=OF_TYPE_NODEANNOUNCE; + frame->source = my_subscriber; + frame->ttl=1; + frame->queue=OQ_MESH_MANAGEMENT; + frame->destination_resolved=1; + frame->recvaddr=interface->broadcast_address; + frame->interface=interface; + frame->payload = ob_new(); + ob_limitsize(frame->payload, 400); - if (overlay_frame_build_header(context, e, - 0, OF_TYPE_NODEANNOUNCE, 0, 1, - NULL, NULL, - NULL, my_subscriber)) - return -1; + struct advertisement_state state={.payload = frame->payload,}; // TODO high priority advertisements first.... /* @@ -140,21 +149,22 @@ int overlay_route_add_advertisements(struct decode_context *context, overlay_int slots_used++; } */ - struct subscriber *start = next_advertisement; - next_advertisement=NULL; - - ob_checkpoint(e); - + ob_checkpoint(frame->payload); // append announcements starting from the last node we couldn't advertise last time - enum_subscribers(start, add_advertisement, e); + enum_subscribers(interface->next_advert, add_advertisement, &state); // if we didn't start at the beginning and still have space, start again from the beginning - if (start && !next_advertisement && ob_limit(e) - ob_position(e) > 0){ - enum_subscribers(NULL, add_advertisement, e); + if (interface->next_advert && !state.next_advertisement && ob_remaining(frame->payload) > 0){ + enum_subscribers(NULL, add_advertisement, &state); } - ob_patch_rfs(e); + interface->next_advert=state.next_advertisement; + ob_limitsize(frame->payload, ob_position(frame->payload)); + if (overlay_payload_enqueue(frame)){ + op_free(frame); + return -1; + } return 0; } diff --git a/overlay_interface.c b/overlay_interface.c index ff33058d..fb4b63c4 100644 --- a/overlay_interface.c +++ b/overlay_interface.c @@ -471,11 +471,8 @@ static void overlay_interface_poll(struct sched_ent *alarm) if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){ // tick the interface time_ms_t now = gettime_ms(); - int i = (interface - overlay_interfaces); - if (overlay_tick_interface(i, now)) - alarm->alarm=limit_next_allowed(&overlay_interfaces[i].transfer_limit); - else - alarm->alarm=now+interface->tick_ms; + overlay_route_queue_advertisements(interface); + alarm->alarm=now+interface->tick_ms; alarm->deadline=alarm->alarm+interface->tick_ms/2; schedule(alarm); } @@ -615,8 +612,8 @@ void overlay_dummy_poll(struct sched_ent *alarm) interface->tick_ms>0 && (interface->last_tick_ms == -1 || now >= interface->last_tick_ms + interface->tick_ms)) { // tick the interface - int i = (interface - overlay_interfaces); - overlay_tick_interface(i, now); + overlay_route_queue_advertisements(interface); + interface->last_tick_ms=now; } schedule(alarm); diff --git a/overlay_payload.c b/overlay_payload.c index 047e19dc..6a495c43 100644 --- a/overlay_payload.c +++ b/overlay_payload.c @@ -99,9 +99,13 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa dump("payload contents", &p->payload->bytes[0],p->payload->position); } + struct broadcast *broadcast=NULL; + if ((!p->destination) && !is_all_matching(p->broadcast_id.id,BROADCAST_LEN,0)){ + broadcast = &p->broadcast_id; + } if (overlay_frame_build_header(context, headers, p->queue, p->type, p->modifiers, p->ttl, - (p->destination?NULL:&p->broadcast_id), p->next_hop, + broadcast, p->next_hop, p->destination, p->source)) goto cleanup; diff --git a/overlay_queue.c b/overlay_queue.c index 21d17091..e25e27c7 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -225,7 +225,7 @@ int overlay_payload_enqueue(struct overlay_frame *p) static void overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination, int unicast, - overlay_interface *interface, struct sockaddr_in addr, int tick){ + overlay_interface *interface, struct sockaddr_in addr){ packet->interface = interface; packet->i = (interface - overlay_interfaces); packet->dest=addr; @@ -237,17 +237,11 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati overlay_packet_init_header(&packet->context, packet->buffer, destination, unicast, packet->i, 0); packet->header_length = ob_position(packet->buffer); - if (tick){ - /* Add advertisements for ROUTES */ - overlay_route_add_advertisements(&packet->context, packet->interface, packet->buffer); - } } // update the alarm time and return 1 if changed static int overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){ - int ret=0; - do{ if (frame->destination_resolved) break; @@ -302,7 +296,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim struct overlay_frame *frame = queue->first; // TODO stop when the packet is nearly full? - while(frame){ if (frame->enqueued_at + queue->latencyTarget < now){ DEBUGF("Dropping frame type %x for %s due to expiry timeout", @@ -316,7 +309,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim */ // quickly skip payloads that have no chance of fitting - if (packet->buffer && ob_position(frame->payload) > ob_remaining(packet->buffer)) + if (packet->buffer && ob_limit(frame->payload) > ob_remaining(packet->buffer)) goto skip; if (!frame->destination_resolved){ @@ -399,10 +392,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim // can we send a packet on this interface now? if (limit_is_allowed(&frame->interface->transfer_limit)) goto skip; - + if (frame->source_full) my_subscriber->send_full=1; - overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr, 0); + overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr); }else{ // is this packet going our way? if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){ @@ -511,28 +504,3 @@ static void overlay_send_packet(struct sched_ent *alarm){ overlay_fill_send_packet(&packet, gettime_ms()); } - -int -overlay_tick_interface(int i, time_ms_t now) { - struct outgoing_packet packet; - IN(); - - if (overlay_interfaces[i].state!=INTERFACE_STATE_UP) { - RETURN(-1); - } - - if (limit_is_allowed(&overlay_interfaces[i].transfer_limit)){ - //WARN("Throttling has blocked a tick packet"); - RETURN(-1); - } - - if (config.debug.overlayinterfaces) DEBUGF("Ticking interface #%d",i); - - // initialise the packet buffer - bzero(&packet, sizeof(struct outgoing_packet)); - overlay_init_packet(&packet, NULL, 0, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address, 1); - - /* Stuff more payloads from queues and send it */ - overlay_fill_send_packet(&packet, now); - RETURN(0); -} diff --git a/serval.h b/serval.h index aebce7f0..4138487a 100644 --- a/serval.h +++ b/serval.h @@ -369,6 +369,8 @@ typedef struct overlay_interface { These figures will be refined over time, and we will allow people to set them per-interface. */ unsigned tick_ms; /* milliseconds per tick */ + struct subscriber *next_advert; + int send_broadcasts; /* The time of the last tick on this interface in milli seconds */ time_ms_t last_tick_ms; @@ -514,7 +516,7 @@ int overlay_route_record_link( time_ms_t now, struct subscriber *to, struct subscriber *via,int sender_interface, unsigned int s1,unsigned int s2,int score,int gateways_en_route); int overlay_route_dump(); -int overlay_route_add_advertisements(struct decode_context *context, overlay_interface *interface, struct overlay_buffer *e); +int overlay_route_queue_advertisements(overlay_interface *interface); int ovleray_route_please_advertise(overlay_node *n); int overlay_route_saw_advertisements(int i, struct overlay_frame *f, struct decode_context *context, time_ms_t now);