Add deadline time for alarm prioritisation

This commit is contained in:
Jeremy Lakeman 2012-07-12 10:15:16 +09:30
parent 35b4ba8594
commit 27c24f377e
10 changed files with 168 additions and 62 deletions

102
fdqueue.c
View File

@ -27,6 +27,7 @@ struct pollfd fds[MAX_WATCHED_FDS];
int fdcount=0;
struct sched_ent *fd_callbacks[MAX_WATCHED_FDS];
struct sched_ent *next_alarm=NULL;
struct sched_ent *next_deadline=NULL;
struct profile_total poll_stats={NULL,0,"Idle (in poll)",0,0,0};
void list_alarms(){
@ -48,11 +49,46 @@ void list_alarms(){
fds[i].fd);
}
int deadline(struct sched_ent *alarm){
struct sched_ent *node = next_deadline, *last = NULL;
if (alarm->deadline < alarm->alarm)
alarm->deadline = alarm->alarm;
while(node!=NULL){
if (node->alarm > alarm->alarm)
break;
last = node;
node = node->_next;
}
if (last == NULL){
next_deadline = alarm;
}else{
last->_next=alarm;
}
alarm->_prev = last;
if(node!=NULL)
node->_prev = alarm;
alarm->_next = node;
return 0;
}
// add an alarm to the list of scheduled function calls.
// simply populate .alarm with the absolute time, and .function with the method to call.
// on calling .poll.revents will be zero.
int schedule(struct sched_ent *alarm){
struct sched_ent *node = next_alarm, *last = NULL;
if (!alarm->function)
return WHY("Can't schedule if you haven't set the function pointer");
if (alarm->deadline < alarm->alarm)
alarm->deadline = alarm->alarm;
// if the alarm has already expired, move straight to the deadline queue
if (alarm->alarm <= overlay_gettime_ms())
return deadline(alarm);
while(node!=NULL){
if (node->alarm > alarm->alarm)
break;
@ -82,6 +118,8 @@ int unschedule(struct sched_ent *alarm){
prev->_next = next;
else if(next_alarm==alarm)
next_alarm = next;
else if(next_deadline==alarm)
next_deadline = next;
if (next)
next->_prev = prev;
@ -93,6 +131,9 @@ int unschedule(struct sched_ent *alarm){
// start watching a file handle, call this function again if you wish to change the event mask
int watch(struct sched_ent *alarm){
if (!alarm->function)
return WHY("Can't watch if you haven't set the function pointer");
if (alarm->_poll_index>=0 && fd_callbacks[alarm->_poll_index]==alarm){
// updating event flags
INFOF("Updating watch %s, #%d for %d", (alarm->stats?alarm->stats->name:"Unnamed"), alarm->poll.fd, alarm->poll.events);
@ -130,46 +171,45 @@ int unwatch(struct sched_ent *alarm){
void call_alarm(struct sched_ent *alarm, int revents){
struct call_stats call_stats;
struct profile_total *stats = alarm->stats;
call_stats.totals = alarm->stats;
if (stats)
if (call_stats.totals)
fd_func_enter(&call_stats);
alarm->poll.revents = revents;
alarm->function(alarm);
if (stats)
fd_func_exit(&call_stats, stats);
}
int fd_checkalarms()
{
long long now=overlay_gettime_ms();
if (next_alarm!=NULL&&next_alarm->alarm <=now){
struct sched_ent *alarm = next_alarm;
unschedule(alarm);
call_alarm(alarm, 0);
now=overlay_gettime_ms();
}
if (next_alarm)
return next_alarm->alarm - now;
return 15000;
if (call_stats.totals)
fd_func_exit(&call_stats);
}
int fd_poll()
{
int i, r;
int ms=60000;
long long now=overlay_gettime_ms();
/* See if any alarms have expired before we do anything.
This also returns the time to the next alarm that is due. */
int ms=fd_checkalarms();
/* Make sure we don't have any silly timeouts that will make us wait for ever. */
/* move alarms that have elapsed to the deadline queue */
while (next_alarm!=NULL&&next_alarm->alarm <=now){
struct sched_ent *alarm = next_alarm;
unschedule(alarm);
deadline(alarm);
}
/* work out how long we can block in poll */
if (next_deadline)
ms = 0;
else if (next_alarm){
ms = next_alarm->alarm - now;
}
/* Make sure we don't have any silly timeouts that will make us wait forever. */
if (ms<0) ms=0;
/* Wait for action or timeout */
/* check if any file handles have activity */
{
struct call_stats call_stats;
call_stats.totals=&poll_stats;
fd_func_enter(&call_stats);
r = poll(fds, fdcount, ms);
if (debug & DEBUG_IO) {
@ -185,9 +225,18 @@ int fd_poll()
}
DEBUGF("poll(fds=(%s), fdcount=%d, ms=%d) = %d", strbuf_str(b), fdcount, ms, r);
}
fd_func_exit(&call_stats, &poll_stats);
fd_func_exit(&call_stats);
now=overlay_gettime_ms();
}
/* call one alarm function, but only if its deadline time has elapsed OR there is no file activity */
if (next_deadline && (next_deadline->deadline <=now || (r==0))){
struct sched_ent *alarm = next_deadline;
unschedule(alarm);
call_alarm(alarm, 0);
now=overlay_gettime_ms();
}
/* If file descriptors are ready, then call the appropriate functions */
if (r>0) {
for(i=0;i<fdcount;i++)
@ -199,6 +248,5 @@ int fd_poll()
set_block(fds[i].fd);
}
}
return 0;
}

View File

@ -118,18 +118,20 @@ int overlayServerMode()
/* Create structures to use 1MB of RAM for testing */
overlay_route_init(1);
#define SCHEDULE(X, Y) \
#define SCHEDULE(X, Y, D) \
struct sched_ent _sched_##X; \
struct profile_total _stats_##X; \
bzero(&_sched_##X, sizeof(struct sched_ent)); \
bzero(&_stats_##X, sizeof(struct profile_total)); \
_sched_##X.stats = &_stats_##X; \
_sched_##X.function=X;\
_stats_##X.name="" #X "";\
_sched_##X.alarm=overlay_gettime_ms()+Y;\
_sched_##X.deadline=_sched_##X.alarm+D;\
schedule(&_sched_##X);
/* Periodically check for server shut down */
SCHEDULE(server_shutdown_check, 0);
SCHEDULE(server_shutdown_check, 0, 100);
/* Setup up MDP & monitor interface unix domain sockets */
overlay_mdp_setup_sockets();
@ -142,16 +144,16 @@ schedule(&_sched_##X);
/* Pick next rhizome files to grab every few seconds
from the priority list continuously being built from observed
bundle announcements */
SCHEDULE(rhizome_enqueue_suggestions, rhizome_fetch_interval_ms);
SCHEDULE(rhizome_enqueue_suggestions, rhizome_fetch_interval_ms, rhizome_fetch_interval_ms*3);
/* Periodically check for new interfaces */
SCHEDULE(overlay_interface_discover, 1);
SCHEDULE(overlay_interface_discover, 1, 100);
/* Periodically update route table. */
SCHEDULE(overlay_route_tick, 100);
SCHEDULE(overlay_route_tick, 100, 100);
/* Show CPU usage stats periodically */
SCHEDULE(fd_periodicstats, 3000);
SCHEDULE(fd_periodicstats, 3000, 500);
#undef SCHEDULE

View File

@ -237,6 +237,7 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
// run the first tick asap
I(alarm.alarm)=overlay_gettime_ms();
I(alarm.deadline)=I(alarm.alarm)+10;
schedule(&I(alarm));
return 0;
@ -307,6 +308,7 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd
// schedule an alarm for this interface
I(alarm.function)=overlay_dummy_poll;
I(alarm.alarm)=overlay_gettime_ms()+10;
I(alarm.deadline)=I(alarm.alarm);
dummy_poll_stats.name="overlay_dummy_poll";
I(alarm.stats)=&dummy_poll_stats;
schedule(&I(alarm));
@ -335,6 +337,7 @@ void overlay_interface_poll(struct sched_ent *alarm)
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
alarm->alarm=now+interface->tick_ms;
alarm->deadline=alarm->alarm+interface->tick_ms/2;
schedule(alarm);
return;
}
@ -369,7 +372,7 @@ void overlay_dummy_poll(struct sched_ent *alarm)
unsigned char transaction_id[8];
unsigned long long now = overlay_gettime_ms();
if (interface->last_tick_ms + interface->tick_ms <+ now){
if (interface->last_tick_ms + interface->tick_ms <= now){
// tick the interface
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
@ -379,6 +382,15 @@ void overlay_dummy_poll(struct sched_ent *alarm)
long long length=lseek(alarm->poll.fd,0,SEEK_END);
if (interface->offset>=length)
{
/* if there's no input, while we want to check for more soon,
we need to allow all other low priority alarms to fire first,
otherwise we'll dominate the scheduler without accomplishing anything */
alarm->alarm = overlay_gettime_ms()+20;
alarm->deadline = alarm->alarm + 10000;
if (alarm->alarm > interface->last_tick_ms + interface->tick_ms)
alarm->alarm = interface->last_tick_ms + interface->tick_ms;
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("At end of input on dummy interface %s", interface->name);
}
@ -414,9 +426,14 @@ void overlay_dummy_poll(struct sched_ent *alarm)
else
WARNF("Read %lld bytes from dummy interface", nread);
}
/* keep reading new packets as fast as possible,
but don't prevent other high priority alarms */
alarm->alarm = overlay_gettime_ms();
alarm->deadline = alarm->alarm + 200;
}
alarm->alarm = overlay_gettime_ms()+10;
schedule(alarm);
return ;
@ -645,6 +662,7 @@ void overlay_interface_discover(struct sched_ent *alarm){
}
alarm->alarm = overlay_gettime_ms()+5000;
alarm->deadline = alarm->alarm + 10000;
schedule(alarm);
return;
}

View File

@ -1314,6 +1314,7 @@ void overlay_route_tick(struct sched_ent *alarm)
/* Update callback interval based on how much work we have to do */
alarm->alarm = overlay_gettime_ms()+interval;
alarm->deadline = alarm->alarm+100;
schedule(alarm);
return;
}

View File

@ -25,16 +25,10 @@ struct call_stats *current_call=NULL;
void fd_clearstat(struct profile_total *s){
s->max_time = 0;
s->total_time = 0;
s->child_time = 0;
s->calls = 0;
}
void fd_update_stats(struct profile_total *s,long long elapsed)
{
s->total_time+=elapsed;
if (elapsed>s->max_time) s->max_time=elapsed;
s->calls++;
}
int fd_tallystats(struct profile_total *total,struct profile_total *a)
{
total->total_time+=a->total_time;
@ -45,10 +39,13 @@ int fd_tallystats(struct profile_total *total,struct profile_total *a)
int fd_showstat(struct profile_total *total, struct profile_total *a)
{
INFOF("%lldms (%2.1f%%) in %d calls (max %lldms, avg %.1fms) : %s",
a->total_time,a->total_time*100.0/total->total_time,
INFOF("%lldms (%2.1f%%) in %d calls (max %lldms, avg %.1fms, +child avg %.1fms) : %s",
a->total_time,
a->total_time*100.0/total->total_time,
a->calls,
a->max_time,a->total_time*1.00/a->calls,
a->max_time,
a->total_time*1.00/a->calls,
(a->total_time+a->child_time)*1.00/a->calls,
a->name);
return 0;
}
@ -170,9 +167,19 @@ void fd_periodicstats(struct sched_ent *alarm)
fd_showstats();
fd_clearstats();
alarm->alarm = overlay_gettime_ms()+3000;
alarm->deadline = alarm->alarm+1000;
schedule(alarm);
}
void dump_stack(){
struct call_stats *call = current_call;
while(call){
if (call->totals)
INFOF("%s",call->totals->name);
call=call->prev;
}
}
int fd_func_enter(struct call_stats *this_call)
{
this_call->enter_time=overlay_gettime_ms();
@ -182,26 +189,34 @@ int fd_func_enter(struct call_stats *this_call)
return 0;
}
int fd_func_exit(struct call_stats *this_call, struct profile_total *aggregate_stats)
int fd_func_exit(struct call_stats *this_call)
{
if (current_call != this_call)
WHYF("stack mismatch, exited through %s()",aggregate_stats->name);
WHYF("stack mismatch, exited through %s()",this_call->totals->name);
long long now = overlay_gettime_ms();
long long elapsed=now - this_call->enter_time;
current_call = this_call->prev;
if (!aggregate_stats->_initialised){
aggregate_stats->_initialised=1;
aggregate_stats->_next = stats_head;
fd_clearstat(aggregate_stats);
stats_head = aggregate_stats;
if (this_call->totals && !this_call->totals->_initialised){
this_call->totals->_initialised=1;
this_call->totals->_next = stats_head;
fd_clearstat(this_call->totals);
stats_head = this_call->totals;
}
if (current_call)
current_call->child_time+=elapsed;
fd_update_stats(aggregate_stats, (elapsed - this_call->child_time));
elapsed-=this_call->child_time;
if (this_call->totals){
this_call->totals->total_time+=elapsed;
this_call->totals->child_time+=this_call->child_time;
this_call->totals->calls++;
if (elapsed>this_call->totals->max_time) this_call->totals->max_time=elapsed;
}
return 0;
}

View File

@ -522,6 +522,7 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm)
candidate_count-=i;
}
alarm->alarm = overlay_gettime_ms() + rhizome_fetch_interval_ms;
alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3;
schedule(alarm);
return;
}
@ -673,6 +674,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
rhizome_file_fetch_queue_count++;
@ -734,6 +737,7 @@ void rhizome_fetch_write(rhizome_file_fetch_record *q){
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
q->request_ofs+=bytes;
if (q->request_ofs>=q->request_len) {
@ -780,6 +784,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
if (bytes>(q->file_len-q->file_ofs))
@ -855,6 +860,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
if (i<0) i=0;

View File

@ -231,6 +231,7 @@ void rhizome_client_poll(struct sched_ent *alarm)
if (bytes > 0) {
// reset inactivity timer
r->alarm.alarm = overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
r->alarm.deadline = r->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
unschedule(&r->alarm);
schedule(&r->alarm);
int i = r->request_length - 160;
@ -303,6 +304,7 @@ void rhizome_server_poll(struct sched_ent *alarm)
request->alarm.poll.fd=sock;
request->alarm.poll.events=POLLIN;
request->alarm.alarm = overlay_gettime_ms()+RHIZOME_IDLE_TIMEOUT;
request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
// watch for the incoming http request
watch(&request->alarm);
// set an inactivity timeout to close the connection
@ -619,6 +621,7 @@ int rhizome_server_http_send_bytes(rhizome_http_request *r)
// reset inactivity timer
r->alarm.alarm = overlay_gettime_ms()+RHIZOME_IDLE_TIMEOUT;
r->alarm.deadline = r->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
unschedule(&r->alarm);
schedule(&r->alarm);

View File

@ -472,21 +472,23 @@ typedef struct overlay_frame {
#define CRYPT_SIGNED 2
#define CRYPT_PUBLIC 4
struct call_stats{
long long enter_time;
long long child_time;
struct call_stats *prev;
};
struct profile_total {
struct profile_total *_next;
int _initialised;
const char *name;
long long max_time;
long long total_time;
long long child_time;
int calls;
};
struct call_stats{
long long enter_time;
long long child_time;
struct profile_total *totals;
struct call_stats *prev;
};
struct sched_ent;
typedef void (*ALARM_FUNCP) (struct sched_ent *alarm);
@ -498,7 +500,10 @@ struct sched_ent{
ALARM_FUNCP function;
void *context;
struct pollfd poll;
// when we should first consider the alarm
long long alarm;
// the order we will prioritise the alarm
long long deadline;
struct profile_total *stats;
int _poll_index;
};
@ -1610,9 +1615,14 @@ void rhizome_server_poll(struct sched_ent *alarm);
int fd_clearstats();
int fd_showstats();
int fd_checkalarms();
int fd_func_exit(struct call_stats *this_call, struct profile_total *call_stats);
int fd_func_exit(struct call_stats *this_call);
int fd_func_enter(struct call_stats *this_call);
void dump_stack();
#define IN() static struct profile_total _aggregate_stats={NULL,0,__FUNCTION__,0,0,0}; struct call_stats _this_call; fd_func_enter(&_this_call);
#define OUT() fd_func_exit(&_this_call, &_aggregate_stats);
#define IN() static struct profile_total _aggregate_stats={NULL,0,__FUNCTION__,0,0,0}; \
struct call_stats _this_call; \
_this_call.totals=&_aggregate_stats; \
fd_func_enter(&_this_call);
#define OUT() fd_func_exit(&_this_call);
#define RETURN(X) { OUT() return(X); }

View File

@ -241,6 +241,7 @@ void server_shutdown_check(struct sched_ent *alarm)
}
if (alarm){
alarm->alarm = overlay_gettime_ms()+1000;
alarm->deadline = alarm->alarm+5000;
schedule(alarm);
}
}
@ -416,6 +417,7 @@ void signal_handler(int signal)
char buf[80];
signame(buf, sizeof(buf), signal);
INFOF("Caught %s", buf);
dump_stack();
switch (signal) {
case SIGQUIT:
serverCleanUp();

1
vomp.c
View File

@ -1447,5 +1447,6 @@ void vomp_process_tick(struct sched_ent *alarm){
monitor_tell_clients(msg, len, MONITOR_VOMP);
alarm->alarm = overlay_gettime_ms() + VOMP_CALL_STATUS_INTERVAL;
alarm->deadline = alarm->alarm + VOMP_CALL_STATUS_INTERVAL/2;
schedule(alarm);
}