Rework scheduler to allow for lazy but important alarms

This commit is contained in:
Jeremy Lakeman 2014-06-04 16:10:36 +09:30
parent 2a572d19bf
commit 6d4ad0e150
9 changed files with 255 additions and 190 deletions

325
fdqueue.c
View File

@ -58,143 +58,181 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
struct pollfd fds[MAX_WATCHED_FDS];
int fdcount=0;
struct sched_ent *fd_callbacks[MAX_WATCHED_FDS];
struct sched_ent *next_alarm=NULL;
struct sched_ent *next_deadline=NULL;
struct sched_ent *wake_list=NULL;
struct sched_ent *run_soon=NULL;
struct sched_ent *run_now=NULL;
struct profile_total poll_stats={NULL,0,"Idle (in poll)",0,0,0,0};
#define alloca_alarm_name(alarm) ((alarm)->stats ? alloca_str_toprint((alarm)->stats->name) : "Unnamed")
void list_alarms()
{
DEBUG("Alarms;");
time_ms_t now = gettime_ms();
struct sched_ent *alarm;
for (alarm = next_deadline; alarm; alarm = alarm->_next)
DEBUGF("%p %s deadline in %"PRId64"ms", alarm->function, alloca_alarm_name(alarm), alarm->deadline - now);
DEBUG("Run now;");
for (alarm = run_now; alarm; alarm=alarm->_next_run)
DEBUGF("%p %s deadline in %"PRId64"ms", alarm->function, alloca_alarm_name(alarm), alarm->run_before - now);
DEBUG("Run soon;");
for (alarm = run_soon; alarm; alarm=alarm->_next_run)
DEBUGF("%p %s run in %"PRId64"ms", alarm->function, alloca_alarm_name(alarm), alarm->run_after - now);
for (alarm = next_alarm; alarm; alarm = alarm->_next)
DEBUGF("%p %s in %"PRId64"ms, deadline in %"PRId64"ms", alarm->function, alloca_alarm_name(alarm), alarm->alarm - now, alarm->deadline - now);
DEBUG("Wake at;");
for (alarm = wake_list; alarm; alarm = alarm->_next_wake)
DEBUGF("%p %s wake in %"PRId64"ms", alarm->function, alloca_alarm_name(alarm), alarm->wake_at - now);
DEBUG("File handles;");
int i;
for (i = 0; i < fdcount; ++i)
DEBUGF("%s watching #%d", alloca_alarm_name(fd_callbacks[i]), fds[i].fd);
DEBUGF("%s watching #%d for %x", alloca_alarm_name(fd_callbacks[i]), fds[i].fd, fds[i].events);
}
int deadline(struct sched_ent *alarm)
static void insert_run_now(struct sched_ent *alarm)
{
struct sched_ent *node = next_deadline, *last = NULL;
if (alarm->deadline < alarm->alarm)
alarm->deadline = alarm->alarm;
struct sched_ent **list = &run_now;
while(node!=NULL){
if (node->deadline > alarm->deadline)
while(*list){
if ((*list)->run_before > alarm->run_before)
break;
last = node;
node = node->_next;
list = &(*list)->_next_run;
}
if (last == NULL){
next_deadline = alarm;
}else{
last->_next = alarm;
}
alarm->_prev = last;
if(node!=NULL)
node->_prev = alarm;
alarm->_next = node;
return 0;
alarm->_next_run = *list;
*list = alarm;
}
int is_scheduled(const struct sched_ent *alarm)
static void insert_run_soon(struct sched_ent *alarm)
{
return alarm->_next || alarm->_prev || alarm == next_alarm || alarm == next_deadline;
struct sched_ent **list = &run_soon;
while(*list){
if ((*list)->run_after > alarm->run_after)
break;
list = &(*list)->_next_run;
}
alarm->_next_run = *list;
*list = alarm;
}
/* Unlink `alarm` from the singly linked run list whose head pointer is
 * `list` (the callers in this file pass &run_now or &run_soon).
 *
 * The list is walked through its link pointers, so removing the head and
 * removing an interior node are the same operation.  If the alarm is not on
 * the list nothing is modified.  On removal the alarm's own _next_run link
 * is cleared.
 */
static void remove_run_list(struct sched_ent *alarm, struct sched_ent **list)
{
  struct sched_ent **link;
  for (link = list; *link != NULL; link = &(*link)->_next_run) {
    if (*link != alarm)
      continue;
    // point whatever referenced the alarm at the alarm's successor
    *link = alarm->_next_run;
    alarm->_next_run = NULL;
    return;
  }
}
/* Insert `alarm` into the doubly linked wake_list, keeping the list sorted
 * by ascending wake_at.
 *
 * An alarm whose wake_at is TIME_MS_NEVER_WILL is not inserted at all.
 * An alarm with the same wake_at as existing entries is placed after them.
 */
static void insert_wake_list(struct sched_ent *alarm)
{
  if (alarm->wake_at == TIME_MS_NEVER_WILL)
    return;

  // find the pair of neighbours the new entry belongs between
  struct sched_ent *before = NULL;
  struct sched_ent *after = wake_list;
  while (after != NULL && after->wake_at <= alarm->wake_at) {
    before = after;
    after = after->_next_wake;
  }

  alarm->_prev_wake = before;
  alarm->_next_wake = after;
  if (after != NULL)
    after->_prev_wake = alarm;
  if (before != NULL)
    before->_next_wake = alarm;
  else
    wake_list = alarm; // no predecessor: the alarm becomes the new head
}
/* Unlink `alarm` from the doubly linked wake_list and clear both of its
 * wake links.
 *
 * Calling this for an alarm that is not on the list leaves the list
 * untouched: with no neighbours and wake_list not pointing at the alarm,
 * the only effect is resetting the alarm's (already NULL) links.
 */
static void remove_wake_list(struct sched_ent *alarm)
{
  struct sched_ent *before = alarm->_prev_wake;
  struct sched_ent *after = alarm->_next_wake;

  if (after != NULL)
    after->_prev_wake = before;

  if (before != NULL)
    before->_next_wake = after;
  else if (wake_list == alarm)
    wake_list = after; // the alarm was the head of the list

  alarm->_next_wake = NULL;
  alarm->_prev_wake = NULL;
}
// move alarms from run_soon to run_now
static void move_run_list(){
time_ms_t now = gettime_ms();
while(run_soon && run_soon->run_after <= now){
struct sched_ent *alarm = run_soon;
run_soon = run_soon->_next_run;
remove_wake_list(alarm);
insert_run_now(alarm);
if (config.debug.io)
DEBUGF("Moved %s from run_soon to run_now", alloca_alarm_name(alarm));
}
}
// Add an alarm to the list of scheduled function calls.
// Simply populate .alarm with the absolute time, and .function with the function to call.
// When the function is called, .poll.revents will be zero.
int _schedule(struct __sourceloc __whence, struct sched_ent *alarm)
void _schedule(struct __sourceloc __whence, struct sched_ent *alarm)
{
time_ms_t now = gettime_ms();
if (config.debug.io)
DEBUGF("schedule(alarm=%s) alarm=%.3f deadline=%.3f",
// TODO deprecate alarm and deadline, rename all uses to wake_at, run_before
alarm->wake_at = alarm->alarm;
alarm->run_before = alarm->deadline;
if (alarm->run_after == TIME_MS_NEVER_WILL || alarm->run_after==0)
alarm->run_after = alarm->wake_at;
if (config.debug.io){
time_ms_t now = gettime_ms();
DEBUGF("schedule(alarm=%s) run_after=%.3f wake_at=%.3f run_before=%.3f",
alloca_alarm_name(alarm),
(double)(alarm->alarm - now) / 1000,
(double)(alarm->deadline - now) / 1000
(double)(alarm->run_after - now) / 1000,
(double)(alarm->wake_at - now) / 1000,
(double)(alarm->run_before - now) / 1000
);
}
if (!alarm->stats)
WARN("schedule() called without supplying an alarm name");
struct sched_ent *node = next_alarm, *last = NULL;
WARN("schedule() called without supplying an alarm stats");
if (is_scheduled(alarm))
FATAL("Scheduling an alarm that is already scheduled");
assert(alarm->wake_at >= alarm->run_after);
assert(alarm->run_before >= alarm->run_after);
assert(!is_scheduled(alarm));
assert(alarm->function);
// TODO assert if the alarm times look odd? eg >1s ago or >1hr from now?
if (!alarm->function)
return WHY("Can't schedule if you haven't set the function pointer");
// don't bother to schedule an alarm that will (by definition) never run
if (alarm->alarm == TIME_MS_NEVER_WILL)
return 0;
if (alarm->deadline < alarm->alarm)
alarm->deadline = alarm->alarm;
if (now - alarm->deadline > 1000){
// 1000ms ago? thats silly, if you keep doing it noone else will get a turn.
FATALF("Alarm %s tried to schedule a deadline %"PRId64"ms ago",
alloca_alarm_name(alarm),
(now - alarm->deadline)
);
// not an error as it simplifies calling API use
if (alarm->run_after != TIME_MS_NEVER_WILL){
insert_wake_list(alarm);
insert_run_soon(alarm);
alarm->_scheduled=1;
}
// if the alarm has already expired, move straight to the deadline queue
if (alarm->alarm <= now)
return deadline(alarm);
while(node!=NULL){
if (node->alarm > alarm->alarm)
break;
last = node;
node = node->_next;
}
if (last == NULL){
next_alarm = alarm;
}else{
last->_next=alarm;
}
alarm->_prev = last;
if(node!=NULL)
node->_prev = alarm;
alarm->_next = node;
return 0;
}
// remove a function from the schedule before it has fired
// safe to unschedule twice...
int _unschedule(struct __sourceloc __whence, struct sched_ent *alarm)
void _unschedule(struct __sourceloc __whence, struct sched_ent *alarm)
{
if (!is_scheduled(alarm))
return;
if (config.debug.io)
DEBUGF("unschedule(alarm=%s)", alloca_alarm_name(alarm));
struct sched_ent *prev = alarm->_prev;
struct sched_ent *next = alarm->_next;
if (prev)
prev->_next = next;
else if(next_alarm==alarm)
next_alarm = next;
else if(next_deadline==alarm)
next_deadline = next;
if (next)
next->_prev = prev;
alarm->_prev = NULL;
alarm->_next = NULL;
return 0;
remove_run_list(alarm, &run_now);
remove_run_list(alarm, &run_soon);
remove_wake_list(alarm);
alarm->_scheduled=0;
alarm->run_after = TIME_MS_NEVER_WILL;
}
// start watching a file handle, call this function again if you wish to change the event mask
@ -289,38 +327,39 @@ static void call_alarm(struct sched_ent *alarm, int revents)
int fd_poll()
{
IN();
int i, r=0;
time_ms_t ms=60000;
time_ms_t now = gettime_ms();
if (!next_alarm && !next_deadline && fdcount==0)
// clear the run now list of any alarms that are overdue
if (run_now && run_now->run_before <= gettime_ms()){
struct sched_ent *alarm = run_now;
run_now = alarm->_next_run;
alarm->_scheduled=0;
alarm->run_after = TIME_MS_NEVER_WILL;
call_alarm(alarm, 0);
RETURN(1);
}
time_ms_t ms;
if (run_now){
ms=0;
}else if (wake_list){
ms = (wake_list->wake_at - gettime_ms());
if (ms<0)
ms = 0;
}else if(fdcount==0){
// nothing to do? we need to return instead of waiting forever.
RETURN(0);
/* move alarms that have elapsed to the deadline queue */
while (next_alarm!=NULL&&next_alarm->alarm <=now){
struct sched_ent *alarm = next_alarm;
unschedule(alarm);
deadline(alarm);
}else{
ms =-1;
}
/* work out how long we can block in poll */
if (next_deadline)
ms = 0;
else if (next_alarm){
ms = next_alarm->alarm - now;
}
/* Make sure we don't have any silly timeouts that will make us wait forever. */
if (ms<0) ms=0;
/* check if any file handles have activity */
{
// check for IO and/or wait for the next wake_at
int r=0;
if (fdcount || ms>0){
struct call_stats call_stats;
call_stats.totals=&poll_stats;
fd_func_enter(__HERE__, &call_stats);
if (fdcount==0){
if (ms)
sleep_ms(ms);
sleep_ms(ms);
}else{
r = poll(fds, fdcount, ms);
if (config.debug.io) {
@ -338,55 +377,47 @@ int fd_poll()
}
}
fd_func_exit(__HERE__, &call_stats);
now=gettime_ms();
}
// Reading new data takes priority over everything else
// Are any handles marked with POLLIN?
int in_count=0;
if (r>0){
for (i=0;i<fdcount;i++)
if (fds[i].revents & POLLIN)
in_count++;
}
/* call one alarm function, but only if its deadline time has elapsed OR there is no incoming file activity */
if (next_deadline && (next_deadline->deadline <=now || (in_count==0))){
struct sched_ent *alarm = next_deadline;
unschedule(alarm);
call_alarm(alarm, 0);
now=gettime_ms();
// after running a timed alarm, unless we already know there is data to read we want to check for more incoming IO before we send more outgoing.
if (in_count==0)
RETURN(1);
}
/* If file descriptors are ready, then call the appropriate functions */
move_run_list();
// We don't want a single alarm to be able to reschedule itself and starve all IO
// So we only check for new overdue alarms if we attempted to sleep
if (ms && run_now && run_now->run_before <= gettime_ms())
RETURN(1);
// process all watched IO handles once (we need to be fair)
if (r>0) {
int i;
for(i=fdcount -1;i>=0;i--){
if (fd_callbacks[i] && fd_callbacks[i]->poll.fd == fds[i].fd && fds[i].revents) {
// if any handles have POLLIN set, don't process any other handles
if (!(fds[i].revents&POLLIN || in_count==0))
continue;
int fd = fds[i].fd;
/* Call the alarm callback with the socket in non-blocking mode */
errno=0;
int fd = fds[i].fd;
set_nonblock(fd);
// Work around OSX behaviour that doesn't set POLLERR on
// devices that have been deconfigured, e.g., a USB serial adapter
// that has been removed.
if (errno == ENXIO) fds[i].revents|=POLLERR;
call_alarm(fd_callbacks[i], fds[i].revents);
/* The alarm may have closed and unwatched the descriptor, make sure this descriptor still matches */
// The alarm may have closed and unwatched the descriptor, make sure this descriptor still matches
if (i<fdcount && fds[i].fd == fd){
if (set_block(fds[i].fd))
FATALF("Alarm %p %s has a bad descriptor that wasn't closed!", fd_callbacks[i], alloca_alarm_name(fd_callbacks[i]));
}
}
}
// time may have passed while processing IO, or processing IO could trigger a new overdue alarm
move_run_list();
}else if (run_now){
// No IO, no overdue alarms but another alarm is runnable? run a single alarm before polling again
struct sched_ent *alarm = run_now;
run_now = alarm->_next_run;
alarm->_scheduled=0;
alarm->run_after = TIME_MS_NEVER_WILL;
call_alarm(alarm, 0);
}
RETURN(1);
OUT();
}

View File

@ -77,46 +77,70 @@ struct sched_ent;
typedef void (*ALARM_FUNCP) (struct sched_ent *alarm);
struct sched_ent{
struct sched_ent *_next;
struct sched_ent *_prev;
struct sched_ent *_next_wake;
struct sched_ent *_prev_wake;
struct sched_ent *_next_run;
uint8_t _scheduled;
ALARM_FUNCP function;
void *context;
struct pollfd poll;
// if the CPU is awake, you can run this function after this time
time_ms_t run_after;
// wake up the CPU at this time in order to run
time_ms_t wake_at;
// run this alarm in this order. if this time has passed, don't allow other IO
time_ms_t run_before;
// when we should first consider the alarm
time_ms_t alarm;
// the order we will prioritise the alarm
time_ms_t deadline;
struct profile_total *stats;
int _poll_index;
};
#define STRUCT_SCHED_ENT_UNUSED {.poll={.fd=-1}, ._poll_index=-1,}
#define STRUCT_SCHED_ENT_UNUSED {\
.poll={.fd=-1}, \
._poll_index=-1, \
.run_after=TIME_MS_NEVER_WILL, \
.alarm=TIME_MS_NEVER_WILL, \
.deadline=TIME_MS_NEVER_WILL, \
}
#define ALARM_STRUCT(X) _sched_##X
#define DECLARE_ALARM(X) \
extern struct sched_ent _sched_##X; \
extern struct sched_ent ALARM_STRUCT(X); \
void X(struct sched_ent *);
#define DEFINE_ALARM(X) \
void X(struct sched_ent *); \
struct profile_total _stats_##X = {.name=#X,}; \
struct sched_ent _sched_##X = { \
struct sched_ent ALARM_STRUCT(X) = { \
.poll={.fd=-1}, \
._poll_index=-1, \
.run_after=TIME_MS_NEVER_WILL, \
.alarm=TIME_MS_NEVER_WILL, \
.deadline=TIME_MS_NEVER_WILL, \
.stats = &_stats_##X, \
.function=X, \
};
#define RESCHEDULE_ALARM(X, A, D) \
#define RESCHEDULE(X, AFTER, WAIT, BEFORE) \
do{\
unschedule(&_sched_##X); \
_sched_##X.alarm=(A); \
_sched_##X.deadline=_sched_##X.alarm+(D); \
schedule(&_sched_##X); \
unschedule(X); \
(X)->run_after=(AFTER); \
(X)->alarm=(WAIT); \
(X)->deadline=(BEFORE); \
schedule(X); \
}while(0)
int is_scheduled(const struct sched_ent *alarm);
#define is_scheduled(X) ((X)->_scheduled)
int is_watching(struct sched_ent *alarm);
int _schedule(struct __sourceloc, struct sched_ent *alarm);
int _unschedule(struct __sourceloc, struct sched_ent *alarm);
void _schedule(struct __sourceloc, struct sched_ent *alarm);
void _unschedule(struct __sourceloc, struct sched_ent *alarm);
int _watch(struct __sourceloc, struct sched_ent *alarm);
int _unwatch(struct __sourceloc, struct sched_ent *alarm);
#define schedule(alarm) _schedule(__WHENCE__, alarm)

View File

@ -113,18 +113,15 @@ int overlayServerMode()
time_ms_t now = gettime_ms();
/* Periodically check for server shut down */
RESCHEDULE_ALARM(server_shutdown_check, now, 100);
/* Periodically reload configuration */
RESCHEDULE_ALARM(server_config_reload, now+config.server.config_reload_interval_ms, 100);
// Periodically check for server shut down
RESCHEDULE(&ALARM_STRUCT(server_shutdown_check), now, now+30000, now);
overlay_mdp_bind_internal_services();
olsr_init_socket();
/* Calculate (and possibly show) CPU usage stats periodically */
RESCHEDULE_ALARM(fd_periodicstats, now+3000, 500);
RESCHEDULE(&ALARM_STRUCT(fd_periodicstats), now+3000, now+30000, TIME_MS_NEVER_WILL);
cf_on_config_change();

View File

@ -1170,8 +1170,8 @@ static void overlay_mdp_scan(struct sched_ent *alarm)
}
if (state->current <= state->last){
alarm->alarm=gettime_ms()+500;
schedule(alarm);
time_ms_t now = gettime_ms();
RESCHEDULE(alarm, now+500, now+500, TIME_MS_NEVER_WILL);
}else{
DEBUG("Scan completed");
state->interface=NULL;
@ -1717,10 +1717,9 @@ static void overlay_mdp_poll(struct sched_ent *alarm)
interface->name, scans[i].last, scans[i].current);
continue;
}
scans[i].alarm.alarm=start;
scans[i].alarm.function=overlay_mdp_scan;
RESCHEDULE(&scans[i].alarm, start, start, start+500);
start+=100;
schedule(&scans[i].alarm);
}
}else{
struct overlay_interface *interface = overlay_interface_find(scan->addr, 1);
@ -1734,9 +1733,8 @@ static void overlay_mdp_poll(struct sched_ent *alarm)
scans[i].interface = interface;
scans[i].current = ntohl(scan->addr.s_addr);
scans[i].last = ntohl(scan->addr.s_addr);
scans[i].alarm.alarm=start;
scans[i].alarm.function=overlay_mdp_scan;
schedule(&scans[i].alarm);
RESCHEDULE(&scans[i].alarm, start, start, start+500);
}
}

View File

@ -1529,8 +1529,10 @@ int rhizome_store_manifest(rhizome_manifest *m)
m->version
);
monitor_announce_bundle(m);
if (serverMode)
RESCHEDULE_ALARM(rhizome_sync_announce, gettime_ms(), 10000);
if (serverMode){
time_ms_t now = gettime_ms();
RESCHEDULE(&ALARM_STRUCT(rhizome_sync_announce), now, now, TIME_MS_NEVER_WILL);
}
return 0;
}
rollback:

View File

@ -258,12 +258,15 @@ static struct link_state *get_link_state(struct subscriber *subscriber)
static void first_neighbour_found(){
// send rhizome sync periodically
RESCHEDULE_ALARM(rhizome_sync_announce, gettime_ms()+1000, 10000);
time_ms_t now = gettime_ms();
RESCHEDULE(&ALARM_STRUCT(rhizome_sync_announce),
now+1000, now+5000, TIME_MS_NEVER_WILL);
}
static void last_neighbour_gone(){
// stop trying to sync rhizome
RESCHEDULE_ALARM(rhizome_sync_announce, TIME_MS_NEVER_WILL, 0);
RESCHEDULE(&ALARM_STRUCT(rhizome_sync_announce),
TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
}
static struct neighbour *get_neighbour(struct subscriber *subscriber, char create)

View File

@ -208,9 +208,10 @@ void server_config_reload(struct sched_ent *alarm)
}
if (alarm) {
time_ms_t now = gettime_ms();
alarm->alarm = now + config.server.config_reload_interval_ms;
alarm->deadline = alarm->alarm + 100;
schedule(alarm);
RESCHEDULE(alarm,
now+config.server.config_reload_interval_ms,
TIME_MS_NEVER_WILL,
now+config.server.config_reload_interval_ms+100);
}
}
@ -269,9 +270,10 @@ void server_watchdog(struct sched_ent *alarm)
}
if (alarm) {
time_ms_t now = gettime_ms();
alarm->alarm = now + config.server.watchdog.interval_ms;
alarm->deadline = alarm->alarm + 100;
schedule(alarm);
RESCHEDULE(alarm,
now+config.server.watchdog.interval_ms,
now+config.server.watchdog.interval_ms,
now+100);
}
}
@ -286,15 +288,24 @@ void cf_on_config_change()
directory_service_init();
// check for interfaces at least once after config change
RESCHEDULE_ALARM(overlay_interface_discover, now, 100);
RESCHEDULE(&ALARM_STRUCT(overlay_interface_discover), now, now, now);
if (link_has_neighbours())
// send rhizome sync periodically
RESCHEDULE_ALARM(rhizome_sync_announce, now+1000, 10000);
RESCHEDULE(&ALARM_STRUCT(rhizome_sync_announce),
now+1000, now+1000, TIME_MS_NEVER_WILL);
if (config.server.watchdog.executable[0])
RESCHEDULE_ALARM(server_watchdog, now+config.server.watchdog.interval_ms, 100);
RESCHEDULE(&ALARM_STRUCT(server_watchdog),
now+config.server.watchdog.interval_ms,
now+config.server.watchdog.interval_ms,
now+100);
// Periodically check for modified configuration
RESCHEDULE(&ALARM_STRUCT(server_config_reload),
now+config.server.config_reload_interval_ms,
TIME_MS_NEVER_WILL,
now+config.server.config_reload_interval_ms+100);
}
/* Called periodically by the server process in its main loop.
@ -315,9 +326,7 @@ void server_shutdown_check(struct sched_ent *alarm)
}
}
if (alarm){
alarm->alarm = now + 1000;
alarm->deadline = alarm->alarm + 5000;
schedule(alarm);
RESCHEDULE(alarm, now+1000, now+30000, now+5000);
}
}

View File

@ -261,11 +261,11 @@ setup_ReloadConfigSetSync() {
set debug.mdprequests on \
set server.config_reload_interval_ms 600000 \
set server.motd "Abcdef"
conf_size=$(wc -l <"$SERVALINSTANCE_PATH/serval.conf")
assert [ "$conf_size" -gt 0 ]
start_servald_server
wait_until servald_http_server_started +A
get_servald_http_server_port PORTA +A
conf_size=$(wc -l <"$SERVALINSTANCE_PATH/serval.conf")
assert [ "$conf_size" -gt 0 ]
}
test_ReloadConfigSetSync() {
# Set the MOTD without changing the config file size and sync it, and the

1
vomp.c
View File

@ -387,6 +387,7 @@ static struct vomp_call_state *vomp_create_call(struct subscriber *remote,
call->last_activity=call->create_time;
call->alarm.alarm = call->create_time+VOMP_CALL_STATUS_INTERVAL;
call->alarm.deadline = call->alarm.alarm+10;
call->alarm.function = vomp_process_tick;
vomp_stats.name="vomp_process_tick";
call->alarm.stats=&vomp_stats;