Use payload queue for periodic route advertisements

This commit is contained in:
Jeremy Lakeman 2012-12-19 12:36:28 +10:30
parent 1b1af90924
commit 9a78e16625
5 changed files with 50 additions and 69 deletions

View File

@ -60,10 +60,13 @@ int overlay_route_please_advertise(overlay_node *n)
else return 1;
}
struct subscriber *next_advertisement=NULL;
struct advertisement_state{
struct overlay_buffer *payload;
struct subscriber *next_advertisement;
};
int add_advertisement(struct subscriber *subscriber, void *context){
struct overlay_buffer *e=context;
struct advertisement_state *state=context;
if (subscriber->node){
overlay_node *n=subscriber->node;
@ -73,24 +76,23 @@ int add_advertisement(struct subscriber *subscriber, void *context){
// never send the full sid in an advertisement
subscriber->send_full=0;
if (overlay_address_append(NULL,e,subscriber) ||
ob_append_byte(e,n->best_link_score -1) ||
ob_append_byte(e,n->observations[n->best_observation].gateways_en_route +1)){
if (overlay_address_append(NULL,state->payload,subscriber) ||
ob_append_byte(state->payload,n->best_link_score -1) ||
ob_append_byte(state->payload,n->observations[n->best_observation].gateways_en_route +1)){
// stop if we run out of space, remember where we should start next time.
next_advertisement=subscriber;
ob_rewind(e);
state->next_advertisement=subscriber;
ob_rewind(state->payload);
return 1;
}
ob_checkpoint(e);
ob_checkpoint(state->payload);
}
}
return 0;
}
int overlay_route_add_advertisements(struct decode_context *context, overlay_interface *interface,
struct overlay_buffer *e)
int overlay_route_queue_advertisements(overlay_interface *interface)
{
/* Construct a route advertisement frame and append it to e.
@ -120,12 +122,19 @@ int overlay_route_add_advertisements(struct decode_context *context, overlay_int
if (!my_subscriber)
return WHY("Cannot advertise because I don't know who I am");
struct overlay_frame *frame=malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type=OF_TYPE_NODEANNOUNCE;
frame->source = my_subscriber;
frame->ttl=1;
frame->queue=OQ_MESH_MANAGEMENT;
frame->destination_resolved=1;
frame->recvaddr=interface->broadcast_address;
frame->interface=interface;
frame->payload = ob_new();
ob_limitsize(frame->payload, 400);
if (overlay_frame_build_header(context, e,
0, OF_TYPE_NODEANNOUNCE, 0, 1,
NULL, NULL,
NULL, my_subscriber))
return -1;
struct advertisement_state state={.payload = frame->payload,};
// TODO high priority advertisements first....
/*
@ -140,21 +149,22 @@ int overlay_route_add_advertisements(struct decode_context *context, overlay_int
slots_used++;
}
*/
struct subscriber *start = next_advertisement;
next_advertisement=NULL;
ob_checkpoint(e);
ob_checkpoint(frame->payload);
// append announcements starting from the last node we couldn't advertise last time
enum_subscribers(start, add_advertisement, e);
enum_subscribers(interface->next_advert, add_advertisement, &state);
// if we didn't start at the beginning and still have space, start again from the beginning
if (start && !next_advertisement && ob_limit(e) - ob_position(e) > 0){
enum_subscribers(NULL, add_advertisement, e);
if (interface->next_advert && !state.next_advertisement && ob_remaining(frame->payload) > 0){
enum_subscribers(NULL, add_advertisement, &state);
}
ob_patch_rfs(e);
interface->next_advert=state.next_advertisement;
ob_limitsize(frame->payload, ob_position(frame->payload));
if (overlay_payload_enqueue(frame)){
op_free(frame);
return -1;
}
return 0;
}

View File

@ -471,11 +471,8 @@ static void overlay_interface_poll(struct sched_ent *alarm)
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){
// tick the interface
time_ms_t now = gettime_ms();
int i = (interface - overlay_interfaces);
if (overlay_tick_interface(i, now))
alarm->alarm=limit_next_allowed(&overlay_interfaces[i].transfer_limit);
else
alarm->alarm=now+interface->tick_ms;
overlay_route_queue_advertisements(interface);
alarm->alarm=now+interface->tick_ms;
alarm->deadline=alarm->alarm+interface->tick_ms/2;
schedule(alarm);
}
@ -615,8 +612,8 @@ void overlay_dummy_poll(struct sched_ent *alarm)
interface->tick_ms>0 &&
(interface->last_tick_ms == -1 || now >= interface->last_tick_ms + interface->tick_ms)) {
// tick the interface
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
overlay_route_queue_advertisements(interface);
interface->last_tick_ms=now;
}
schedule(alarm);

View File

@ -99,9 +99,13 @@ int overlay_frame_append_payload(struct decode_context *context, overlay_interfa
dump("payload contents", &p->payload->bytes[0],p->payload->position);
}
struct broadcast *broadcast=NULL;
if ((!p->destination) && !is_all_matching(p->broadcast_id.id,BROADCAST_LEN,0)){
broadcast = &p->broadcast_id;
}
if (overlay_frame_build_header(context, headers,
p->queue, p->type, p->modifiers, p->ttl,
(p->destination?NULL:&p->broadcast_id), p->next_hop,
broadcast, p->next_hop,
p->destination, p->source))
goto cleanup;

View File

@ -225,7 +225,7 @@ int overlay_payload_enqueue(struct overlay_frame *p)
static void
overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination, int unicast,
overlay_interface *interface, struct sockaddr_in addr, int tick){
overlay_interface *interface, struct sockaddr_in addr){
packet->interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=addr;
@ -237,17 +237,11 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati
overlay_packet_init_header(&packet->context, packet->buffer, destination, unicast, packet->i, 0);
packet->header_length = ob_position(packet->buffer);
if (tick){
/* Add advertisements for ROUTES */
overlay_route_add_advertisements(&packet->context, packet->interface, packet->buffer);
}
}
// update the alarm time and return 1 if changed
static int
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int ret=0;
do{
if (frame->destination_resolved)
break;
@ -302,7 +296,6 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
struct overlay_frame *frame = queue->first;
// TODO stop when the packet is nearly full?
while(frame){
if (frame->enqueued_at + queue->latencyTarget < now){
DEBUGF("Dropping frame type %x for %s due to expiry timeout",
@ -316,7 +309,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
*/
// quickly skip payloads that have no chance of fitting
if (packet->buffer && ob_position(frame->payload) > ob_remaining(packet->buffer))
if (packet->buffer && ob_limit(frame->payload) > ob_remaining(packet->buffer))
goto skip;
if (!frame->destination_resolved){
@ -399,10 +392,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
// can we send a packet on this interface now?
if (limit_is_allowed(&frame->interface->transfer_limit))
goto skip;
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr, 0);
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr);
}else{
// is this packet going our way?
if (frame->interface!=packet->interface || memcmp(&packet->dest, &frame->recvaddr, sizeof(packet->dest))!=0){
@ -511,28 +504,3 @@ static void overlay_send_packet(struct sched_ent *alarm){
overlay_fill_send_packet(&packet, gettime_ms());
}
int
overlay_tick_interface(int i, time_ms_t now) {
struct outgoing_packet packet;
IN();
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP) {
RETURN(-1);
}
if (limit_is_allowed(&overlay_interfaces[i].transfer_limit)){
//WARN("Throttling has blocked a tick packet");
RETURN(-1);
}
if (config.debug.overlayinterfaces) DEBUGF("Ticking interface #%d",i);
// initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, NULL, 0, &overlay_interfaces[i], overlay_interfaces[i].broadcast_address, 1);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);
RETURN(0);
}

View File

@ -369,6 +369,8 @@ typedef struct overlay_interface {
These figures will be refined over time, and we will allow people to set them per-interface.
*/
unsigned tick_ms; /* milliseconds per tick */
struct subscriber *next_advert;
int send_broadcasts;
/* The time of the last tick on this interface in milli seconds */
time_ms_t last_tick_ms;
@ -514,7 +516,7 @@ int overlay_route_record_link( time_ms_t now, struct subscriber *to,
struct subscriber *via,int sender_interface,
unsigned int s1,unsigned int s2,int score,int gateways_en_route);
int overlay_route_dump();
int overlay_route_add_advertisements(struct decode_context *context, overlay_interface *interface, struct overlay_buffer *e);
int overlay_route_queue_advertisements(overlay_interface *interface);
int ovleray_route_please_advertise(overlay_node *n);
int overlay_route_saw_advertisements(int i, struct overlay_frame *f, struct decode_context *context, time_ms_t now);