Revamped dequeuing of TXd frames.

Now almost works (queues don't stay full of junk), although
mdp ping still gets a duplicate reply for 2-hop pings.
This commit is contained in:
gardners 2012-04-14 23:12:45 +09:30
parent 2dab0e50d4
commit 2581e9fe4c
4 changed files with 72 additions and 52 deletions

View File

@ -424,7 +424,15 @@ int overlay_frame_process(int interface,overlay_frame *f)
overlay_rhizome_saw_advertisements(interface,f,now); overlay_rhizome_saw_advertisements(interface,f,now);
break; break;
case OF_TYPE_DATA: case OF_TYPE_DATA:
WHY("saw mdp containing frame"); if (0) {
WHY("saw mdp containing frame");
printf(" src = %s\n",overlay_render_sid(f->source));
printf(" nxt = %s\n",overlay_render_sid(f->nexthop));
printf(" dst = %s\n",overlay_render_sid(f->destination));
fflush(stdout);
dump("payload",&f->payload->bytes[0],f->payload->length);
fflush(stdout);
}
overlay_saw_mdp_containing_frame(interface,f,now); overlay_saw_mdp_containing_frame(interface,f,now);
break; break;
default: default:

View File

@ -577,11 +577,11 @@ int overlay_interface_discover()
int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,overlay_frame *pax[],int *frame_pax,int frame_max_pax) int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,overlay_frame *pax[],int *frame_pax,int frame_max_pax)
{ {
if (0) printf("Stuffing from queue #%d on interface #%d\n",q,i); if (1) printf("Stuffing from queue #%d on interface #%d\n",q,i);
overlay_frame **p=&overlay_tx[q].first; overlay_frame **p=&overlay_tx[q].first;
while(p&&*p) while(p&&*p)
{ {
if (0) printf("p=%p, *p=%p, queue=%d\n",p,*p,q); if (1) printf("p=%p, *p=%p, queue=%d\n",p,*p,q);
/* Throw away any stale frames */ /* Throw away any stale frames */
overlay_frame *pp=*p; overlay_frame *pp=*p;
@ -631,6 +631,7 @@ int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,
&next_hop_interface); &next_hop_interface);
if (!r) { if (!r) {
if (next_hop_interface==i) { if (next_hop_interface==i) {
printf("unicast pax %p\n",*p);
dontSend=0; } else { dontSend=0; } else {
if (0) if (0)
printf("Packet should go via interface #%d, but I am interface #%d\n",next_hop_interface,i); printf("Packet should go via interface #%d, but I am interface #%d\n",next_hop_interface,i);
@ -647,6 +648,7 @@ int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,
once they have been sent via all open interfaces (or gone stale) */ once they have been sent via all open interfaces (or gone stale) */
dontSend=0; dontSend=0;
(*p)->broadcast_sent_via[i]=1; (*p)->broadcast_sent_via[i]=1;
printf("broadcast pax %p\n",*p);
} }
if (dontSend==0) { if (dontSend==0) {
@ -656,11 +658,13 @@ int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,
{ {
/* Add payload to list of payloads we are sending with this frame so that we can dequeue them /* Add payload to list of payloads we are sending with this frame so that we can dequeue them
if we send them. */ if we send them. */
printf(" paxed#%d %p\n",*frame_pax,*p);
pax[(*frame_pax)++]=*p; pax[(*frame_pax)++]=*p;
} }
} }
p=&(*p)->next;
} }
/* Consider next in queue */
p=&(*p)->next;
} }
return 0; return 0;
} }
@ -676,8 +680,8 @@ int overlay_queue_dump(overlay_txqueue *q)
fprintf(stderr," first=%p\n",q->first); fprintf(stderr," first=%p\n",q->first);
f=q->first; f=q->first;
while(f) { while(f) {
fprintf(stderr," %p: ->next=%p, ->prev=%p\n", fprintf(stderr," %p: ->next=%p, ->prev=%p ->dequeue=%d\n",
f,f->next,f->prev); f,f->next,f->prev,f->dequeue);
if (f==f->next) { if (f==f->next) {
fprintf(stderr," LOOP!\n"); break; fprintf(stderr," LOOP!\n"); break;
} }
@ -764,59 +768,66 @@ int overlay_tick_interface(int i, long long now)
if (debug&DEBUG_OVERLAYINTERFACES) if (debug&DEBUG_OVERLAYINTERFACES)
fprintf(stderr,"Successfully transmitted tick frame #%lld on interface #%d (%d bytes)\n", fprintf(stderr,"Successfully transmitted tick frame #%lld on interface #%d (%d bytes)\n",
(long long)overlay_sequence_number,i,e->length); (long long)overlay_sequence_number,i,e->length);
/* De-queue the passengers who were aboard. */
/* De-queue the passengers who were aboard.
One round of marking, and then one round of culling from the queue. */
int j,q; int j,q;
/* Mark frames that can be dequeued */
for(j=0;j<frame_pax;j++)
{
overlay_frame *p=pax[j];
if (!p->isBroadcast)
p->dequeue=1;
else {
int i;
int workLeft=0;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].observed>0)
if (!p->broadcast_sent_via[i])
{
if (1)
fprintf(stderr,
"Frame still needs to be sent on interface #%d\n",i);
workLeft=1;
break;
}
}
if (!workLeft) p->dequeue=1;
}
}
/* Visit queues and dequeue all that we can */
for(q=0;q<OQ_MAX;q++) for(q=0;q<OQ_MAX;q++)
{ {
overlay_frame **p=&overlay_tx[q].first; overlay_frame **p=&overlay_tx[q].first;
for(j=0;j<frame_pax;j++) overlay_frame *t;
while(p&&(*p))
{ {
/* Skip any frames that didn't get queued */ if ((*p)->dequeue) {
while ((*p)&&(*p!=pax[j])) p=&(*p)->next; {
/* skip any broadcast frames that still have live t=*p;
interfaces left to send via */ *p=t->next;
if (p&&(*p)&&(*p)->isBroadcast) if (overlay_tx[q].last==t) overlay_tx[q].last=t->prev;
while (p&&(*p)&&((*p)->isBroadcast)) { if (overlay_tx[q].first==t) overlay_tx[q].first=t->next;
int i; if (t->prev) t->prev->next=t->next;
int workLeft=0; if (t->next) t->next->prev=t->prev;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++) if (debug&DEBUG_QUEUES)
{ {
if (overlay_interfaces[i].observed>0) fprintf(stderr,"** dequeued pax @ %p\n",t);
if (!(*p)->broadcast_sent_via[i]) overlay_queue_dump(&overlay_tx[q]);
{ if (1) fprintf(stderr,"Frame still needs to be sent on interface #%d\n",i);
workLeft=1; break; }
} }
if (!workLeft) { if (op_free(t)) {
if (1||debug&DEBUG_BROADCASTS)
WHY("Dequeueing broadcast frame that has been fully distributed");
break;
}
p=&(*p)->next;
}
/* Now get rid of this frame once we have found it */
if (*p) {
if (debug&DEBUG_QUEUES)
{
fprintf(stderr,"** dequeueing pax @ %p\n",*p);
overlay_queue_dump(&overlay_tx[q]); overlay_queue_dump(&overlay_tx[q]);
WHY("op_free() failed");
if (debug&DEBUG_QUEUES) exit(WHY("Queue structures corrupt"));
} }
*p=pax[j]->next; overlay_tx[q].length--;
if (overlay_tx[q].last==pax[j]) overlay_tx[q].last=pax[j]->prev;
if (overlay_tx[q].first==pax[j]) overlay_tx[q].first=pax[j]->next;
if (pax[j]->prev) pax[j]->prev->next=pax[j]->next;
if (pax[j]->next) pax[j]->next->prev=pax[j]->prev;
if (debug&DEBUG_QUEUES)
{
fprintf(stderr,"** dequeued pax @ %p\n",*p);
overlay_queue_dump(&overlay_tx[q]);
}
if (op_free(pax[j])) {
overlay_queue_dump(&overlay_tx[q]);
WHY("op_free() failed");
if (debug&DEBUG_QUEUES) sleep(3600);
} }
overlay_tx[q].length--;
} }
if (!(*p)) break;
p=&(*p)->next;
} }
} }
return 0; return 0;

View File

@ -322,7 +322,7 @@ int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long no
/* extract MDP port numbers */ /* extract MDP port numbers */
mdp.in.src.port=(b[2]<<24)+(b[3]<<16)+(b[4]<<8)+b[5]; mdp.in.src.port=(b[2]<<24)+(b[3]<<16)+(b[4]<<8)+b[5];
mdp.in.dst.port=(b[6]<<24)+(b[7]<<16)+(b[8]<<8)+b[9]; mdp.in.dst.port=(b[6]<<24)+(b[7]<<16)+(b[8]<<8)+b[9];
printf("mdp dst.port=%d, src.port=%d\n",mdp.in.dst.port,mdp.in.src.port); printf("RX mdp dst.port=%d, src.port=%d\n",mdp.in.dst.port,mdp.in.src.port);
mdp.in.payload_length=len-10; mdp.in.payload_length=len-10;
bcopy(&b[10],&mdp.in.payload[0],mdp.in.payload_length); bcopy(&b[10],&mdp.in.payload[0],mdp.in.payload_length);

View File

@ -515,6 +515,7 @@ typedef struct overlay_frame {
unsigned int modifiers; unsigned int modifiers;
unsigned char ttl; unsigned char ttl;
unsigned char dequeue;
/* Mark which interfaces the frame has been sent on, /* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent so that we can ensure that broadcast frames get sent