Schedule interface ticks per interface

This commit is contained in:
Jeremy Lakeman 2012-07-06 09:58:54 +09:30
parent 41f3228300
commit 35b4ba8594
5 changed files with 31 additions and 80 deletions

View File

@ -147,9 +147,6 @@ schedule(&_sched_##X);
/* Periodically check for new interfaces */
SCHEDULE(overlay_interface_discover, 1);
/* Start scheduling interface ticks */
SCHEDULE(overlay_check_ticks, 2);
/* Periodically update route table. */
SCHEDULE(overlay_route_tick, 100);

View File

@ -45,7 +45,8 @@ struct interface_rules *interface_filter=NULL;
struct profile_total interface_poll_stats;
struct profile_total dummy_poll_stats;
unsigned int overlay_sequence_number=0;
int overlay_tick_interface(int i, long long now);
/* Return milliseconds since server started. First call will always return zero.
Must use long long, not time_t, as time_t can be 32bits, which is too small for
@ -63,13 +64,6 @@ long long overlay_gettime_ms()
return now;
}
int overlay_update_sequence_number()
{
long long now=overlay_gettime_ms();
overlay_sequence_number=now&0xffffffff;
return 0;
}
int overlay_interface_type(char *s)
{
if (!strcasecmp(s,"ethernet")) return OVERLAY_INTERFACE_ETHERNET;
@ -240,6 +234,10 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
interface_poll_stats.name="overlay_interface_poll";
I(alarm.stats)=&interface_poll_stats;
watch(&I(alarm));
// run the first tick asap
I(alarm.alarm)=overlay_gettime_ms();
schedule(&I(alarm));
return 0;
@ -330,6 +328,16 @@ void overlay_interface_poll(struct sched_ent *alarm)
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
if (alarm->poll.revents==0){
// tick the interface
unsigned long long now = overlay_gettime_ms();
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
alarm->alarm=now+interface->tick_ms;
schedule(alarm);
return;
}
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
@ -359,7 +367,14 @@ void overlay_dummy_poll(struct sched_ent *alarm)
struct sockaddr src_addr;
size_t addrlen = sizeof(src_addr);
unsigned char transaction_id[8];
unsigned long long now = overlay_gettime_ms();
if (interface->last_tick_ms + interface->tick_ms <+ now){
// tick the interface
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
}
/* Read from dummy interface file */
long long length=lseek(alarm->poll.fd,0,SEEK_END);
if (interface->offset>=length)
@ -407,24 +422,6 @@ void overlay_dummy_poll(struct sched_ent *alarm)
return ;
}
int overlay_tx_messages()
{
/* Check out the various queues, and add payloads to a new frame and send it out. */
/* XXX We may want to throttle the maximum packets/sec or KB/sec */
/* How are we going to pick and choose things from the various priority queues?
We could simply pick the top item from each queue in round-robin until the
frame is filled. That would be a start. We could certainly get more intelligent
and stuff lots of little frames from a high priority queue in if that makes sense,
especially if they look like getting delayed a bit. Perhaps we just reserve the first
n bytes for the first queue, the first n+k bytes for the first two queues and so on?
*/
/* XXX Go through queue and separate into per-interface queues? */
return WHY("not implemented");
}
int overlay_broadcast_ensemble(int interface_number,
struct sockaddr_in *recipientaddr /* NULL == broadcast */,
unsigned char *bytes,int len)
@ -871,10 +868,9 @@ int overlay_tick_interface(int i, long long now)
DEBUGF("Sending %d byte tick packet",e->length);
if (overlay_broadcast_ensemble(i,NULL,e->bytes,e->length) != -1)
{
overlay_update_sequence_number();
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Successfully transmitted tick frame #%lld on interface #%d (%d bytes)",
(long long)overlay_sequence_number,i,e->length);
DEBUGF("Successfully transmitted tick frame on interface #%d (%d bytes)",
i,e->length);
/* De-queue the passengers who were aboard.
One round of marking, and then one round of culling from the queue. */
@ -955,46 +951,6 @@ int overlay_tick_interface(int i, long long now)
}
}
void overlay_check_ticks(struct sched_ent *alarm) {
/* Check if any interface(s) are due for a tick */
int i;
long long now = overlay_gettime_ms();
/* By default only tick once per day */
long long nexttick=now + 86400*1000;
/* Now check if the next tick time for the interfaces is no later than that time.
If so, trigger a tick on the interface. */
if (debug & DEBUG_OVERLAYINTERFACES) DEBUGF("Examining %d interfaces.",overlay_interface_count);
for(i = 0; i < overlay_interface_count; i++) {
/* Only tick live interfaces */
if (overlay_interfaces[i].observed > 0) {
if (debug & DEBUG_VERBOSE_IO) DEBUGF("Interface %s ticks every %dms, last at %lld.",
overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,
overlay_interfaces[i].last_tick_ms);
long long thistick=overlay_interfaces[i].last_tick_ms + overlay_interfaces[i].tick_ms;
if (now >= thistick) {
/* This interface is due for a tick */
overlay_tick_interface(i, now);
overlay_interfaces[i].last_tick_ms = now;
thistick=now + overlay_interfaces[i].tick_ms;
}
if (thistick<nexttick) nexttick=thistick;
} else
if (debug & DEBUG_VERBOSE_IO) DEBUGF("Interface %s is awol.", overlay_interfaces[i].name);
}
/* Update interval until next tick */
alarm->alarm = nexttick;
schedule(alarm);
return;
}
long long parse_quantity(char *q)
{
int m;

View File

@ -258,6 +258,7 @@ int overlay_add_selfannouncement(int interface,overlay_buffer *b)
*/
unsigned char *sid=overlay_get_my_sid();
long long now = overlay_gettime_ms();
/* Header byte */
if (ob_append_byte(b, OF_TYPE_SELFANNOUNCE))
@ -321,12 +322,12 @@ int overlay_add_selfannouncement(int interface,overlay_buffer *b)
overlay_abbreviate_set_most_recent_address(sid);
/* Sequence number range. Based on one tick per milli-second. */
overlay_update_sequence_number();
if (ob_append_int(b,overlay_interfaces[interface].last_tick_ms))
return WHY("Could not add low sequence number to self-announcement");
if (ob_append_int(b,overlay_sequence_number))
if (ob_append_int(b,now))
return WHY("Could not add high sequence number to self-announcement");
overlay_interfaces[interface].last_tick_ms=overlay_sequence_number;
overlay_interfaces[interface].last_tick_ms=now;
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("last tick seq# = %lld", overlay_interfaces[interface].last_tick_ms);

View File

@ -288,10 +288,9 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
nextinterface:
if (overlay_broadcast_ensemble(interface,NULL,b->bytes,b->length) != -1)
{
overlay_update_sequence_number();
if (debug&DEBUG_OVERLAYINTERFACES)
WHYF("Voice frame #%lld sent on interface #%d (%d bytes)",
(long long)overlay_sequence_number,interface,b->length);
WHYF("Voice frame sent on interface #%d (%d bytes)",
interface,b->length);
ob_free(b);
return 0;
} else {

2
serval.h Executable file → Normal file
View File

@ -1048,7 +1048,6 @@ int overlay_frame_set_neighbour_as_source(overlay_frame *f,overlay_neighbour *n)
int overlay_frame_set_neighbour_as_destination(overlay_frame *f,overlay_neighbour *n);
int overlay_frame_set_broadcast_as_destination(overlay_frame *f);
int overlay_broadcast_generate_address(unsigned char *a);
int overlay_update_sequence_number();
int packetEncipher(unsigned char *packet,int maxlen,int *len,int cryptoflags);
int overlayServerMode();
int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP);
@ -1592,7 +1591,6 @@ int unwatch(struct sched_ent *alarm);
int fd_poll();
void overlay_interface_discover(struct sched_ent *alarm);
void overlay_check_ticks(struct sched_ent *alarm);
void overlay_dummy_poll(struct sched_ent *alarm);
void overlay_route_tick(struct sched_ent *alarm);
void rhizome_enqueue_suggestions(struct sched_ent *alarm);