new event scheduler almost working, but after a while can stop

calling overlay_check_ticks alarm for some reason, which causes
queues to congest and bad things to generally happen.
This commit is contained in:
gardners 2012-06-22 17:30:21 +09:30
parent 3e88400e16
commit 3b6a004cc9
6 changed files with 59 additions and 35 deletions

View File

@ -144,6 +144,7 @@ int fd_checkalarms()
TIMING_PAUSE();
for(i=0;i<alarmcount;i++)
{
now=overlay_gettime_ms();
if (alarms[i].next_alarm&&alarms[i].next_alarm<=now) {
_TIMING_CHECK(__FILE__,fd_funcname(alarms[i].func),-1);
alarms[i].func();

View File

@ -96,7 +96,7 @@ int overlayServerMode()
int i;
for(i=0;i<OQ_MAX;i++) {
overlay_tx[i].maxLength=100;
overlay_tx[i].latencyTarget=5000; /* Keep packets in queue for 5 seconds by default */
overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */
}
/* But expire voice/video call packets much sooner, as they just aren't any use if late */
overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=500;
@ -286,7 +286,8 @@ int overlay_frame_process(int interface,overlay_frame *f)
if (!broadcast) {
if (overlay_get_nexthop(f->destination,f->nexthop,&len,
&f->nexthop_interface))
WHY("Could not find next hop for host - dropping frame");
WHYF("Could not find next hop for %s* - dropping frame",
overlay_render_sid_prefix(f->destination,7));
dontForward=1;
}
f->ttl--;

View File

@ -313,27 +313,30 @@ void overlay_interface_poll(int fd)
if (overlay_interfaces[i].fd!=fd) continue;
/* Read from UDP socket */
int recvttl=1;
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {
/* No more packets */
return;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
plen=1;
while (plen>0) {
int recvttl=1;
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {
/* No more packets */
return;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
}
}
}
return;
@ -822,7 +825,7 @@ int overlay_tick_interface(int i, long long now)
return 0;
}
if (debug&DEBUG_OVERLAYINTERFACES) fprintf(stderr,"Ticking interface #%d\n",i);
WHYF("Ticking interface #%d\n",i);
/* Get a buffer ready, and limit it's size appropriately.
XXX size limit should be reduced from MTU.
@ -929,8 +932,10 @@ int overlay_tick_interface(int i, long long now)
{
if ((*p)->dequeue) {
{
if (0) printf("dequeuing %p%s NOW\n",
*p,(*p)->isBroadcast?" (broadcast)":" (unicast)");
WHYF("dequeuing %s* -> %s* NOW (queue length=%d)",
overlay_render_sid_prefix((*p)->source,7),
overlay_render_sid_prefix((*p)->destination,7),
overlay_tx[q].length);
t=*p;
*p=t->next;
if (overlay_tx[q].last==t) overlay_tx[q].last=t->prev;
@ -979,7 +984,7 @@ void overlay_check_ticks(void) {
for(i = 0; i < overlay_interface_count; i++) {
/* Only tick live interfaces */
if (overlay_interfaces[i].observed > 0) {
if (debug & DEBUG_VERBOSE_IO) INFOF("Interface %s ticks every %dms, last at %lld.",
if (1||debug & DEBUG_VERBOSE_IO) INFOF("Interface %s ticks every %dms, last at %lld.",
overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,
overlay_interfaces[i].last_tick_ms);
@ -990,7 +995,7 @@ void overlay_check_ticks(void) {
overlay_interfaces[i].last_tick_ms = now;
}
} else
if (debug & DEBUG_VERBOSE_IO) INFOF("Interface %s is awol.", overlay_interfaces[i].name);
if (1||debug & DEBUG_VERBOSE_IO) INFOF("Interface %s is awol.", overlay_interfaces[i].name);
}
/* Update interval until next tick */
@ -1010,14 +1015,21 @@ long long overlay_time_until_next_tick()
for(i=0;i<overlay_interface_count;i++)
if (overlay_interfaces[i].observed>0)
{
if (debug&DEBUG_VERBOSE_IO) fprintf(stderr,"Interface %s ticks every %dms, last at T-%lldms.\n",overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,now-overlay_interfaces[i].last_tick_ms);
long long thistick=
overlay_interfaces[i].tick_ms
-(now-overlay_interfaces[i].last_tick_ms);
WHYF("Interface %s ticks every %dms, last at T-%lldms, next needed in %lldms.\n",
overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,now-overlay_interfaces[i].last_tick_ms,
thistick);
long long thistick=(overlay_interfaces[i].last_tick_ms+overlay_interfaces[i].tick_ms)-now;
if (thistick<0) thistick=0;
if (thistick<nexttick) nexttick=thistick;
WHYF("nexttick is now %lldms",nexttick);
}
WHYF("Next tick required in %lldms",nexttick);
return nexttick;
}

View File

@ -409,6 +409,12 @@ int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
send back a connection refused type message? Silence is probably the
more prudent path.
*/
WHYF("Received packet with listener (MDP ports: src=%s*:%d, dst=%d)",
overlay_render_sid_prefix(mdp->out.src.sid,7),
mdp->out.src.port,mdp->out.dst.port);
if ((!overlay_address_is_local(mdp->out.dst.sid))
&&(!overlay_address_is_broadcast(mdp->out.dst.sid)))
{
@ -448,7 +454,7 @@ int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
}
if (match>-1) {
struct sockaddr_un addr;
printf("unix domain socket '%s'\n",mdp_bindings_sockets[match]);
bcopy(mdp_bindings_sockets[match],&addr.sun_path[0],mdp_bindings_socket_name_lengths[match]);
addr.sun_family=AF_UNIX;
errno=0;

View File

@ -244,6 +244,9 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
Complain if there are too many frames in the queue.
*/
WHYF("Enqueuing packet for %s* (q[%d]length = %d)",
overlay_render_sid_prefix(p->destination,7),
q,overlay_tx[q].length);
if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) {
/* Dispatch voice data immediately.
Also tell Rhizome to back off a bit, so that voice traffic
@ -311,7 +314,8 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
if (0) dump_payload(p,"queued for delivery");
if (overlay_tx[q].length>=overlay_tx[q].maxLength) return WHY("Queue congested");
if (overlay_tx[q].length>=overlay_tx[q].maxLength)
return WHYF("Queue #%d congested (size = %d)",q,overlay_tx[q].maxLength);
if (0) dump_queue("before",q);

View File

@ -54,7 +54,7 @@ setup_servald_instance() {
setup_servald_instances() {
setup_servald
DUMMYNET=/tmp/dummy
DUMMYNET=`pwd`/dummy.dat
rm $DUMMYNET
touch $DUMMYNET
assert [ -e $DUMMYNET ]
@ -63,7 +63,7 @@ setup_servald_instances() {
setup_servald_instance +B $DUMMYNET
SIDB=$sid
# Now make sure that they can see each other
sleep 5 # Should be plenty of time
sleep 10 # Should be plenty of time
set_instance +A
echo "Dummynet file $DUMMYNET after 5 seconds: "`ls -l $DUMMYNET`
executeOk_servald id peers