Merge branch 'eventscheduler'

Conflicts:
	monitor.c
	serval.h
This commit is contained in:
gardners 2012-06-25 16:50:23 +09:30
commit b40a468276
18 changed files with 666 additions and 597 deletions

View File

@ -44,6 +44,7 @@ SERVALD_SRC_FILES = \
serval-dna/lsif.c \
serval-dna/dna_helper.c \
serval-dna/sighandlers.c \
serval-dna/fdqueue.c \
serval-dna/monitor.c \
serval-dna/monitor-cli.c \
serval-dna/codecs.c \

View File

@ -47,6 +47,7 @@ SRCS= main.c \
monitor-cli.c \
dna_helper.c \
sighandlers.c \
fdqueue.c \
codecs.c \
audiodevices.c \
audio_msm_g1.c \

View File

@ -693,6 +693,16 @@ app_server_start(int argc, const char *const *argv, struct command_line_option *
if (pid > 0) {
WHYF("Serval process already running (pid=%d)", pid);
/* BatPhone expects to see a pid here, anyway */
cli_puts("instancepath");
cli_delim(":");
cli_puts(serval_instancepath());
cli_delim("\n");
cli_puts("pid");
cli_delim(":");
cli_printf("%d", pid);
cli_delim("\n");
return 0;
}

365
fdqueue.c Normal file
View File

@ -0,0 +1,365 @@
/*
Serval Distributed Numbering Architecture (DNA)
Copyright (C) 2012 Paul Gardner-Stephen
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "serval.h"
#include <poll.h>
struct callback_stats {
long long max_time;
long long total_time;
int calls;
};
#define MAX_ALARMS 128
typedef struct callback_alarm {
void (*func)();
long long next_alarm;
long long repeat_every;
struct callback_stats stats;
} callback_alarm;
callback_alarm alarms[MAX_ALARMS];
int alarmcount=0;
#define MAX_WATCHED_FDS 128
struct pollfd fds[MAX_WATCHED_FDS];
int fdcount=0;
void(*fd_functions[MAX_WATCHED_FDS])(int fd);
struct callback_stats fd_stats[MAX_WATCHED_FDS];
struct callback_stats poll_stats={0,0,0};
/* @PGS/20120615 */
int last_valid=0;
int last_line;
const char *last_file;
const char *last_func;
long long last_time;
/* @PGS/20120615 */
void TIMING_PAUSE()
{
last_valid=0;
}
/* @PGS/20120615 */
void _TIMING_CHECK(const char *file,const char *func,int line)
{
long long now=overlay_gettime_ms();
if (last_valid) {
if (now-last_time>5) {
// More than 5ms spent in a given task, complain
char msg[1024];
snprintf(msg,1024,"Spent %lldms between %s:%d in %s() and here",
now-last_time,last_file,last_line,last_func);
logMessage(LOG_LEVEL_WARN,file,line,func,"%s",msg);
}
}
last_valid=1;
last_file=file;
last_func=func;
last_line=line;
last_time=now;
}
int fd_watch(int fd,void (*func)(int fd),int events)
{
if (fd<0||fd>=MAX_WATCHED_FDS)
return WHYF("Invalid file descriptor (%d) - must be between 0 and %d",
MAX_WATCHED_FDS-1);
if (fdcount>=MAX_WATCHED_FDS)
return WHYF("Currently watching too many file descriptors. This should never happen; report a bug.");
fds[fdcount].fd=fd;
fds[fdcount++].events=events;
if (func!=fd_functions[fd]) {
fd_stats[fd].max_time=0;
fd_stats[fd].total_time=0;
fd_stats[fd].calls=0;
}
fd_functions[fd]=func;
return 0;
}
int fd_teardown(int fd)
{
int i;
for(i=0;i<fdcount;i++)
if (fds[i].fd==fd) {
if (i<(fdcount-1)) {
/* Move last entry in list to this position, and wipe last entry in list */
fds[i]=fds[fdcount-1];
fds[fdcount-1].fd=-1;
fds[fdcount-1].events=0;
} else {
/* We are last entry in list, so just wipe */
fds[i].events=0;
fds[i].fd=-1;
}
fdcount--; i--;
}
close(fd);
return 0;
}
/* Automatically call a function every this many milli-seconds.
If repeat_every is zero, then the alarm will be a one-shot */
int fd_setalarm(void (*func),long long first_alarm_in,int repeat_every)
{
int i;
if (!func) return -1;
if (first_alarm_in<=0) first_alarm_in=repeat_every;
if (repeat_every<0) return -1;
for(i=0;i<alarmcount;i++)
{
if (func==alarms[i].func) break;
}
if (i>=MAX_ALARMS) return WHY("Too many alarms");
if (!first_alarm_in) {
/* remove old alarm */
alarms[i]=alarms[--alarmcount];
return 0;
} else {
/* Create new alarm, or update existing one */
if (alarms[i].func!=func) {
alarms[i].stats.calls=0;
alarms[i].stats.max_time=0;
alarms[i].stats.total_time=0;
}
alarms[i].func=func;
alarms[i].next_alarm=overlay_gettime_ms()+first_alarm_in;
alarms[i].repeat_every=repeat_every;
if (i>=alarmcount) alarmcount=i+1;
return 0;
}
}
void fd_update_stats(struct callback_stats *s,long long elapsed)
{
s->total_time+=elapsed;
if (elapsed>s->max_time) s->max_time=elapsed;
s->calls++;
}
int fd_checkalarms()
{
long long now=overlay_gettime_ms();
int i;
long long next_alarm_in=15000;
TIMING_PAUSE();
for(i=0;i<alarmcount;i++)
{
if (alarms[i].next_alarm&&alarms[i].next_alarm<=now) {
now=overlay_gettime_ms();
alarms[i].func();
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&alarms[i].stats,elapsed);
if (!alarms[i].repeat_every) {
/* Alarm was one-shot, so erase alarm */
fd_setalarm(alarms[i].func,0,0);
i--;
continue;
} else
/* Alarm is repeating, so set next call */
alarms[i].next_alarm=now+alarms[i].repeat_every;
}
/* Work out if this alarm is next */
if (next_alarm_in>(alarms[i].next_alarm-now))
next_alarm_in=(alarms[i].next_alarm-now);
}
return next_alarm_in;
}
int fd_poll()
{
int i;
/* See if any alarms have expired before we do anything.
This also returns the time to the next alarm that is due. */
int ms=fd_checkalarms();
/* Make sure we don't have any silly timeouts that will make us wait for ever. */
if (ms<1) ms=1;
/* Wait for action or timeout */
long long now=overlay_gettime_ms();
int r=poll(fds, fdcount, ms);
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&poll_stats,elapsed);
/* If file descriptors are ready, then call the appropriate functions */
if (r>0) {
for(i=0;i<fdcount;i++)
if (fds[i].revents) {
long long now=overlay_gettime_ms();
fd_functions[fds[i].fd](fds[i].fd);
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&fd_stats[fds[i].fd],elapsed);
}
}
/* After all that action, we might have an alarm expire, so check the alarms
again */
fd_checkalarms();
return 0;
}
typedef struct func_descriptions {
void *addr;
char *description;
} func_descriptions;
func_descriptions func_names[]={
{overlay_check_ticks,"overlay_check_ticks"},
{overlay_dummy_poll,"overlay_dummy_poll"},
{overlay_interface_discover,"overlay_interface_discover"},
{overlay_route_tick,"overlay_route_tick"},
{rhizome_enqueue_suggestions,"rhizome_enqueue_suggestions"},
{server_shutdown_check,"server_shutdown_check"},
{monitor_client_poll,"monitor_client_poll"},
{monitor_poll,"monitor_poll"},
{overlay_interface_poll,"overlay_interface_poll"},
{overlay_mdp_poll,"overlay_mdp_poll"},
{rhizome_client_poll,"rhizome_client_poll"},
{rhizome_fetch_poll,"rhizome_fetch_poll"},
{rhizome_server_poll,"rhizome_server_poll"},
{fd_periodicstats,"fd_periodicstats"},
{vomp_tick,"vomp_tick"},
{NULL,NULL}
};
char *fd_funcname(void *addr)
{
int j;
char *funcname="unknown";
for(j=0;func_names[j].addr;j++)
if (func_names[j].addr==addr)
funcname=func_names[j].description;
return funcname;
}
int fd_list()
{
long long now=overlay_gettime_ms();
int i;
INFOF("List of timed callbacks:");
INFOF("------------------------");
for(i=0;i<alarmcount;i++) {
INFOF(alarms[i].repeat_every?"() in %lldms and every %lldms":"%s() in %lldms%*",
fd_funcname(alarms[i].func),
alarms[i].next_alarm-now,alarms[i].repeat_every);
}
INFOF("List of watched file descriptors:");
INFOF("---------------------------------");
for(i=0;i<fdcount;i++) {
char *eventdesc="<somethinged>";
if ((fds[i].events&POLL_IN)&&(fds[i].events&POLL_OUT))
eventdesc="read or written";
else if (fds[i].events&POLL_IN)
eventdesc="read";
else if (fds[i].events&POLL_OUT)
eventdesc="written";
INFOF("%s() when fd#%d can be %s",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd,eventdesc);
}
return 0;
}
int fd_tallystats(struct callback_stats *total,struct callback_stats *a)
{
total->total_time+=a->total_time;
total->calls+=a->calls;
if (a->max_time>total->max_time) total->max_time=a->max_time;
return 0;
}
int fd_showstat(struct callback_stats *total, struct callback_stats *a, char *msg)
{
WHYF("%lldms (%2.1f%%) in %d calls (max %lldms, avg %.1fms) : %s",
a->total_time,a->total_time*100.0/total->total_time,
a->calls,
a->max_time,a->total_time*1.00/a->calls,
msg);
return 0;
}
int fd_clearstat(struct callback_stats *s)
{
s->calls=0;
s->max_time=0;
s->total_time=0;
return 0;
}
int fd_clearstats()
{
int i;
fd_clearstat(&poll_stats);
for(i=0;i<alarmcount;i++)
fd_clearstat(&alarms[i].stats);
for(i=0;i<fdcount;i++)
fd_clearstat(&fd_stats[fds[i].fd]);
return 0;
}
int fd_showstats()
{
int i;
struct callback_stats total={0,0,0};
/* Get total time spent doing everything */
fd_tallystats(&total,&poll_stats);
for(i=0;i<alarmcount;i++)
fd_tallystats(&total,&alarms[i].stats);
for(i=0;i<fdcount;i++)
fd_tallystats(&total,&fd_stats[fds[i].fd]);
/* Now show stats */
INFOF("servald time usage stats:");
fd_showstat(&total,&poll_stats,"Idle (in poll)");
for(i=0;i<alarmcount;i++) {
char desc[1024];
snprintf(desc,1024,"%s() alarm callback",fd_funcname(alarms[i].func));
fd_showstat(&total,&alarms[i].stats,desc);
}
for(i=0;i<fdcount;i++) {
char desc[1024];
snprintf(desc,1024,"%s() fd#%d callback",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd);
fd_showstat(&total,&fd_stats[fds[i].fd],desc);
}
fd_showstat(&total,&total,"TOTAL");
return 0;
}
void fd_periodicstats()
{
fd_showstats();
fd_clearstats();
}

103
monitor.c
View File

@ -58,7 +58,7 @@ long long monitor_last_update_time=0;
int monitor_process_command(int index,char *cmd);
int monitor_process_data(int index);
static void monitor_new_socket(int s);
static void monitor_new_client(int s);
int monitor_named_socket=-1;
int monitor_setup_sockets()
@ -118,62 +118,25 @@ int monitor_setup_sockets()
WHY_perror("setsockopt");
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) WHY("Monitor server socket setup");
fd_watch(monitor_named_socket,monitor_poll,POLL_IN);
return 0;
error:
close(monitor_named_socket);
fd_teardown(monitor_named_socket);
monitor_named_socket=-1;
return -1;
}
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
void monitor_poll(int ignored_fd)
{
/* Make sure sockets are open */
monitor_setup_sockets();
int s,i,m;
unsigned char buffer[1024];
char msg[1024];
struct sockaddr *ignored_address=(struct sockaddr *)&buffer[0];
socklen_t ignored_length=sizeof(ignored_address);
/* This block should work, but in reality it doesn't.
poll() on linux is ALWAYS claiming that accept() can be
run. So we just have to check it whenever some other fd triggers
poll to break, which fortunately is fairly often. */
if ((*fdcount)>=fdmax) return -1;
if (monitor_named_socket>-1)
{
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) {
WHYF("Monitor named unix domain socket is poll() slot #%d (fd %d)\n",
*fdcount,monitor_named_socket);
}
fds[*fdcount].fd=monitor_named_socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
int i;
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO))
WHYF("looking at %d monitor clients",monitor_socket_count);
for(i=0;i<monitor_socket_count;i++) {
if ((*fdcount)>=fdmax) return -1;
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) {
WHYF("Monitor named unix domain client socket is poll() slot #%d (fd %d)\n",
*fdcount,monitor_sockets[i].socket);
}
fds[*fdcount].fd=monitor_sockets[i].socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
return 0;
}
int
monitor_poll(void) {
int bytes, i, m, s;
socklen_t addrlen;
long long now;
char msg[128];
struct monitor_context *c;
/* tell all monitor clients about status of all calls periodically */
now = overlay_gettime_ms();
long long now = overlay_gettime_ms();
if (monitor_last_update_time > (now + 1000)) {
WHY("Fixed run away monitor_last_update_time");
monitor_last_update_time = now + 1000;
@ -198,26 +161,31 @@ monitor_poll(void) {
fcntl(monitor_named_socket, F_SETFL,
fcntl(monitor_named_socket, F_GETFL, NULL) | O_NONBLOCK);
/* We don't care about the peer's address */
addrlen = 0;
ignored_length = 0;
while (
#ifdef HAVE_LINUX_IF_H
(s = accept4(monitor_named_socket, NULL, &addrlen,O_NONBLOCK))
(s = accept4(monitor_named_socket, NULL, &ignored_length,O_NONBLOCK))
#else
(s = accept(monitor_named_socket,NULL, &addrlen))
(s = accept(monitor_named_socket,NULL, &ignored_length))
#endif
!= -1
) {
addrlen = 0;
monitor_new_socket(s);
monitor_new_client(s);
}
if (errno != EAGAIN)
WHY_perror("accept");
}
void monitor_client_poll(int fd)
{
/* Read from any open connections */
int i;
for(i = 0;i < monitor_socket_count; i++) {
nextInSameSlot:
c = &monitor_sockets[i];
errno=0;
int bytes;
struct monitor_context *c=&monitor_sockets[i];
if (c->socket!=fd) continue;
fcntl(c->socket,F_SETFL,
fcntl(c->socket, F_GETFL, NULL) | O_NONBLOCK);
switch(c->state) {
@ -245,9 +213,9 @@ monitor_poll(void) {
WHY_perror("read");
/* all other errors; close socket */
WHYF("Tearing down monitor client #%d due to errno=%d (%s)",
i, errno, strerror(errno));
close(c->socket);
if (i == monitor_socket_count - 1) {
i,errno,strerror(errno)?strerror(errno):"<unknown error>");
fd_teardown(c->socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
@ -281,10 +249,10 @@ monitor_poll(void) {
break;
default:
/* all other errors; close socket */
WHYF("Tearing down monitor client #%d due to errno=%d (%s)",
i, errno, strerror(errno));
close(c->socket);
if (i == monitor_socket_count - 1) {
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
fd_teardown(c->socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
@ -311,11 +279,10 @@ monitor_poll(void) {
}
}
return 0;
return;
}
static void
monitor_new_socket(int s) {
static void monitor_new_client(int s) {
#ifdef linux
struct ucred ucred;
socklen_t len;
@ -589,7 +556,7 @@ int monitor_announce_bundle(rhizome_manifest *m)
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
close(monitor_sockets[i].socket);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
@ -637,7 +604,7 @@ int monitor_call_status(vomp_call_state *call)
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
close(monitor_sockets[i].socket);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
@ -703,7 +670,7 @@ int monitor_tell_clients(char *msg, int msglen, int mask)
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
close(monitor_sockets[i].socket);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;

203
overlay.c
View File

@ -70,41 +70,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
/* @PGS/20120615 */
int last_valid=0;
int last_line;
const char *last_file;
const char *last_func;
long long last_time;
/* @PGS/20120615 */
void TIMING_PAUSE()
{
last_valid=0;
}
/* @PGS/20120615 */
void _TIMING_CHECK(const char *file,const char *func,int line)
{
long long now=overlay_gettime_ms();
if (last_valid) {
if (now-last_time>5) {
// More than 5ms spent in a given task, complain
char msg[1024];
snprintf(msg,1024,"Spent %lldms between %s:%d in %s() and here",
now-last_time,last_file,last_line,last_func);
logMessage(LOG_LEVEL_WARN,file,line,func,"%s",msg);
}
}
last_valid=1;
last_file=file;
last_func=func;
last_line=line;
last_time=now;
}
int overlayMode=0;
overlay_txqueue overlay_tx[OQ_MAX];
@ -131,7 +96,7 @@ int overlayServerMode()
int i;
for(i=0;i<OQ_MAX;i++) {
overlay_tx[i].maxLength=100;
overlay_tx[i].latencyTarget=5000; /* Keep packets in queue for 5 seconds by default */
overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */
}
/* But expire voice/video call packets much sooner, as they just aren't any use if late */
overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=500;
@ -144,155 +109,44 @@ int overlayServerMode()
of wifi latency anyway, so we'll live with it. Larger values will affect voice transport,
and smaller values would affect CPU and energy use, and make the simulation less realistic. */
struct pollfd fds[128];
int fdcount;
/* Create structures to use 1MB of RAM for testing */
overlay_route_init(1);
/* Setup up MDP & monitor interface unix domain sockets */
overlay_mdp_setup_sockets();
monitor_setup_sockets();
/* Get rhizome server started BEFORE populating fd list so that
the server's listen socket is in the list for poll() */
if (rhizome_enabled()) rhizome_server_poll();
if (rhizome_enabled()) rhizome_server_start();
/* Pick next rhizome files to grab every few seconds
from the priority list continuously being built from observed
bundle announcements */
fd_setalarm(rhizome_enqueue_suggestions,3000,3000);
while(1) {
/* Periodically check for new interfaces */
fd_setalarm(overlay_interface_discover,1,5000);
TIMING_CHECK();
/* Periodically check for server shut down */
fd_setalarm(server_shutdown_check,1,1000);
server_shutdown_check();
/* Periodically update route table.
(Alarm interval is dynamically updated by overlay_route_tick()
based on load/route table size etc) */
fd_setalarm(overlay_route_tick,1000,1000);
TIMING_CHECK();
/* Start scheduling interface ticks */
fd_setalarm(overlay_check_ticks,1,500);
/* Work out how long we can wait before we need to tick */
long long ms=overlay_time_until_next_tick();
memabuseCheck();
TIMING_CHECK();
//int filesPresent=0;
fds[0].fd=sock; fds[0].events=POLLIN;
fdcount=1;
rhizome_server_get_fds(fds,&fdcount,128);
TIMING_CHECK();
rhizome_fetching_get_fds(fds,&fdcount,128);
TIMING_CHECK();
overlay_mdp_get_fds(fds,&fdcount,128);
TIMING_CHECK();
monitor_get_fds(fds,&fdcount,128);
TIMING_CHECK();
/* Keep an eye on VoMP calls so that we can expire stale ones etc */
fd_setalarm(vomp_tick,1000,1000);
for(i=0;i<overlay_interface_count;i++)
{
/* Make socket blocking so that poll() behaves correctly. */
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)&(~O_NONBLOCK));
/* Show CPU usage stats periodically */
fd_setalarm(fd_periodicstats,3000,3000);
if ((!overlay_interfaces[i].fileP)&&(fdcount<128))
{
if (debug&DEBUG_IO) {
fprintf(stderr,"Interface %s is poll() slot #%d (fd %d)\n",
overlay_interfaces[i].name,
fdcount,
overlay_interfaces[i].fd);
}
fds[fdcount].fd=overlay_interfaces[i].fd;
fds[fdcount].events=POLLRDNORM;
fds[fdcount].revents=0;
fdcount++;
}
if (overlay_interfaces[i].fileP) {
//filesPresent=1;
if (ms>5) ms=5;
}
}
TIMING_CHECK();
/* Progressively update link scores to neighbours etc, and find out how long before
we should next tick the route table.
Basically the faster the CPU and the sparser the route table, the less often we
will need to tick in order to keep each tick nice and fast. */
int route_tick_interval=overlay_route_tick();
if (ms>route_tick_interval) ms=route_tick_interval;
int vomp_tick_time=vomp_tick_interval();
if (ms>vomp_tick_time) ms=vomp_tick_time;
TIMING_CHECK();
if (debug&DEBUG_VERBOSE_IO)
DEBUGF("Waiting via poll() for up to %lldms", ms);
TIMING_PAUSE();
/* Sanity check maximum poll timeout */
if (ms<1) ms=1;
if (ms>15000) ms=15000;
int r = poll(fds, fdcount, ms);
TIMING_CHECK();
if (r == -1)
WHY_perror("poll");
else if (debug&DEBUG_VERBOSE_IO) {
DEBUGF("poll() says %d file descriptors are ready", r);
int i;
for(i=0;i<fdcount;i++)
if (fds[i].revents)
DEBUGF("fd #%d is ready (0x%x)\n", fds[i].fd, fds[i].revents);
}
/* Do high-priority audio handling first */
TIMING_CHECK();
vomp_tick();
TIMING_CHECK();
if (r > 0) {
/* We have data, so try to receive it */
if (debug&DEBUG_IO) {
fprintf(stderr,"poll() reports %d fds ready\n",r);
int i;
for(i=0;i<fdcount;i++) {
if (fds[i].revents)
{
fprintf(stderr," #%d (fd %d): %d (",i,fds[i].fd,fds[i].revents);
if ((fds[i].revents&POLL_IN)==POLL_IN) fprintf(stderr,"POLL_IN,");
if ((fds[i].revents&POLLRDNORM)==POLLRDNORM) fprintf(stderr,"POLLRDNORM,");
if ((fds[i].revents&POLL_OUT)==POLL_OUT) fprintf(stderr,"POLL_OUT,");
if ((fds[i].revents&POLL_ERR)==POLL_ERR) fprintf(stderr,"POLL_ERR,");
if ((fds[i].revents&POLL_HUP)==POLL_HUP) fprintf(stderr,"POLL_HUP,");
if ((fds[i].revents&POLLNVAL)==POLLNVAL) fprintf(stderr,"POLL_NVAL,");
fprintf(stderr,")\n");
}
}
}
TIMING_CHECK();
overlay_rx_messages();
TIMING_CHECK();
if (rhizome_enabled()) {
TIMING_CHECK();
rhizome_server_poll();
TIMING_CHECK();
rhizome_fetch_poll();
TIMING_CHECK();
overlay_mdp_poll();
TIMING_CHECK();
monitor_poll();
TIMING_CHECK();
}
} else {
/* No data before tick occurred, so do nothing.
Well, for now let's just check anyway. */
if (debug&DEBUG_IO) fprintf(stderr,"poll() timeout.\n");
TIMING_CHECK();
overlay_rx_messages();
TIMING_CHECK();
if (rhizome_enabled()) {
TIMING_CHECK();
rhizome_server_poll();
TIMING_CHECK();
rhizome_fetch_poll();
TIMING_CHECK();
overlay_mdp_poll();
TIMING_CHECK();
monitor_poll();
TIMING_CHECK();
}
}
TIMING_CHECK();
/* Check if we need to trigger any ticks on any interfaces */
overlay_check_ticks();
TIMING_CHECK();
while(1) {
/* Check for activitiy and respond to it */
fd_poll();
}
return 0;
@ -435,7 +289,8 @@ int overlay_frame_process(int interface,overlay_frame *f)
if (!broadcast) {
if (overlay_get_nexthop(f->destination,f->nexthop,&len,
&f->nexthop_interface))
WHY("Could not find next hop for host - dropping frame");
WHYF("Could not find next hop for %s* - dropping frame",
overlay_render_sid_prefix(f->destination,7));
dontForward=1;
}
f->ttl--;

View File

@ -188,6 +188,8 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
I(fileP) = 0;
I(fd) = socket(PF_INET,SOCK_DGRAM,0);
fd_watch(I(fd),overlay_interface_poll,POLL_IN);
WHYF("Watching fd#%d for interface #%d",I(fd),interface);
if (I(fd) < 0) {
WHY_perror("socket()");
WHYF("Could not create UDP socket for interface: %s",strerror(errno));
@ -235,7 +237,7 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
return 0;
error:
close(I(fd));
fd_teardown(I(fd));
I(fd)=-1;
return -1;
#undef I
@ -293,11 +295,59 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd
}
overlay_interface_count++;
fd_setalarm(overlay_dummy_poll,10,10);
#undef I
return 0;
}
int overlay_rx_messages()
void overlay_interface_poll(int fd)
{
int i;
int plen=0;
unsigned char packet[16384];
for(i=0;i<overlay_interface_count;i++)
{
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
if (overlay_interfaces[i].fd!=fd) continue;
/* Read from UDP socket */
plen=1;
/* Read only one packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
// while (plen>0) {
int recvttl=1;
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {
/* No more packets */
return;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
}
}
// }
return;
}
return;
}
void overlay_dummy_poll()
{
int i;
@ -308,30 +358,28 @@ int overlay_rx_messages()
unsigned char packet[16384];
int plen=0;
int c[OVERLAY_MAX_INTERFACES];
int count=0;
/* Look at all interfaces */
for(i=0;i<overlay_interface_count;i++) { c[i]=(overlay_interfaces[i].observed>0); count+=c[i]; }
int count=1;
int dummys=0;
/* Grab packets from interfaces in round-robin fashion until all have been grabbed,
or until we have spent too long (maybe 10ms?) */
/* Check for input on any dummy interfaces that are attached to ordinary
files. We have to do it this way, because poll() says that ordinary
files are always ready for reading, even if at EOF.
Also, make sure we don't spend too much time here */
int now = overlay_gettime_ms();
while(count>0)
{
count=0;
for(i=0;i<overlay_interface_count;i++)
{
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
unsigned char transaction_id[8];
overlay_last_interface_number=i;
/* Set socket non-blocking before we try to read from it */
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
unsigned char transaction_id[8];
if (overlay_interfaces[i].fileP) {
dummys++;
/* Read from dummy interface file */
overlay_last_interface_number=i;
long long length=lseek(overlay_interfaces[i].fd,0,SEEK_END);
if (overlay_interfaces[i].offset>=length)
{
@ -372,34 +420,16 @@ int overlay_rx_messages()
}
}
} else {
/* Read from UDP socket */
int recvttl=1;
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<0) {
c[i]=0; count--;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
}
}
}
}
/* Don't sit here forever, or else we will never send any packets */
if (overlay_gettime_ms()>(now+10)) break;
}
return 0;
/* Stop watching dummy nets if there are none active */
if (!dummys) fd_setalarm(overlay_dummy_poll,0,0);
return ;
}
int overlay_tx_messages()
@ -565,7 +595,7 @@ overlay_interface_register(char *name,
overlay_interfaces[i].broadcast_address.sin_addr.s_addr,
local.sin_addr.s_addr,
broadcast.sin_addr.s_addr);
close(overlay_interfaces[i].fd);
fd_teardown(overlay_interfaces[i].fd);
overlay_interfaces[i].fd = -1;
if (overlay_interface_init_socket(i, local, broadcast))
INFOF("Could not reinitialise changed interface %s", name);
@ -582,25 +612,11 @@ overlay_interface_register(char *name,
return 0;
}
static time_t overlay_last_interface_discover_time = 0;
int
overlay_interface_discover(void) {
void overlay_interface_discover(void) {
int no_route, i;
time_t now;
struct interface_rules *r;
struct sockaddr_in dummyaddr;
/* Don't waste too much time and effort on interface discovery,
especially if we can't attach to a given interface for some reason. */
now = time(NULL);
if (overlay_last_interface_discover_time > now)
overlay_last_interface_discover_time = now;
if ((now - overlay_last_interface_discover_time) < 2)
return 0;
overlay_last_interface_discover_time = now;
/* Mark all interfaces as not observed, so that we know if we need to cull any */
for(i = 0; i < overlay_interface_count; i++)
overlay_interfaces[i].observed = 0;
@ -618,7 +634,7 @@ overlay_interface_discover(void) {
/* We already know about this interface, so just update it */
overlay_interfaces[i].observed = 1;
else {
/* New interface, so register it */
/* New interface, so register it */
if (overlay_interface_init(r->namespec,dummyaddr,dummyaddr,
1000000,PORT_DNA,OVERLAY_INTERFACE_WIFI)) {
if (debug & DEBUG_OVERLAYINTERFACES) WHYF("Could not initialise newly seen interface %s", r->namespec);
@ -650,7 +666,7 @@ overlay_interface_discover(void) {
FATAL("Unable to get any interface information");
}
return 0;
return;
}
int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,overlay_frame *pax[],int *frame_pax,int frame_max_pax)
@ -806,14 +822,12 @@ int overlay_tick_interface(int i, long long now)
#define MAX_FRAME_PAX 1024
overlay_frame *pax[MAX_FRAME_PAX];
TIMING_CHECK();
if (overlay_interfaces[i].bits_per_second<1) {
/* An interface with no speed budget is for listening only, so doesn't get ticked */
return 0;
}
if (debug&DEBUG_OVERLAYINTERFACES) fprintf(stderr,"Ticking interface #%d\n",i);
if (0) WHYF("Ticking interface #%d\n",i);
/* Get a buffer ready, and limit it's size appropriately.
XXX size limit should be reduced from MTU.
@ -844,7 +858,6 @@ int overlay_tick_interface(int i, long long now)
Give priority to newly observed nodes so that good news travels quickly to help roaming.
XXX - Don't forget about PONGing reachability reports to allow use of monodirectional links.
*/
TIMING_CHECK();
overlay_stuff_packet_from_queue(i,e,OQ_MESH_MANAGEMENT,now,pax,&frame_pax,MAX_FRAME_PAX);
/* We previously limited manifest space to 3/4 of MTU, but that causes problems for
@ -853,30 +866,23 @@ TIMING_CHECK();
#warning reduce to <= mtu*3/4 once we have compacty binary canonical manifest format
ob_limitsize(e,overlay_interfaces[i].mtu*4/4);
TIMING_CHECK();
/* Add advertisements for ROUTES not Rhizome bundles.
Rhizome bundle advertisements are lower priority */
overlay_route_add_advertisements(i,e);
ob_limitsize(e,overlay_interfaces[i].mtu);
TIMING_CHECK();
/* 4. XXX Add lower-priority queued data */
overlay_stuff_packet_from_queue(i,e,OQ_ISOCHRONOUS_VIDEO,now,pax,&frame_pax,MAX_FRAME_PAX);
overlay_stuff_packet_from_queue(i,e,OQ_ORDINARY,now,pax,&frame_pax,MAX_FRAME_PAX);
overlay_stuff_packet_from_queue(i,e,OQ_OPPORTUNISTIC,now,pax,&frame_pax,MAX_FRAME_PAX);
/* 5. XXX Fill the packet up to a suitable size with anything that seems a good idea */
TIMING_CHECK();
if (rhizome_enabled())
overlay_rhizome_add_advertisements(i,e);
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&e->bytes[0],e->length);
TIMING_CHECK();
/* Now send the frame. This takes the form of a special DNA packet with a different
service code, which we setup earlier. */
if (debug&DEBUG_OVERLAYINTERFACES)
@ -928,8 +934,11 @@ TIMING_CHECK();
{
if ((*p)->dequeue) {
{
if (0) printf("dequeuing %p%s NOW\n",
*p,(*p)->isBroadcast?" (broadcast)":" (unicast)");
if (debug&DEBUG_QUEUES)
WHYF("dequeuing %s* -> %s* NOW (queue length=%d)",
overlay_render_sid_prefix((*p)->source,7),
overlay_render_sid_prefix((*p)->destination,7),
overlay_tx[q].length);
t=*p;
*p=t->next;
if (overlay_tx[q].last==t) overlay_tx[q].last=t->prev;
@ -962,48 +971,40 @@ TIMING_CHECK();
if (e) ob_free(e); e=NULL;
return WHY("overlay_broadcast_ensemble() failed");
}
TIMING_CHECK();
}
int
overlay_check_ticks(void) {
void overlay_check_ticks(void) {
/* Check if any interface(s) are due for a tick */
int i;
TIMING_CHECK();
/* Check for changes to interfaces */
overlay_interface_discover();
TIMING_CHECK();
long long now = overlay_gettime_ms();
/* Now check if the next tick time for the interfaces is no later than that time.
If so, trigger a tick on the interface. */
if (debug & DEBUG_OVERLAYINTERFACES) INFOF("Examining %d interfaces.",overlay_interface_count);
for(i = 0; i < overlay_interface_count; i++) {
TIMING_CHECK();
/* Only tick live interfaces */
if (overlay_interfaces[i].observed > 0) {
if (debug & DEBUG_VERBOSE_IO) INFOF("Interface %s ticks every %dms, last at %lld.",
overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,
overlay_interfaces[i].last_tick_ms);
overlay_interfaces[i].last_tick_ms);
if (now >= overlay_interfaces[i].last_tick_ms + overlay_interfaces[i].tick_ms) {
TIMING_CHECK();
/* This interface is due for a tick */
overlay_tick_interface(i, now);
TIMING_CHECK();
overlay_interfaces[i].last_tick_ms = now;
}
} else
if (debug & DEBUG_VERBOSE_IO) INFOF("Interface %s is awol.", overlay_interfaces[i].name);
TIMING_CHECK();
}
return 0;
/* Update interval until next tick */
fd_setalarm(overlay_check_ticks,overlay_time_until_next_tick(),500);
return;
}
long long overlay_time_until_next_tick()
@ -1017,14 +1018,22 @@ long long overlay_time_until_next_tick()
for(i=0;i<overlay_interface_count;i++)
if (overlay_interfaces[i].observed>0)
{
if (debug&DEBUG_VERBOSE_IO) fprintf(stderr,"Interface %s ticks every %dms, last at T-%lldms.\n",overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,now-overlay_interfaces[i].last_tick_ms);
long long thistick=
overlay_interfaces[i].tick_ms
-(now-overlay_interfaces[i].last_tick_ms);
if (0)
WHYF("Interface %s ticks every %dms, last at T-%lldms, next needed in %lldms.\n",
overlay_interfaces[i].name,
overlay_interfaces[i].tick_ms,now-overlay_interfaces[i].last_tick_ms,
thistick);
long long thistick=(overlay_interfaces[i].last_tick_ms+overlay_interfaces[i].tick_ms)-now;
if (thistick<0) thistick=0;
if (thistick<nexttick) nexttick=thistick;
if (0) WHYF("nexttick is now %lldms",nexttick);
}
if (0) WHYF("Next tick required in %lldms",nexttick);
return nexttick;
}

View File

@ -72,6 +72,8 @@ int overlay_mdp_setup_sockets()
int send_buffer_size=64*1024;
int res = setsockopt(mdp_abstract_socket, SOL_SOCKET, SO_SNDBUF,
&send_buffer_size, sizeof(send_buffer_size));
fd_watch(mdp_abstract_socket,overlay_mdp_poll,POLL_IN);
}
}
#endif
@ -104,6 +106,7 @@ int overlay_mdp_setup_sockets()
&send_buffer_size, sizeof(send_buffer_size));
if (res)
WHY_perror("setsockopt");
fd_watch(mdp_named_socket,overlay_mdp_poll,POLL_IN);
}
}
@ -111,38 +114,6 @@ int overlay_mdp_setup_sockets()
}
int overlay_mdp_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
{
/* Make sure sockets are open */
overlay_mdp_setup_sockets();
if ((*fdcount)>=fdmax) return -1;
if (mdp_abstract_socket>-1)
{
if (debug&DEBUG_IO) {
fprintf(stderr,"MDP abstract name space socket is poll() slot #%d (fd %d)\n",
*fdcount,mdp_abstract_socket);
}
fds[*fdcount].fd=mdp_abstract_socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
if ((*fdcount)>=fdmax) return -1;
if (mdp_named_socket>-1)
{
if (debug&DEBUG_IO) {
fprintf(stderr,"MDP named unix domain socket is poll() slot #%d (fd %d)\n",
*fdcount,mdp_named_socket);
}
fds[*fdcount].fd=mdp_named_socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
return 0;
}
#define MDP_MAX_BINDINGS 100
#define MDP_MAX_SOCKET_NAME_LEN 110
int mdp_bindings_initialised=0;
@ -438,6 +409,13 @@ int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
send back a connection refused type message? Silence is probably the
more prudent path.
*/
if (0)
WHYF("Received packet with listener (MDP ports: src=%s*:%d, dst=%d)",
overlay_render_sid_prefix(mdp->out.src.sid,7),
mdp->out.src.port,mdp->out.dst.port);
if ((!overlay_address_is_local(mdp->out.dst.sid))
&&(!overlay_address_is_broadcast(mdp->out.dst.sid)))
{
@ -477,7 +455,7 @@ int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
}
if (match>-1) {
struct sockaddr_un addr;
printf("unix domain socket '%s'\n",mdp_bindings_sockets[match]);
bcopy(mdp_bindings_sockets[match],&addr.sun_path[0],mdp_bindings_socket_name_lengths[match]);
addr.sun_family=AF_UNIX;
errno=0;
@ -948,7 +926,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
}
}
int overlay_mdp_poll()
void overlay_mdp_poll()
{
unsigned char buffer[16384];
int ttl;
@ -972,11 +950,14 @@ int overlay_mdp_poll()
switch(mdp->packetTypeAndFlags&MDP_TYPE_MASK) {
case MDP_GOODBYE:
return overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
return;
case MDP_VOMPEVENT:
return vomp_mdp_event(mdp,recvaddr_un,recvaddrlen);
vomp_mdp_event(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_NODEINFO:
return overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_GETADDRS:
{
overlay_mdp_frame mdpreply;
@ -1035,17 +1016,20 @@ int overlay_mdp_poll()
mdpreply.addrlist.server_sid_count=count;
/* Send back to caller */
return overlay_mdp_reply(mdp_named_socket,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
overlay_mdp_reply(mdp_named_socket,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
return;
}
break;
case MDP_TX: /* Send payload (and don't treat it as system privileged) */
return overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
return;
break;
case MDP_BIND: /* Bind to port */
return overlay_mdp_process_bind_request(mdp_named_socket,mdp,
recvaddr_un,recvaddrlen);
overlay_mdp_process_bind_request(mdp_named_socket,mdp,
recvaddr_un,recvaddrlen);
return;
break;
default:
/* Client is not allowed to send any other frame type */
@ -1066,7 +1050,7 @@ int overlay_mdp_poll()
fcntl(mdp_named_socket, F_GETFL, NULL)&(~O_NONBLOCK));
}
return 0;
return;
}
int overlay_mdp_relevant_bytes(overlay_mdp_frame *mdp)

View File

@ -199,13 +199,18 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
}
/* Finally process the frame */
long long now=overlay_gettime_ms();
overlay_frame_process(interface,&f);
long long elapsed=overlay_gettime_ms()-now;
if (0) INFOF("overlay_frame_process (type=%d, len=%d) took %lldms",
f.type,f.bytecount,elapsed);
/* Skip the rest of the bytes in this frame so that we can examine the next one in this
ensemble */
if (debug&DEBUG_PACKETFORMATS) fprintf(stderr,"next ofs=%d, f.rfs=%d, len=%d\n",ofs,f.rfs,len);
ofs+=f.rfs;
}
if (0) INFOF("Finished processing overlay packet");
return 0;
}

View File

@ -244,6 +244,10 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
Complain if there are too many frames in the queue.
*/
if (0)
WHYF("Enqueuing packet for %s* (q[%d]length = %d)",
overlay_render_sid_prefix(p->destination,7),
q,overlay_tx[q].length);
if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) {
/* Dispatch voice data immediately.
Also tell Rhizome to back off a bit, so that voice traffic
@ -253,7 +257,7 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
int broadcast=overlay_address_is_broadcast(p->destination);
rhizome_saw_voice_traffic();
overlay_abbreviate_clear_most_recent_address();
if (broadcast) {
@ -311,7 +315,8 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
if (0) dump_payload(p,"queued for delivery");
if (overlay_tx[q].length>=overlay_tx[q].maxLength) return WHY("Queue congested");
if (overlay_tx[q].length>=overlay_tx[q].maxLength)
return WHYF("Queue #%d congested (size = %d)",q,overlay_tx[q].maxLength);
if (0) dump_queue("before",q);

View File

@ -1259,7 +1259,7 @@ int overlay_route_tick_next_neighbour_id=0;
int overlay_route_tick_neighbour_bundle_size=1;
int overlay_route_tick_next_node_bin_id=0;
int overlay_route_tick_node_bundle_size=1;
int overlay_route_tick()
void overlay_route_tick()
{
int n;
@ -1326,7 +1326,11 @@ int overlay_route_tick()
int interval=5000/ticks;
if (debug&DEBUG_OVERLAYROUTING) fprintf(stderr,"route tick interval = %dms (%d ticks per 5sec, neigh=%lldms, node=%lldms)\n",interval,ticks,neighbour_time,node_time);
return interval;
/* Update callback interval based on how much work we have to do */
fd_setalarm(overlay_route_tick,interval,interval);
return;
}
/* Ticking neighbours is easy; we just pretend we have heard from them again,

View File

@ -297,4 +297,6 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m,
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m,
struct sockaddr_in *peerip);
int rhizome_enqueue_suggestions();
void rhizome_enqueue_suggestions();
void rhizome_fetch_poll(int fd);

View File

@ -150,8 +150,10 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m)
long long dbVersion = -1;
if (sqlite_exec_int64(&dbVersion, "SELECT version FROM MANIFESTS WHERE id='%s';", id) == -1)
return WHY("Select failure");
if (dbVersion >= m->version)
return WHYF("We already have %s (%lld vs %lld)", id, dbVersion, m->version);
if (dbVersion >= m->version) {
if (0) WHYF("We already have %s (%lld vs %lld)", id, dbVersion, m->version);
return -1;
}
return 0;
/* Work out bin number in cache */
@ -485,7 +487,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m,
return 0;
}
int rhizome_enqueue_suggestions()
void rhizome_enqueue_suggestions()
{
int i;
for(i=0;i<candidate_count;i++)
@ -508,7 +510,7 @@ int rhizome_enqueue_suggestions()
candidate_count-=i;
}
return 0;
return;
}
int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept)
@ -664,6 +666,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
close(sock);
return -1;
}
/* Watch for activity on the socket */
fd_watch(q->socket,rhizome_fetch_poll,POLL_OUT);
rhizome_file_fetch_queue_count++;
if (1||debug&DEBUG_RHIZOME) DEBUGF("Queued file for fetching into %s (%d in queue)",
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
@ -693,65 +697,15 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
return 0;
}
long long rhizome_last_fetch=0;
int rhizome_poll_fetchP=0;
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
{
/* Don't fetch quickly during voice calls */
rhizome_poll_fetchP=0;
long long now=overlay_gettime_ms();
if (now<rhizome_voice_timeout)
{
/* only fetch data once per 500ms during calls */
if ((rhizome_last_fetch+500)>now)
return 0;
}
rhizome_last_fetch=now;
rhizome_poll_fetchP=1;
int i;
if ((*fdcount)>=fdmax) return -1;
for(i=0;i<rhizome_file_fetch_queue_count;i++)
{
if ((*fdcount)>=fdmax) return -1;
if (debug&DEBUG_RHIZOMESYNC) {
DEBUGF("rhizome file fetch request #%d is poll() slot #%d (fd %d)",
i,*fdcount,file_fetch_queue[i].socket); }
fds[*fdcount].fd=file_fetch_queue[i].socket;
switch(file_fetch_queue[i].state) {
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
fds[*fdcount].events=POLLOUT; break;
case RHIZOME_FETCH_RXHTTPHEADERS:
case RHIZOME_FETCH_RXFILE:
default:
fds[*fdcount].events=POLLIN; break;
}
(*fdcount)++;
}
return 0;
}
long long rhizome_last_fetch_enqueue_time=0;
int rhizome_fetch_poll()
void rhizome_fetch_poll(int fd)
{
int rn;
if (!rhizome_poll_fetchP) return 0;
if (rhizome_last_fetch_enqueue_time<overlay_gettime_ms())
{
rhizome_enqueue_suggestions();
rhizome_last_fetch_enqueue_time=overlay_gettime_ms();
}
if (0&&debug&DEBUG_RHIZOME) DEBUGF("Checking %d active fetch requests",
rhizome_file_fetch_queue_count);
for(rn=0;rn<rhizome_file_fetch_queue_count;rn++)
{
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
int action=0;
int bytes;
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
if (q->socket!=fd) continue;
/* Make socket non-blocking */
fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)|O_NONBLOCK);
@ -775,6 +729,7 @@ int rhizome_fetch_poll()
}
q->request_len=0; q->request_ofs=0;
q->state=RHIZOME_FETCH_RXHTTPHEADERS;
fd_watch(q->socket,rhizome_fetch_poll,POLL_IN);
}
} else if (errno!=EAGAIN) {
WHY("Got error while sending HTTP request. Closing.");
@ -825,12 +780,12 @@ int rhizome_fetch_poll()
fclose(q->file); q->file=NULL;
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL)
return WHY("Manifest missing ID");
{ WHY("Manifest missing ID"); return; }
if (create_rhizome_import_dir() == -1)
return -1;
return;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id))
return -1;
return;
/* Do really write the manifest unchanged */
if (debug&DEBUG_RHIZOME) {
DEBUGF("manifest has %d signatories",q->manifest->sig_count);
@ -977,6 +932,9 @@ int rhizome_fetch_poll()
rhizome_manifest_free(file_fetch_queue[i].manifest);
file_fetch_queue[i].manifest=NULL;
/* close socket and stop watching it */
fd_teardown(file_fetch_queue[i].socket);
/* reshuffle higher numbered slot down if required */
if (i<(rhizome_file_fetch_queue_count-1))
bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1],
@ -990,7 +948,5 @@ int rhizome_fetch_poll()
rhizome_file_fetch_queue_count);
}
}
return 0;
return;
}

View File

@ -89,7 +89,7 @@ int rhizome_server_start()
if (bind(rhizome_server_socket, (struct sockaddr *) &address,
sizeof(address)) < 0)
{
close(rhizome_server_socket);
fd_teardown(rhizome_server_socket);
rhizome_server_socket=-1000;
if (debug&DEBUG_RHIZOME) WHY("bind() failed starting rhizome http server");
return -1;
@ -99,44 +99,32 @@ int rhizome_server_start()
if (rc < 0)
{
perror("ioctl() failed");
close(rhizome_server_socket);
fd_teardown(rhizome_server_socket);
rhizome_server_socket=-1;
exit(-1);
}
if (listen(rhizome_server_socket,20))
{
close(rhizome_server_socket);
fd_teardown(rhizome_server_socket);
rhizome_server_socket=-1;
return WHY("listen() failed starting rhizome http server");
}
/* Add Rhizome HTTPd server to list of file descriptors to watch */
fd_watch(rhizome_server_socket,rhizome_server_poll,POLL_IN);
return 0;
}
int rhizome_poll_httpP=0;
int rhizome_server_poll()
void rhizome_client_poll(int fd)
{
struct sockaddr addr;
unsigned int addr_len=0;
int sock;
int rn;
if (!rhizome_poll_httpP) return 0;
/* Having the starting of the server here is helpful in that
if the port is taken by someone else, we will grab it fairly
swiftly once it becomes available. */
if (rhizome_server_socket<0) rhizome_server_start();
if (rhizome_server_socket<0) return 0;
/* Process the existing requests.
XXX - should use poll or select here */
if (0&&debug&DEBUG_RHIZOME) WHYF("Checking %d active connections",
rhizome_server_live_request_count);
for(rn=0;rn<rhizome_server_live_request_count;rn++)
{
rhizome_http_request *r=rhizome_live_http_requests[rn];
if (r->socket!=fd) continue;
switch(r->request_type)
{
case RHIZOME_HTTP_REQUEST_RECEIVING:
@ -194,9 +182,23 @@ int rhizome_server_poll()
/* Socket already has request -- so just try to send some data. */
rhizome_server_http_send_bytes(rn,r);
break;
}
}
/* We have processed the connection that has activity, so we can return
immediately */
return;
}
return;
}
void rhizome_server_poll(int ignored_file_descriptor)
{
struct sockaddr addr;
unsigned int addr_len=0;
int sock;
/* Deal with any new requests */
/* Make socket non-blocking */
fcntl(rhizome_server_socket,F_SETFL,
@ -210,17 +212,18 @@ int rhizome_server_poll()
/* We are now trying to read the HTTP request */
request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING;
rhizome_live_http_requests[rhizome_server_live_request_count++]=request;
/* Watch for input */
fd_watch(request->socket,rhizome_client_poll,POLL_IN);
}
fcntl(rhizome_server_socket,F_SETFL,
fcntl(rhizome_server_socket, F_GETFL, NULL)&(~O_NONBLOCK));
return 0;
}
int rhizome_server_close_http_request(int i)
{
close(rhizome_live_http_requests[i]->socket);
fd_teardown(rhizome_live_http_requests[i]->socket);
rhizome_server_free_http_request(rhizome_live_http_requests[i]);
/* Make it null, so that if we are the list in the list, the following
assignment still yields the correct behaviour */
@ -241,52 +244,6 @@ int rhizome_server_free_http_request(rhizome_http_request *r)
return 0;
}
long long rhizome_last_http_send=0;
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
{
int i;
if ((*fdcount)>=fdmax) return -1;
rhizome_poll_httpP=0;
/* Don't send quickly during voice calls */
long long now=overlay_gettime_ms();
if (now<rhizome_voice_timeout)
{
/* only send data once per 500ms during calls */
if ((rhizome_last_http_send+500)>now)
return 0;
}
rhizome_last_http_send=now;
rhizome_poll_httpP=1;
if (rhizome_server_socket>-1)
{
if (debug&DEBUG_IO) {
WHYF("rhizome http server is poll() slot #%d (fd %d)",
*fdcount,rhizome_server_socket);
}
fds[*fdcount].fd=rhizome_server_socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
for(i=0;i<rhizome_server_live_request_count;i++)
{
if ((*fdcount)>=fdmax) return -1;
if (debug&DEBUG_IO) {
WHYF("rhizome http request #%d is poll() slot #%d (fd %d)",
i,*fdcount,rhizome_live_http_requests[i]->socket); }
fds[*fdcount].fd=rhizome_live_http_requests[i]->socket;
switch(rhizome_live_http_requests[i]->request_type) {
case RHIZOME_HTTP_REQUEST_RECEIVING:
fds[*fdcount].events=POLLIN; break;
default:
fds[*fdcount].events=POLLOUT; break;
}
(*fdcount)++;
}
return 0;
}
void hexFilter(char *s)
{
char *t;
@ -462,6 +419,9 @@ int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r)
int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
{
char id[1024];
/* Switching to writing, so update the call-back */
fd_watch(r->socket,rhizome_client_poll,POLL_OUT);
/* Clear request type flags */
r->request_type=0;
@ -498,6 +458,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
else if (sscanf(r->request,"GET /rhizome/file/%s HTTP/1.", id)==1)
{
/* Stream the specified file */
int dud=0;
int i;
hexFilter(id);

View File

@ -823,11 +823,10 @@ long long parse_quantity(char *q);
int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockaddr_in broadcast,
int speed_in_bits,int port,int type);
int overlay_interface_init_socket(int i,struct sockaddr_in src_addr,struct sockaddr_in broadcast);
int overlay_interface_discover();
int overlay_interface_discover();
void overlay_interface_discover();
long long overlay_time_until_next_tick();
int overlay_rx_messages();
int overlay_check_ticks();
void overlay_check_ticks();
int overlay_add_selfannouncement();
int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b);
int overlay_interface_args(const char *arg);
@ -1037,7 +1036,7 @@ int overlay_route_record_link(long long now,unsigned char *to,
unsigned char *via,int sender_interface,
unsigned int s1,unsigned int s2,int score,int gateways_en_route);
int overlay_route_dump();
int overlay_route_tick();
void overlay_route_tick();
int overlay_route_tick_neighbour(int neighbour_id,long long now);
int overlay_route_tick_node(int bin,int slot,long long now);
int overlay_route_add_advertisements(int interface,overlay_buffer *e);
@ -1053,7 +1052,7 @@ int overlay_route_saw_advertisements(int i,overlay_frame *f, long long now);
int overlay_rhizome_saw_advertisements(int i,overlay_frame *f, long long now);
int overlay_route_please_advertise(overlay_node *n);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int rhizome_server_poll();
void rhizome_server_poll(int ignored_file_descriptor);
int rhizome_saw_voice_traffic();
int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long now);
@ -1092,7 +1091,7 @@ int overlay_address_is_broadcast(unsigned char *a);
int overlay_broadcast_generate_address(unsigned char *a);
int overlay_abbreviate_unset_current_sender();
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int rhizome_fetch_poll();
void rhizome_fetch_poll(int fd);
int rhizome_opendb();
typedef struct dna_identity_status {
@ -1156,7 +1155,7 @@ int mkdirsn(const char *path, size_t len, mode_t mode);
#define FORM_SERVAL_INSTANCE_PATH(buf, path) (form_serval_instance_path(buf, sizeof(buf), (path)))
int overlay_mdp_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int overlay_mdp_poll();
void overlay_mdp_poll();
int overlay_mdp_reply_error(int sock,
struct sockaddr_un *recvaddr,int recvaddrlen,
int error_number,char *message);
@ -1427,7 +1426,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
int vomp_mdp_received(overlay_mdp_frame *mdp);
char *vomp_describe_state(int state);
char *vomp_describe_codec(int c);
int vomp_tick();
void vomp_tick();
int vomp_tick_interval();
int vomp_sample_size(int c);
int vomp_codec_timespan(int c);
@ -1476,7 +1475,8 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_setup_sockets();
int monitor_poll();
void monitor_poll(int ignored_fd);
void monitor_client_poll(int ignored_fd);
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_call_status(vomp_call_state *call);
int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio);
@ -1538,3 +1538,20 @@ void sigIoHandler(int signal);
#define WRITE_STR(fd, str) write(fd, str, strlen(str))
/* Event queue handling functions */
int fd_poll();
int fd_checkalarms();
int fd_setalarm(void (*func),long long first_alarm_in,int repeat_every);
int fd_teardown(int fd);
int fd_watch(int fd,void (*func)(int fd),int events);
int fd_list();
char *fd_funcname(void *addr);
int fd_showstats();
void fd_periodicstats();
int rhizome_server_start();
void rhizome_enqueue_suggestions();
int overlay_mdp_setup_sockets();
void overlay_interface_poll(int fd);
void overlay_dummy_poll();
void rhizome_client_poll(int fd);

View File

@ -209,8 +209,7 @@ int server(char *backing_file)
fprintf(f,"%d\n", server_getpid);
fclose(f);
if (!overlayMode) simpleServerMode();
else overlayServerMode();
overlayServerMode();
return 0;
}
@ -810,73 +809,6 @@ int createServerSocket()
return 0;
}
extern int sigIoFlag;
extern int rhizome_server_socket;
int simpleServerMode()
{
while(1) {
struct sockaddr recvaddr;
socklen_t recvaddrlen=sizeof(recvaddr);
struct pollfd fds[128];
int fdcount;
int len;
int r;
server_shutdown_check();
bzero((void *)&recvaddr,sizeof(recvaddr));
/* Get rhizome server started BEFORE populating fd list so that
the server's listen socket is in the list for poll() */
if (rhizome_enabled()) rhizome_server_poll();
/* Get list of file descripters to watch */
fds[0].fd=sock; fds[0].events=POLLIN;
fdcount=1;
rhizome_server_get_fds(fds,&fdcount,128);
if (debug&DEBUG_IO) {
printf("poll()ing file descriptors:");
{ int i;
for(i=0;i<fdcount;i++) { printf(" %d",fds[i].fd); } }
printf("\n");
}
/* Wait patiently for packets to arrive. */
if (rhizome_enabled()) rhizome_server_poll();
while ((r=poll(fds,fdcount,100000))<1) {
if (sigIoFlag) { sigIoFlag=0; break; }
sleep(0);
}
if (rhizome_enabled()) rhizome_server_poll();
unsigned char buffer[16384];
int ttl=-1; // unknown
if (fds[0].revents&POLLIN) {
len=recvwithttl(sock,buffer,sizeof(buffer),&ttl,&recvaddr,&recvaddrlen);
client_port=((struct sockaddr_in*)&recvaddr)->sin_port;
client_addr=((struct sockaddr_in*)&recvaddr)->sin_addr;
if (debug&DEBUG_DNAREQUESTS) fprintf(stderr,"Received packet from %s:%d (len=%d).\n",inet_ntoa(client_addr),client_port,len);
if (debug&DEBUG_PACKETRX) dump("recvaddr",(unsigned char *)&recvaddr,recvaddrlen);
if (debug&DEBUG_PACKETRX) dump("packet",(unsigned char *)buffer,len);
if (dropPacketP(len)) {
if (debug&DEBUG_SIMULATION) DEBUG("Simulation mode: Dropped packet due to simulated link parameters.");
continue;
}
/* Simple server mode doesn't really use interface numbers, so lie and say interface -1 */
if (packetOk(-1,buffer,len,NULL,ttl,&recvaddr,recvaddrlen,1)) {
if (debug&DEBUG_PACKETFORMATS) DEBUG("Ignoring invalid packet");
}
if (debug&DEBUG_PACKETRX) DEBUG("Finished processing packet, waiting for next one.");
}
}
return 0;
}
#ifdef DEBUG_MEM_ABUSE
unsigned char groundzero[65536];
int memabuseInitP=0;

View File

@ -54,7 +54,7 @@ setup_servald_instance() {
setup_servald_instances() {
setup_servald
DUMMYNET=/tmp/dummy
DUMMYNET=`pwd`/dummy.dat
rm $DUMMYNET
touch $DUMMYNET
assert [ -e $DUMMYNET ]
@ -63,7 +63,7 @@ setup_servald_instances() {
setup_servald_instance +B $DUMMYNET
SIDB=$sid
# Now make sure that they can see each other
sleep 5 # Should be plenty of time
sleep 10 # Should be plenty of time
set_instance +A
echo "Dummynet file $DUMMYNET after 5 seconds: "`ls -l $DUMMYNET`
executeOk_servald id peers

9
vomp.c
View File

@ -1457,7 +1457,7 @@ int app_vomp_monitor(int argc, const char *const *argv, struct command_line_opti
return overlay_mdp_client_done();
}
int vomp_tick()
void vomp_tick()
{
/* Send any reminder packets for call state, and also process any audio. */
unsigned long long now=overlay_gettime_ms();
@ -1508,11 +1508,6 @@ int vomp_tick()
break;
}
}
return 0;
return;
}
int vomp_tick_interval()
{
/* Work out the number of milliseconds until the next vomp tick is required. */
return 1000;
}