Schedule packet sending

This commit is contained in:
Jeremy Lakeman 2012-07-12 10:41:47 +09:30
parent d36ba78afe
commit 5e915bcc09
4 changed files with 76 additions and 4 deletions

View File

@ -103,11 +103,21 @@ int overlayServerMode()
for(i=0;i<OQ_MAX;i++) { for(i=0;i<OQ_MAX;i++) {
overlay_tx[i].maxLength=100; overlay_tx[i].maxLength=100;
overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */ overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */
overlay_tx[i].transmit_delay=10; /* Hold onto packets for 10ms before trying to send a full packet */
overlay_tx[i].grace_period=100; /* Delay sending a packet for up to 100ms if servald has other processing to do */
} }
/* But expire voice/video call packets much sooner, as they just aren't any use if late */ /* expire voice/video call packets much sooner, as they just aren't any use if late */
overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=500; overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=500;
overlay_tx[OQ_ISOCHRONOUS_VIDEO].latencyTarget=500; overlay_tx[OQ_ISOCHRONOUS_VIDEO].latencyTarget=500;
/* try to send voice packets without any delay, and before other background processing */
overlay_tx[OQ_ISOCHRONOUS_VOICE].transmit_delay=0;
overlay_tx[OQ_ISOCHRONOUS_VOICE].grace_period=0;
/* opportunistic traffic can be significantly delayed */
overlay_tx[OQ_OPPORTUNISTIC].transmit_delay=200;
overlay_tx[OQ_OPPORTUNISTIC].grace_period=500;
/* Get the set of socket file descriptors we need to monitor. /* Get the set of socket file descriptors we need to monitor.
Note that end-of-file will trigger select(), so we cannot run select() if we Note that end-of-file will trigger select(), so we cannot run select() if we
have any dummy interfaces running. So we do an ugly hack of just waiting no more than have any dummy interfaces running. So we do an ugly hack of just waiting no more than

View File

@ -51,6 +51,9 @@ struct outgoing_packet{
overlay_buffer *buffer; overlay_buffer *buffer;
}; };
struct sched_ent next_packet;
struct profile_total send_packet;
int overlay_tick_interface(int i, long long now); int overlay_tick_interface(int i, long long now);
unsigned char magic_header[]={/* Magic */ 'O',0x10, unsigned char magic_header[]={/* Magic */ 'O',0x10,
@ -753,6 +756,37 @@ void overlay_init_packet(struct outgoing_packet *packet, int interface){
ob_append_bytes(packet->buffer,magic_header,4); ob_append_bytes(packet->buffer,magic_header,4);
} }
// update the alarm time and return 1 if changed
int overlay_calc_queue_time(overlay_txqueue *queue, overlay_frame *frame){
int ret=0;
long long int send_time;
if (frame->nexthop_address_status==OA_UNINITIALISED)
overlay_resolve_next_hop(frame);
if (frame->nexthop_address_status!=OA_RESOLVED)
return 0;
// when is the next packet from this queue due?
send_time=queue->first->enqueued_at + queue->transmit_delay;
if (next_packet.alarm==0 || send_time < next_packet.alarm){
next_packet.alarm=send_time;
ret = 1;
}
// how long can we wait if the server is busy?
send_time += queue->grace_period;
if (next_packet.deadline==0 || send_time < next_packet.deadline){
next_packet.deadline=send_time;
ret = 1;
}
if (!next_packet.function){
next_packet.function=overlay_send_packet;
send_packet.name="overlay_send_packet";
next_packet.stats=&send_packet;
}
return ret;
}
void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, long long now){ void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, long long now){
overlay_frame *frame = queue->first; overlay_frame *frame = queue->first;
@ -838,6 +872,8 @@ void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue
} }
skip: skip:
// if we can't send the payload now, check when we should try
overlay_calc_queue_time(queue, frame);
frame = frame->next; frame = frame->next;
} }
} }
@ -846,6 +882,10 @@ void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue
int overlay_fill_send_packet(struct outgoing_packet *packet, long long now){ int overlay_fill_send_packet(struct outgoing_packet *packet, long long now){
int i; int i;
IN(); IN();
// while we're looking at queues, work out when to schedule another packet
unschedule(&next_packet);
next_packet.alarm=0;
next_packet.deadline=0;
for (i=0;i<OQ_MAX;i++){ for (i=0;i<OQ_MAX;i++){
overlay_txqueue *queue=&overlay_tx[i]; overlay_txqueue *queue=&overlay_tx[i];
@ -853,6 +893,9 @@ int overlay_fill_send_packet(struct outgoing_packet *packet, long long now){
overlay_stuff_packet(packet, queue, now); overlay_stuff_packet(packet, queue, now);
} }
if (next_packet.alarm)
schedule(&next_packet);
if(packet->buffer){ if(packet->buffer){
// send the packet // send the packet
if (packet->buffer->length>=HEADERFIELDS_LEN){ if (packet->buffer->length>=HEADERFIELDS_LEN){
@ -870,13 +913,23 @@ int overlay_fill_send_packet(struct outgoing_packet *packet, long long now){
RETURN(0); RETURN(0);
} }
void overlay_send_packet(){ // when the queue timer elapses, send a packet
void overlay_send_packet(struct sched_ent *alarm){
struct outgoing_packet packet; struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet)); bzero(&packet, sizeof(struct outgoing_packet));
overlay_fill_send_packet(&packet, overlay_gettime_ms()); overlay_fill_send_packet(&packet, overlay_gettime_ms());
} }
// update time for next alarm and reschedule
void overlay_update_queue_schedule(overlay_txqueue *queue, overlay_frame *frame){
if (overlay_calc_queue_time(queue, frame)){
unschedule(&next_packet);
DEBUGF("Scheduled next packet in %dms", next_packet.alarm - overlay_gettime_ms());
schedule(&next_packet);
}
}
int overlay_tick_interface(int i, long long now) int overlay_tick_interface(int i, long long now)
{ {
struct outgoing_packet packet; struct outgoing_packet packet;

View File

@ -280,11 +280,13 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
if (!overlay_tx[q].first) overlay_tx[q].first=p; if (!overlay_tx[q].first) overlay_tx[q].first=p;
overlay_tx[q].length++; overlay_tx[q].length++;
overlay_update_queue_schedule(&overlay_tx[q], p);
if (0) dump_queue("after",q); if (0) dump_queue("after",q);
if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) { if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) {
// Send a packet now // Send a packet now
overlay_send_packet(); overlay_send_packet(NULL);
} }
return 0; return 0;

View File

@ -643,6 +643,12 @@ typedef struct overlay_txqueue {
int length; /* # frames in queue */ int length; /* # frames in queue */
int maxLength; /* max # frames in queue before we consider ourselves congested */ int maxLength; /* max # frames in queue before we consider ourselves congested */
/* wait until first->enqueued_at+transmit_delay before trying to force the transmission of a packet */
int transmit_delay;
/* if servald is busy, wait this long before trying to force the transmission of a packet */
int grace_period;
/* Latency target in ms for this traffic class. /* Latency target in ms for this traffic class.
Frames older than the latency target will get dropped. */ Frames older than the latency target will get dropped. */
int latencyTarget; int latencyTarget;
@ -864,7 +870,8 @@ int overlay_sendto(struct sockaddr_in *recipientaddr,unsigned char *bytes,int le
int overlay_rhizome_add_advertisements(int interface_number,overlay_buffer *e); int overlay_rhizome_add_advertisements(int interface_number,overlay_buffer *e);
int overlay_add_local_identity(unsigned char *s); int overlay_add_local_identity(unsigned char *s);
int overlay_address_is_local(unsigned char *s); int overlay_address_is_local(unsigned char *s);
void overlay_send_packet(); void overlay_update_queue_schedule(overlay_txqueue *queue, overlay_frame *frame);
void overlay_send_packet(struct sched_ent *alarm);
extern int overlay_interface_count; extern int overlay_interface_count;