Refactor how functions are scheduled or file handes are watched

This commit is contained in:
Jeremy Lakeman 2012-07-02 13:19:54 +09:30
parent 89d3923557
commit 89566e4d3d
21 changed files with 1280 additions and 1472 deletions

View File

@ -22,7 +22,8 @@ SERVALD_SRC_FILES = \
serval-dna/gateway.c \
serval-dna/overlay.c \
serval-dna/overlay_broadcast.c \
serval-dna/packetformats.c \
serval-dna/performance_timing.c \
serval-dna/packetformats.c \
serval-dna/peers.c \
serval-dna/randombytes.c \
serval-dna/rhizome.c \

View File

@ -24,6 +24,7 @@ SRCS= main.c \
overlay_payload.c \
overlay_route.c \
packetformats.c \
performance_timing.c \
peers.c \
randombytes.c \
responses.c \

View File

@ -325,7 +325,7 @@ int getReplyPackets(int method,int peer,int batchP,struct response_set *response
if (debug&DEBUG_SIMULATION) fprintf(stderr,"Simulation mode: Dropped packet due to simulated link parameters.\n");
continue;
}
if (!packetOk(-1,buffer,len,transaction_id,ttl,recvaddr,recvaddrlen,0)) {
if (!packetOk(NULL,buffer,len,transaction_id,ttl,recvaddr,recvaddrlen,0)) {
/* Packet passes tests - extract responses and append them to the end of the response list */
if (extractResponses(client_addr,buffer,len,responses))
return WHY("Problem extracting response fields from reply packets");

480
fdqueue.c
View File

@ -20,423 +20,161 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include <poll.h>
struct callback_stats {
long long max_time;
long long total_time;
int calls;
};
#define MAX_ALARMS 128
typedef struct callback_alarm {
void (*func)();
long long next_alarm;
long long repeat_every;
struct callback_stats stats;
} callback_alarm;
callback_alarm alarms[MAX_ALARMS];
int alarmcount=0;
#define MAX_WATCHED_FDS 128
struct pollfd fds[MAX_WATCHED_FDS];
int fdcount=0;
void(*fd_functions[MAX_WATCHED_FDS])(int fd);
struct callback_stats fd_stats[MAX_WATCHED_FDS];
struct sched_ent *fd_callbacks[MAX_WATCHED_FDS];
struct sched_ent *next_alarm=NULL;
struct callback_stats poll_stats={NULL,0,"Idle (in poll)",0,0,0};
struct callback_stats poll_stats={0,0,0};
/* @PGS/20120615 */
int last_valid=0;
int last_line;
const char *last_file;
const char *last_func;
long long last_time;
/* @PGS/20120615 */
void TIMING_PAUSE()
{
last_valid=0;
}
/* @PGS/20120615 */
void _TIMING_CHECK(const char *file,const char *func,int line)
{
void list_alarms(){
long long now=overlay_gettime_ms();
if (last_valid) {
if (now-last_time>5) {
// More than 5ms spent in a given task, complain
char msg[1024];
snprintf(msg,1024,"Spent %lldms between %s:%d in %s() and here",
now-last_time,last_file,last_line,last_func);
logMessage(LOG_LEVEL_WARN,file,line,func,"%s",msg);
}
struct sched_ent *alarm = next_alarm;
int i;
INFO("Alarms;");
while(alarm){
INFOF("%s in %lldms", alarm->stats.name, alarm->alarm - now);
alarm = alarm->_next;
}
last_valid=1;
last_file=file;
last_func=func;
last_line=line;
last_time=now;
INFO("File handles;");
for (i=0;i<fdcount;i++)
INFOF("%s watching #%d", fd_callbacks[i]->stats.name, fds[i].fd);
}
int fd_watch(int fd,void (*func)(int fd),int events)
{
if (fd<0||fd>=MAX_WATCHED_FDS)
return WHYF("Invalid file descriptor (%d) - must be between 0 and %d",
MAX_WATCHED_FDS-1);
if (fdcount>=MAX_WATCHED_FDS)
return WHYF("Currently watching too many file descriptors. This should never happen; report a bug.");
int i;
for(i=0;i<fdcount;i++)
if (fds[i].fd==fd)
// add an alarm to the list of scheduled function calls.
// simply populate .alarm with the absolute time, and .function with the method to call.
// on calling .poll.revents will be zero.
int schedule(struct sched_ent *alarm){
long long now=overlay_gettime_ms();
struct sched_ent *node = next_alarm, *last = NULL;
while(node!=NULL){
if (node->alarm > alarm->alarm)
break;
fds[i].fd=fd;
fds[i].events=events;
if (i==fdcount)
fdcount++;
last = node;
node = node->_next;
}
if (last == NULL){
next_alarm = alarm;
}else{
last->_next=alarm;
}
alarm->_prev = last;
if(node!=NULL)
node->_prev = alarm;
alarm->_next = node;
if (func!=fd_functions[fd]) {
fd_stats[fd].max_time=0;
fd_stats[fd].total_time=0;
fd_stats[fd].calls=0;
}
fd_functions[fd]=func;
return 0;
}
int fd_teardown(int fd)
{
int i;
for(i=0;i<fdcount;i++)
if (fds[i].fd==fd) {
if (i<(fdcount-1)) {
/* Move last entry in list to this position, and wipe last entry in list */
fds[i]=fds[fdcount-1];
fds[fdcount-1].fd=-1;
fds[fdcount-1].events=0;
} else {
/* We are last entry in list, so just wipe */
fds[i].events=0;
fds[i].fd=-1;
}
fdcount--; i--;
}
close(fd);
// remove a function from the schedule before it has fired
// safe to unschedule twice...
int unschedule(struct sched_ent *alarm){
struct sched_ent *prev = alarm->_prev;
struct sched_ent *next = alarm->_next;
if (prev!=NULL)
prev->_next = next;
else if(next_alarm==alarm)
next_alarm = next;
if (next!=NULL)
next->_prev = prev;
alarm->_prev = NULL;
alarm->_next = NULL;
return 0;
}
/* Automatically call a function every this many milli-seconds.
If repeat_every is zero, then the alarm will be a one-shot */
int fd_setalarm(void (*func),long long first_alarm_in,int repeat_every)
{
int i;
if (!func) return -1;
if (first_alarm_in<=0) first_alarm_in=repeat_every;
if (repeat_every<0) return -1;
for(i=0;i<alarmcount;i++)
{
if (func==alarms[i].func) break;
}
if (i>=MAX_ALARMS) return WHY("Too many alarms");
if (!first_alarm_in) {
/* remove old alarm */
alarms[i]=alarms[--alarmcount];
return 0;
} else {
/* Create new alarm, or update existing one */
if (alarms[i].func!=func) {
alarms[i].stats.calls=0;
alarms[i].stats.max_time=0;
alarms[i].stats.total_time=0;
}
alarms[i].func=func;
alarms[i].next_alarm=overlay_gettime_ms()+first_alarm_in;
alarms[i].repeat_every=repeat_every;
if (i>=alarmcount) alarmcount=i+1;
return 0;
// start watching a file handle, call this function again if you wish to change the event mask
int watch(struct sched_ent *alarm){
if (alarm->_poll_index>=0 && fd_callbacks[alarm->_poll_index]==alarm){
// updating event flags
INFOF("Updating watch %s, #%d for %d", alarm->stats.name, alarm->poll.fd, alarm->poll.events);
}else{
INFOF("Adding watch %s, #%d for %d", alarm->stats.name, alarm->poll.fd, alarm->poll.events);
if (fdcount>=MAX_WATCHED_FDS)
return WHY("Too many file handles to watch");
fd_callbacks[fdcount]=alarm;
alarm->_poll_index=fdcount;
fdcount++;
}
fds[alarm->_poll_index]=alarm->poll;
return 0;
}
void fd_update_stats(struct callback_stats *s,long long elapsed)
{
s->total_time+=elapsed;
if (elapsed>s->max_time) s->max_time=elapsed;
s->calls++;
// stop watching a file handle
int unwatch(struct sched_ent *alarm){
int index = alarm->_poll_index;
if (index <0 || fds[index].fd!=alarm->poll.fd)
return WHY("Attempted to unwatch a handle that is not being watched");
fdcount--;
if (index!=fdcount){
// squash fds
fds[index] = fds[fdcount];
fd_callbacks[index] = fd_callbacks[fdcount];
fd_callbacks[index]->_poll_index=index;
}
fds[fdcount].fd=-1;
fd_callbacks[fdcount]=NULL;
alarm->_poll_index=-1;
INFOF("%s stopped watching #%d for %d", alarm->stats.name, alarm->poll.fd, alarm->poll.events);
return 0;
}
void call_alarm(struct sched_ent *alarm, int revents){
struct call_stats call_stats;
fd_func_enter(&call_stats);
alarm->poll.revents = revents;
alarm->function(alarm);
fd_func_exit(&call_stats, &alarm->stats);
}
int fd_checkalarms()
{
long long now=overlay_gettime_ms();
int i;
long long next_alarm_in=15000;
TIMING_PAUSE();
for(i=0;i<alarmcount;i++)
{
if (alarms[i].next_alarm&&alarms[i].next_alarm<=now) {
now=overlay_gettime_ms();
alarms[i].func();
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&alarms[i].stats,elapsed);
if (!alarms[i].repeat_every) {
/* Alarm was one-shot, so erase alarm */
fd_setalarm(alarms[i].func,0,0);
i--;
continue;
} else
/* Alarm is repeating, so set next call */
alarms[i].next_alarm=now+alarms[i].repeat_every;
}
/* Work out if this alarm is next */
if (next_alarm_in>(alarms[i].next_alarm-now))
next_alarm_in=(alarms[i].next_alarm-now);
}
return next_alarm_in;
if (next_alarm!=NULL&&next_alarm->alarm <=now){
struct sched_ent *alarm = next_alarm;
unschedule(alarm);
call_alarm(alarm, 0);
now=overlay_gettime_ms();
}
if (next_alarm)
return next_alarm->alarm - now;
return 15000;
}
int fd_poll()
{
int i;
int i, r;
/* See if any alarms have expired before we do anything.
This also returns the time to the next alarm that is due. */
int ms=fd_checkalarms();
/* Make sure we don't have any silly timeouts that will make us wait for ever. */
if (ms<1) ms=1;
if (ms<0) ms=0;
/* Wait for action or timeout */
long long now=overlay_gettime_ms();
int r=poll(fds, fdcount, ms);
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&poll_stats,elapsed);
{
struct call_stats call_stats;
fd_func_enter(&call_stats);
r=poll(fds, fdcount, ms);
fd_func_exit(&call_stats, &poll_stats);
}
/* If file descriptors are ready, then call the appropriate functions */
if (r>0) {
for(i=0;i<fdcount;i++)
if (fds[i].revents) {
long long now=overlay_gettime_ms();
/* Make socket non-blocking */
SET_NONBLOCKING(fds[i].fd);
fd_functions[fds[i].fd](fds[i].fd);
call_alarm(fd_callbacks[i], fds[i].revents);
/* Make socket blocking again */
SET_BLOCKING(fds[i].fd);
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&fd_stats[fds[i].fd],elapsed);
}
}
/* After all that action, we might have an alarm expire, so check the alarms
again */
fd_checkalarms();
return 0;
}
typedef struct func_descriptions {
void *addr;
char *description;
} func_descriptions;
func_descriptions func_names[]={
{overlay_check_ticks,"overlay_check_ticks"},
{overlay_dummy_poll,"overlay_dummy_poll"},
{overlay_interface_discover,"overlay_interface_discover"},
{overlay_route_tick,"overlay_route_tick"},
{rhizome_enqueue_suggestions,"rhizome_enqueue_suggestions"},
{server_shutdown_check,"server_shutdown_check"},
{monitor_client_poll,"monitor_client_poll"},
{monitor_poll,"monitor_poll"},
{overlay_interface_poll,"overlay_interface_poll"},
{overlay_mdp_poll,"overlay_mdp_poll"},
{rhizome_client_poll,"rhizome_client_poll"},
{rhizome_fetch_poll,"rhizome_fetch_poll"},
{rhizome_server_poll,"rhizome_server_poll"},
{fd_periodicstats,"fd_periodicstats"},
{vomp_tick,"vomp_tick"},
{rhizome_check_connections,"rhizome_check_connections"},
{NULL,NULL}
};
#define MAX_FUNCS 1024
struct callback_stats called_funcs[MAX_FUNCS];
const char *called_func_names[MAX_FUNCS];
int func_count=0;
#define MAX_CALL_DEPTH 128
struct {
int func_id;
int enter_time;
int child_time;
} call_stack[MAX_CALL_DEPTH];
int call_stack_depth=0;
char *fd_funcname(void *addr)
{
int j;
char *funcname="unknown";
for(j=0;func_names[j].addr;j++)
if (func_names[j].addr==addr)
funcname=func_names[j].description;
return funcname;
}
int fd_list()
{
long long now=overlay_gettime_ms();
int i;
INFOF("List of timed callbacks:");
INFOF("------------------------");
for(i=0;i<alarmcount;i++) {
INFOF(alarms[i].repeat_every?"() in %lldms and every %lldms":"%s() in %lldms%*",
fd_funcname(alarms[i].func),
alarms[i].next_alarm-now,alarms[i].repeat_every);
}
INFOF("List of watched file descriptors:");
INFOF("---------------------------------");
for(i=0;i<fdcount;i++) {
char *eventdesc="<somethinged>";
if ((fds[i].events&POLLIN)&&(fds[i].events&POLLOUT))
eventdesc="read or written";
else if (fds[i].events&POLLIN)
eventdesc="read";
else if (fds[i].events&POLLOUT)
eventdesc="written";
INFOF("%s() when fd#%d can be %s",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd,eventdesc);
}
return 0;
}
int fd_tallystats(struct callback_stats *total,struct callback_stats *a)
{
total->total_time+=a->total_time;
total->calls+=a->calls;
if (a->max_time>total->max_time) total->max_time=a->max_time;
return 0;
}
int fd_showstat(struct callback_stats *total, struct callback_stats *a, const char *msg)
{
WHYF("%lldms (%2.1f%%) in %d calls (max %lldms, avg %.1fms) : %s",
a->total_time,a->total_time*100.0/total->total_time,
a->calls,
a->max_time,a->total_time*1.00/a->calls,
msg);
return 0;
}
int fd_clearstat(struct callback_stats *s)
{
s->calls=0;
s->max_time=0;
s->total_time=0;
return 0;
}
int fd_clearstats()
{
int i;
fd_clearstat(&poll_stats);
for(i=0;i<alarmcount;i++)
fd_clearstat(&alarms[i].stats);
for(i=0;i<fdcount;i++)
fd_clearstat(&fd_stats[fds[i].fd]);
for(i=0;i<func_count;i++)
fd_clearstat(&called_funcs[i]);
return 0;
}
int fd_showstats()
{
int i;
struct callback_stats total={0,0,0};
/* Get total time spent doing everything */
fd_tallystats(&total,&poll_stats);
for(i=0;i<alarmcount;i++)
fd_tallystats(&total,&alarms[i].stats);
for(i=0;i<fdcount;i++)
fd_tallystats(&total,&fd_stats[fds[i].fd]);
/* Now show stats */
INFOF("servald time usage stats:");
fd_showstat(&total,&poll_stats,"Idle (in poll)");
for(i=0;i<alarmcount;i++) {
char desc[1024];
snprintf(desc,1024,"%s() alarm callback",fd_funcname(alarms[i].func));
fd_showstat(&total,&alarms[i].stats,desc);
}
for(i=0;i<fdcount;i++) {
char desc[1024];
snprintf(desc,1024,"%s() fd#%d %s%scallback",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd,
fds[i].events&POLLIN?"IN ":"",
fds[i].events&POLLOUT?"OUT ":"");
fd_showstat(&total,&fd_stats[fds[i].fd],desc);
}
fd_showstat(&total,&total,"TOTAL");
INFOF("servald function time statistics:");
for(i=0;i<func_count;i++)
if (called_funcs[i].calls)
fd_showstat(&total,&called_funcs[i],called_func_names[i]);
return 0;
}
void fd_periodicstats()
{
fd_showstats();
fd_clearstats();
}
int fd_next_funcid(const char *funcname)
{
if (func_count>=MAX_FUNCS) return MAX_FUNCS-1;
fd_clearstat(&called_funcs[func_count]);
called_func_names[func_count]=funcname;
return func_count++;
}
int fd_func_enter(int funcid)
{
if (call_stack_depth>=MAX_CALL_DEPTH) return 0;
call_stack[call_stack_depth].func_id=funcid;
call_stack[call_stack_depth].enter_time=overlay_gettime_ms();
call_stack[call_stack_depth].child_time=0;
call_stack_depth++;
return 0;
}
int fd_func_exit(int funcid)
{
if (funcid!=call_stack[call_stack_depth-1].func_id)
exit(WHYF("func_id mismatch: entered through %s(), but exited through %s()",
called_func_names[call_stack[call_stack_depth-1].func_id],
called_func_names[funcid]));
long long elapsed=overlay_gettime_ms()-call_stack[call_stack_depth-1].enter_time;
long long self_elapsed=elapsed-call_stack[call_stack_depth-1].child_time;
if (call_stack_depth>1) {
int d=call_stack_depth-2;
call_stack[d].child_time+=elapsed;
}
fd_update_stats(&called_funcs[funcid],self_elapsed);
call_stack_depth--;
return 0;
}

1
lsif.c
View File

@ -50,6 +50,7 @@
#if __MACH__
#include <net/if_dl.h>
#endif
#include <net/if.h>
/* On platforms that have variable length
ifreq use the old fixed length interface instead */

327
monitor.c
View File

@ -34,11 +34,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MONITOR_LINE_LENGTH 160
#define MONITOR_DATA_SIZE MAX_AUDIO_BYTES
struct monitor_context {
struct sched_ent alarm;
#define MONITOR_VOMP (1<<0)
#define MONITOR_RHIZOME (1<<1)
#define MONITOR_PEERS (1<<2)
int flags;
int socket;
char line[MONITOR_LINE_LENGTH];
int line_length;
#define MONITOR_STATE_COMMAND 1
@ -56,23 +56,21 @@ int monitor_socket_count=0;
struct monitor_context monitor_sockets[MAX_MONITOR_SOCKETS];
long long monitor_last_update_time=0;
int monitor_process_command(int index,char *cmd);
int monitor_process_data(int index);
int monitor_process_command(struct monitor_context *c);
int monitor_process_data(struct monitor_context *c);
static void monitor_new_client(int s);
int monitor_named_socket=-1;
struct sched_ent named_socket;
int monitor_setup_sockets()
{
struct sockaddr_un name;
int len;
int sock;
bzero(&name, sizeof(name));
name.sun_family = AF_UNIX;
if (monitor_named_socket!=-1)
return 0;
if ((monitor_named_socket = socket(AF_UNIX, SOCK_STREAM, 0))==-1) {
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0))==-1) {
WHY_perror("socket");
goto error;
}
@ -96,38 +94,41 @@ int monitor_setup_sockets()
len = 1+strlen(name.sun_path) + sizeof(name.sun_family);
#endif
if(bind(monitor_named_socket, (struct sockaddr *)&name, len)==-1) {
if(bind(sock, (struct sockaddr *)&name, len)==-1) {
WHY_perror("bind");
goto error;
}
if(listen(monitor_named_socket,MAX_MONITOR_SOCKETS)==-1) {
if(listen(sock,MAX_MONITOR_SOCKETS)==-1) {
WHY_perror("listen");
goto error;
}
int reuseP=1;
if(setsockopt(monitor_named_socket, SOL_SOCKET, SO_REUSEADDR,
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
&reuseP, sizeof(reuseP)) < 0) {
WHY_perror("setsockopt");
WHY("Could not indicate reuse addresses. Not necessarily a problem (yet)");
}
int send_buffer_size=64*1024;
if(setsockopt(monitor_named_socket, SOL_SOCKET, SO_RCVBUF,
if(setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
&send_buffer_size, sizeof(send_buffer_size))==-1)
WHY_perror("setsockopt");
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) WHY("Monitor server socket setup");
fd_watch(monitor_named_socket,monitor_poll,POLLIN);
named_socket.function=monitor_poll;
named_socket.stats.name="monitor_poll";
named_socket.poll.fd=sock;
named_socket.poll.events=POLLIN;
watch(&named_socket);
return 0;
error:
fd_teardown(monitor_named_socket);
monitor_named_socket=-1;
close(sock);
return -1;
}
void monitor_poll(int ignored_fd)
void monitor_poll(struct sched_ent *alarm)
{
int s,i,m;
unsigned char buffer[1024];
@ -153,7 +154,7 @@ void monitor_poll(int ignored_fd)
/* And let far-end know that call is still alive */
snprintf(msg,sizeof(msg) -1,"\nKEEPALIVE:%06x\n", vomp_call_states[i].local.session);
for(m = 0;m < monitor_socket_count; m++)
WRITE_STR(monitor_sockets[m].socket,msg);
WRITE_STR(monitor_sockets[m].alarm.poll.fd,msg);
}
}
@ -162,9 +163,9 @@ void monitor_poll(int ignored_fd)
ignored_length = 0;
while (
#ifdef HAVE_LINUX_IF_H
(s = accept4(monitor_named_socket, NULL, &ignored_length,O_NONBLOCK))
(s = accept4(alarm->poll.fd, NULL, &ignored_length,O_NONBLOCK))
#else
(s = accept(monitor_named_socket,NULL, &ignored_length))
(s = accept(alarm->poll.fd,NULL, &ignored_length))
#endif
!= -1
) {
@ -174,107 +175,100 @@ void monitor_poll(int ignored_fd)
WHY_perror("accept");
}
void monitor_client_poll(int fd)
void monitor_client_close(struct monitor_context *c){
struct monitor_context *last;
unwatch(&c->alarm);
close(c->alarm.poll.fd);
c->alarm.poll.fd=-1;
monitor_socket_count--;
last = &monitor_sockets[monitor_socket_count];
if (last != c){
unwatch(&last->alarm);
bcopy(last, c,
sizeof(struct monitor_context));
watch(&c->alarm);
}
}
void monitor_client_poll(struct sched_ent *alarm)
{
/* Read from any open connections */
int i;
for(i = 0;i < monitor_socket_count; i++) {
nextInSameSlot:
errno=0;
int bytes;
struct monitor_context *c=&monitor_sockets[i];
if (c->socket!=fd) continue;
switch(c->state) {
case MONITOR_STATE_COMMAND:
bytes = 1;
while(bytes == 1) {
if (c->line_length >= MONITOR_LINE_LENGTH) {
/* line too long */
c->line[MONITOR_LINE_LENGTH-1] = 0;
monitor_process_command(i, c->line);
bytes = -1;
break;
}
bytes = read(c->socket, &c->line[c->line_length], 1);
if (bytes < 1) {
switch(errno) {
case EINTR:
case ENOTRECOVERABLE:
/* transient errors */
WHY_perror("read");
break;
case EAGAIN:
break;
default:
WHY_perror("read");
/* all other errors; close socket */
WHYF("Tearing down monitor client #%d due to errno=%d (%s)",
i,errno,strerror(errno)?strerror(errno):"<unknown error>");
fd_teardown(c->socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
}
}
if (bytes > 0 && (c->line[c->line_length] != '\r')) {
c->line_length += bytes;
if (c->line[c->line_length-1] == '\n') {
/* got command */
c->line[c->line_length-1] = 0; /* trim new line for easier parsing */
monitor_process_command(i, c->line);
break;
}
}
struct monitor_context *c=(struct monitor_context *)alarm;
errno=0;
int bytes;
switch(c->state) {
case MONITOR_STATE_COMMAND:
bytes = 1;
while(bytes == 1) {
if (c->line_length >= MONITOR_LINE_LENGTH) {
/* line too long */
c->line[MONITOR_LINE_LENGTH-1] = 0;
monitor_process_command(c);
bytes = -1;
break;
}
break;
case MONITOR_STATE_DATA:
bytes = read(c->socket,
&c->buffer[c->data_offset],
c->data_expected-c->data_offset);
bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1);
if (bytes < 1) {
switch(errno) {
case EAGAIN: case EINTR:
case EINTR:
case ENOTRECOVERABLE:
/* transient errors */
WHY_perror("read");
break;
case EAGAIN:
break;
default:
WHY_perror("read");
/* all other errors; close socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
fd_teardown(c->socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count - 1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
WHYF("Tearing down monitor client due to errno=%d (%s)",
errno,strerror(errno)?strerror(errno):"<unknown error>");
monitor_client_close(c);
return;
}
} else {
c->data_offset += bytes;
if (c->data_offset >= c->data_expected)
{
/* we have the binary data we were expecting. */
monitor_process_data(i);
c->state = MONITOR_STATE_COMMAND;
}
}
break;
default:
c->state = MONITOR_STATE_COMMAND;
WHY("fixed monitor connection state");
if (bytes > 0 && (c->line[c->line_length] != '\r')) {
c->line_length += bytes;
if (c->line[c->line_length-1] == '\n') {
/* got command */
c->line[c->line_length-1] = 0; /* trim new line for easier parsing */
monitor_process_command(c);
break;
}
}
}
break;
case MONITOR_STATE_DATA:
bytes = read(c->alarm.poll.fd,
&c->buffer[c->data_offset],
c->data_expected-c->data_offset);
if (bytes < 1) {
switch(errno) {
case EAGAIN: case EINTR:
/* transient errors */
break;
default:
/* all other errors; close socket */
WHYF("Tearing down monitor client due to errno=%d",
errno);
monitor_client_close(c);
return;
}
} else {
c->data_offset += bytes;
if (c->data_offset >= c->data_expected)
{
/* we have the binary data we were expecting. */
monitor_process_data(c);
c->state = MONITOR_STATE_COMMAND;
}
}
break;
default:
c->state = MONITOR_STATE_COMMAND;
WHY("fixed monitor connection state");
}
return;
}
@ -315,21 +309,23 @@ static void monitor_new_client(int s) {
WHYF("monitor.socket client has wrong uid (%d versus %d)", otheruid,getuid());
WRITE_STR(s, "\nCLOSE:Incorrect UID\n");
goto error;
} else if (monitor_socket_count >= MAX_MONITOR_SOCKETS
}
if (monitor_socket_count >= MAX_MONITOR_SOCKETS
||monitor_socket_count < 0) {
WRITE_STR(s, "\nCLOSE:All sockets busy\n");
goto error;
} else {
c = &monitor_sockets[monitor_socket_count];
c->socket = s;
c->line_length = 0;
c->state = MONITOR_STATE_COMMAND;
monitor_socket_count++;
WRITE_STR(s,"\nMONITOR:You are talking to servald\n");
INFOF("Got %d clients", monitor_socket_count);
}
fd_watch(s, monitor_client_poll, POLLIN);
c = &monitor_sockets[monitor_socket_count++];
c->alarm.function = monitor_client_poll;
c->alarm.stats.name="monitor_client_poll";
c->alarm.poll.fd = s;
c->alarm.poll.events=POLLIN;
c->line_length = 0;
c->state = MONITOR_STATE_COMMAND;
WRITE_STR(s,"\nMONITOR:You are talking to servald\n");
INFOF("Got %d clients", monitor_socket_count);
watch(&c->alarm);
return;
@ -338,22 +334,21 @@ static void monitor_new_client(int s) {
return;
}
int monitor_process_command(int index,char *cmd)
int monitor_process_command(struct monitor_context *c)
{
int callSessionToken,sampleType,bytes;
char sid[MONITOR_LINE_LENGTH],localDid[MONITOR_LINE_LENGTH];
char remoteDid[MONITOR_LINE_LENGTH],digits[MONITOR_LINE_LENGTH];
overlay_mdp_frame mdp;
char *cmd = c->line;
IN();
mdp.packetTypeAndFlags=MDP_VOMPEVENT;
struct monitor_context *c=&monitor_sockets[index];
c->line_length=0;
if (strlen(cmd)>MONITOR_LINE_LENGTH) {
WRITE_STR(c->socket,"\nERROR:Command too long\n");
WRITE_STR(c->alarm.poll.fd,"\nERROR:Command too long\n");
RETURN(-1);
}
@ -423,7 +418,7 @@ int monitor_process_command(int index,char *cmd)
int cn=0,in=0,kp=0;
if(!keyring_next_identity(keyring,&cn,&in,&kp))
{
WRITE_STR(c->socket,"\nERROR:no local identity, so cannot place call\n");
WRITE_STR(c->alarm.poll.fd,"\nERROR:no local identity, so cannot place call\n");
}
else {
bcopy(keyring->contexts[cn]->identities[in]
@ -464,7 +459,7 @@ int monitor_process_command(int index,char *cmd)
int digit=vomp_parse_dtmf_digit(digits[i]);
if (digit<0) {
snprintf(msg,1024,"\nERROR: invalid DTMF digit 0x%02x\n",digit);
WRITE_STR(c->socket,msg);
WRITE_STR(c->alarm.poll.fd,msg);
}
mdp.vompevent.audio_bytes[mdp.vompevent.audio_sample_bytes]
=(digit<<4); /* 80ms standard tone duration, so that it is a multiple
@ -476,15 +471,14 @@ int monitor_process_command(int index,char *cmd)
}
snprintf(msg,1024,"\nMONITORSTATUS:%d\n",c->flags);
WRITE_STR(c->socket,msg);
WRITE_STR(c->alarm.poll.fd,msg);
RETURN(0);
}
int monitor_process_data(int index)
int monitor_process_data(struct monitor_context *c)
{
/* Called when we have received an entire data sample */
struct monitor_context *c=&monitor_sockets[index];
c->state=MONITOR_STATE_COMMAND;
if (vomp_sample_size(c->sample_codec)!=c->data_offset)
@ -494,7 +488,7 @@ int monitor_process_data(int index)
vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
if (!call) {
WRITE_STR(c->socket,"\nERROR:No such call\n");
WRITE_STR(c->alarm.poll.fd,"\nERROR:No such call\n");
return -1;
}
@ -528,35 +522,24 @@ int monitor_announce_bundle(rhizome_manifest *m)
sender,
recipient,
m->dataFileName?m->dataFileName:"");
for(i=0;i<monitor_socket_count;i++)
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&MONITOR_RHIZOME))
continue;
nextInSameSlot:
errno=0;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].socket);
WRITE_STR(monitor_sockets[i].socket,msg);
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].socket);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
}
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client due to errno=%d",
errno);
monitor_client_close(&monitor_sockets[i]);
}
}
return 0;
}
@ -580,30 +563,19 @@ int monitor_call_status(vomp_call_state *call)
overlay_render_sid(call->remote.sid),
call->local.did,call->remote.did);
msg[1023]=0;
for(i=0;i<monitor_socket_count;i++)
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&MONITOR_VOMP))
continue;
nextInSameSlot:
errno=0;
SET_NONBLOCKING(monitor_sockets[i].socket);
WRITE_STR(monitor_sockets[i].socket,msg);
SET_BLOCKING(monitor_sockets[i].socket);
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
monitor_client_close(&monitor_sockets[i]);
}
}
}
@ -647,31 +619,20 @@ int monitor_tell_clients(char *msg, int msglen, int mask)
{
int i;
IN();
for(i=0;i<monitor_socket_count;i++)
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&mask))
continue;
nextInSameSlot:
errno=0;
SET_NONBLOCKING(monitor_sockets[i].socket);
write(monitor_sockets[i].socket, msg, msglen);
SET_BLOCKING(monitor_sockets[i].socket);
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
write(monitor_sockets[i].alarm.poll.fd, msg, msglen);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
// WHYF("Writing AUDIOPACKET to client");
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
i,errno);
fd_teardown(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
monitor_client_close(&monitor_sockets[i]);
}
}
RETURN(0);

View File

@ -112,6 +112,11 @@ int overlayServerMode()
/* Create structures to use 1MB of RAM for testing */
overlay_route_init(1);
#define SCHEDULE(X, Y) struct sched_ent _sched_##X; bzero(&_sched_##X, sizeof(struct sched_ent)); _sched_##X.function=X;_sched_##X.stats.name="" #X "";_sched_##X.alarm=overlay_gettime_ms()+Y; schedule(&_sched_##X);
/* Periodically check for server shut down */
SCHEDULE(server_shutdown_check, 0);
/* Setup up MDP & monitor interface unix domain sockets */
overlay_mdp_setup_sockets();
monitor_setup_sockets();
@ -119,32 +124,30 @@ int overlayServerMode()
/* Get rhizome server started BEFORE populating fd list so that
the server's listen socket is in the list for poll() */
if (rhizome_enabled()) rhizome_server_start();
/* Pick next rhizome files to grab every few seconds
from the priority list continuously being built from observed
bundle announcements */
fd_setalarm(rhizome_enqueue_suggestions,3000,3000);
SCHEDULE(rhizome_enqueue_suggestions, 3000);
/* Periodically check for new interfaces */
fd_setalarm(overlay_interface_discover,1,5000);
/* Periodically check for server shut down */
fd_setalarm(server_shutdown_check,1,1000);
/* Periodically update route table.
(Alarm interval is dynamically updated by overlay_route_tick()
based on load/route table size etc) */
fd_setalarm(overlay_route_tick,1000,1000);
SCHEDULE(overlay_interface_discover, 1);
/* Start scheduling interface ticks */
fd_setalarm(overlay_check_ticks,1,500);
SCHEDULE(overlay_check_ticks, 2);
/* Periodically update route table. */
SCHEDULE(overlay_route_tick, 100);
/* Keep an eye on VoMP calls so that we can expire stale ones etc */
fd_setalarm(vomp_tick,1000,1000);
SCHEDULE(vomp_tick, 1000);
/* Show CPU usage stats periodically */
fd_setalarm(fd_periodicstats,3000,3000);
SCHEDULE(fd_periodicstats, 3000);
while(1) {
#undef SCHEDULE
while(1) {
/* Check for activitiy and respond to it */
fd_poll();
}
@ -152,7 +155,7 @@ int overlayServerMode()
return 0;
}
int overlay_frame_process(int interface,overlay_frame *f)
int overlay_frame_process(struct overlay_interface *interface,overlay_frame *f)
{
IN();
if (!f) RETURN(WHY("f==NULL"));
@ -188,13 +191,13 @@ int overlay_frame_process(int interface,overlay_frame *f)
break;
case OA_UNSUPPORTED:
default:
// TODO tell the sender
/* If we don't support the address format, we should probably tell
the sender. Again, we queue this up, and cancel it if someone else
tells them in the meantime to avoid an Opposition Event (like a Hanson
Event, but repeatedly berating any node that holds a different policy
to itself. */
WHY("Packet with unsupported address format");
overlay_interface_repeat_abbreviation_policy[interface]=1;
RETURN(-1);
break;
}
@ -214,7 +217,7 @@ int overlay_frame_process(int interface,overlay_frame *f)
if (forMe) {
/* It's for us, so resolve the addresses */
if (overlay_frame_resolve_addresses(interface,f))
if (overlay_frame_resolve_addresses(f))
RETURN(WHY("Failed to resolve destination and sender addresses in frame"));
broadcast=overlay_address_is_broadcast(f->destination);
if (debug&DEBUG_OVERLAYFRAMES) {
@ -355,19 +358,20 @@ int overlay_frame_process(int interface,overlay_frame *f)
}
}
int id = (interface - overlay_interfaces);
switch(f->type)
{
case OF_TYPE_SELFANNOUNCE:
overlay_route_saw_selfannounce(interface,f,now);
overlay_route_saw_selfannounce(f,now);
break;
case OF_TYPE_SELFANNOUNCE_ACK:
overlay_route_saw_selfannounce_ack(interface,f,now);
overlay_route_saw_selfannounce_ack(f,now);
break;
case OF_TYPE_NODEANNOUNCE:
overlay_route_saw_advertisements(interface,f,now);
overlay_route_saw_advertisements(id,f,now);
break;
case OF_TYPE_RHIZOME_ADVERT:
overlay_rhizome_saw_advertisements(interface,f,now);
overlay_rhizome_saw_advertisements(id,f,now);
break;
case OF_TYPE_DATA:
case OF_TYPE_DATA_VOICE:
@ -380,7 +384,7 @@ int overlay_frame_process(int interface,overlay_frame *f)
dump("payload",&f->payload->bytes[0],f->payload->length);
fflush(stdout);
}
overlay_saw_mdp_containing_frame(interface,f,now);
overlay_saw_mdp_containing_frame(f,now);
break;
default:
fprintf(stderr,"Unsupported f->type=0x%x\n",f->type);

View File

@ -329,10 +329,10 @@ int overlay_abbreviate_address(unsigned char *in,unsigned char *out,int *ofs)
}
int overlay_abbreviate_expand_address(int interface,unsigned char *in,int *inofs,unsigned char *out,int *ofs)
int overlay_abbreviate_expand_address(unsigned char *in,int *inofs,unsigned char *out,int *ofs)
{
int bytes=0,r;
if (debug&DEBUG_OVERLAYABBREVIATIONS) fprintf(stderr,"Address first byte/abbreviation code=%02x (input offset=%d)\n",in[*inofs],*inofs);
if (debug&DEBUG_OVERLAYABBREVIATIONS) DEBUGF("Address first byte/abbreviation code=%02x (input offset=%d)\n",in[*inofs],*inofs);
switch(in[*inofs])
{
case OA_CODE_02: case OA_CODE_04: case OA_CODE_0C:
@ -345,14 +345,14 @@ int overlay_abbreviate_expand_address(int interface,unsigned char *in,int *inofs
selfannounce in this packet. Naturally it cannot be
used to encode the sender's address there ;) */
(*inofs)++;
if (debug&DEBUG_OVERLAYABBREVIATIONS) fprintf(stderr,"Resolving OA_CODE_SELF.\n");
if (debug&DEBUG_OVERLAYABBREVIATIONS) DEBUGF("Resolving OA_CODE_SELF.\n");
if (overlay_abbreviate_current_sender_set) {
bcopy(&overlay_abbreviate_current_sender.b[0],&out[*ofs],SID_SIZE);
overlay_abbreviate_set_most_recent_address(&out[*ofs]);
(*ofs)+=SID_SIZE;
return OA_RESOLVED;
} else {
if (debug&DEBUG_OVERLAYABBREVIATIONS) fprintf(stderr,"Cannot resolve OA_CODE_SELF until we can resolve sender's address.\n");
if (debug&DEBUG_OVERLAYABBREVIATIONS) DEBUGF("Cannot resolve OA_CODE_SELF until we can resolve sender's address.\n");
return OA_UNINITIALISED;
}
case OA_CODE_INDEX: /* single byte index look up */

View File

@ -55,7 +55,7 @@ int overlay_route_please_advertise(overlay_node *n)
else return 1;
}
int overlay_route_add_advertisements(int interface,overlay_buffer *e)
int overlay_route_add_advertisements(overlay_buffer *e)
{
/* Construct a route advertisement frame and append it to e.

View File

@ -43,8 +43,7 @@ struct interface_rules *interface_filter=NULL;
unsigned int overlay_sequence_number=0;
/* Do we need to repeat our abbreviation policy? */
int overlay_interface_repeat_abbreviation_policy[OVERLAY_MAX_INTERFACES]={1};
long long overlay_next_tick();
/* Return milliseconds since server started. First call will always return zero.
Must use long long, not time_t, as time_t can be 32bits, which is too small for
@ -187,29 +186,28 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
I(broadcast_address) = broadcast;
I(fileP) = 0;
I(fd) = socket(PF_INET,SOCK_DGRAM,0);
fd_watch(I(fd),overlay_interface_poll,POLLIN);
WHYF("Watching fd#%d for interface #%d",I(fd),interface);
if (I(fd) < 0) {
I(alarm.poll.fd) = socket(PF_INET,SOCK_DGRAM,0);
if (I(alarm.poll.fd) < 0) {
WHY_perror("socket()");
WHYF("Could not create UDP socket for interface: %s",strerror(errno));
goto error;
} else
INFOF("interface #%d fd=%d",interface, I(fd));
INFOF("interface #%d fd=%d",interface, I(alarm.poll.fd));
int reuseP = 1;
if (setsockopt(I(fd), SOL_SOCKET, SO_REUSEADDR, &reuseP, sizeof(reuseP)) < 0) {
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_REUSEADDR, &reuseP, sizeof(reuseP)) < 0) {
WHY_perror("setsockopt(SO_REUSEADR)");
goto error;
}
#ifdef SO_REUSEPORT
if (setsockopt(I(fd), SOL_SOCKET, SO_REUSEPORT, &reuseP, sizeof(reuseP)) < 0) {
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_REUSEPORT, &reuseP, sizeof(reuseP)) < 0) {
WHY_perror("setsockopt(SO_REUSEPORT)");
goto error;
}
#endif
int broadcastP = 1;
if (setsockopt(I(fd), SOL_SOCKET, SO_BROADCAST, &broadcastP, sizeof(broadcastP)) < 0) {
if (setsockopt(I(alarm.poll.fd), SOL_SOCKET, SO_BROADCAST, &broadcastP, sizeof(broadcastP)) < 0) {
WHY_perror("setsockopt");
goto error;
}
@ -217,7 +215,7 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
/* Automatically close socket on calls to exec().
This makes life easier when we restart with an exec after receiving
a bad signal. */
fcntl(I(fd), F_SETFL, fcntl(I(fd), F_GETFL, NULL) | O_CLOEXEC);
fcntl(I(alarm.poll.fd), F_SETFL, fcntl(I(alarm.poll.fd), F_GETFL, NULL) | O_CLOEXEC);
/* @PGS/20120615
Use the broadcast address, so that we can reliably receive broadcast
@ -226,7 +224,7 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
*/
broadcast.sin_family = AF_INET;
broadcast.sin_port = htons(I(port));
if (bind(I(fd), (struct sockaddr *)&broadcast, sizeof(broadcast))) {
if (bind(I(alarm.poll.fd), (struct sockaddr *)&broadcast, sizeof(broadcast))) {
WHY_perror("bind");
WHY("MP HLR server could not bind to requested UDP port (bind() failed)");
goto error;
@ -234,11 +232,16 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
assert(inet_ntop(AF_INET, (const void *)&broadcast.sin_addr, srctxt, INET_ADDRSTRLEN) != NULL);
if (debug & (DEBUG_PACKETRX | DEBUG_IO)) INFOF("Bound to %s:%d", srctxt, ntohs(broadcast.sin_port));
I(alarm.poll.events)=POLLIN;
I(alarm.function) = overlay_interface_poll;
I(alarm.stats.name)="overlay_interface_poll";
watch(&I(alarm));
return 0;
error:
fd_teardown(I(fd));
I(fd)=-1;
close(I(alarm.poll.fd));
I(alarm.poll.fd)=-1;
return -1;
#undef I
}
@ -256,14 +259,13 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd
/* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */
I(mtu)=1200;
I(observed)=1;
I(bits_per_second)=speed_in_bits;
I(port)=port;
I(type)=type;
I(tick_ms)=500;
I(last_tick_ms)=0;
I(fd)=0;
I(alarm.poll.fd)=0;
switch(type) {
case OVERLAY_INTERFACE_PACKETRADIO: I(tick_ms)=15000; break;
case OVERLAY_INTERFACE_ETHERNET: I(tick_ms)=500; break;
@ -281,151 +283,119 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd
if (!FORM_SERVAL_INSTANCE_PATH(dummyfile, &name[1]))
return WHY("could not form dummy interfance name");
if ((I(fd) = open(dummyfile,O_APPEND|O_RDWR)) < 1) {
if ((I(alarm.poll.fd) = open(dummyfile,O_APPEND|O_RDWR)) < 1) {
return WHY("could not open dummy interface file for append");
}
/* Seek to end of file as initial reading point */
I(offset)=lseek(I(fd),0,SEEK_END); /* socket gets reused to hold file offset */
I(offset)=lseek(I(alarm.poll.fd),0,SEEK_END); /* socket gets reused to hold file offset */
/* XXX later add pretend location information so that we can decide which "packets" to receive
based on closeness */
// schedule an alarm for this interface
I(alarm.function)=overlay_dummy_poll;
I(alarm.alarm)=overlay_gettime_ms()+10;
I(alarm.stats.name)="overlay_dummy_poll";
schedule(&I(alarm));
} else {
if (overlay_interface_init_socket(overlay_interface_count,src_addr,broadcast))
return WHY("overlay_interface_init_socket() failed");
}
overlay_interface_count++;
fd_setalarm(overlay_dummy_poll,10,10);
#undef I
return 0;
}
void overlay_interface_poll(int fd)
void overlay_interface_poll(struct sched_ent *alarm)
{
int i;
struct overlay_interface *interface = (overlay_interface *)alarm;
int plen=0;
unsigned char packet[16384];
for(i=0;i<overlay_interface_count;i++)
{
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
if (overlay_interfaces[i].fd!=fd) continue;
/* Read from UDP socket */
plen=1;
/* Read only one packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
// while (plen>0) {
int recvttl=1;
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {
/* No more packets */
return;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
}
}
// }
return;
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
int recvttl=1;
plen=recvwithttl(alarm->poll.fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {
return;
} else {
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,"Read from real interface",
packet,plen);
fflush(stderr);
}
if (debug&DEBUG_OVERLAYINTERFACES)DEBUGF("Received %d bytes on interface %s\n",plen,interface->name);
if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
serval_packetvisualise(stderr,"Malformed packet", packet,plen);
}
}
return;
}
void overlay_dummy_poll()
void overlay_dummy_poll(struct sched_ent *alarm)
{
int i;
overlay_interface *interface = (overlay_interface *)alarm;
/* Grab packets, unpackage and dispatch frames to consumers */
/* XXX Okay, so how are we managing out-of-process consumers?
They need some way to register their interest in listening to a port.
*/
unsigned char packet[16384];
int plen=0;
int c[OVERLAY_MAX_INTERFACES];
int count=1;
int dummys=0;
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
unsigned char transaction_id[8];
/* Check for input on any dummy interfaces that are attached to ordinary
files. We have to do it this way, because poll() says that ordinary
files are always ready for reading, even if at EOF.
Also, make sure we don't spend too much time here */
int now = overlay_gettime_ms();
while(count>0)
/* Read from dummy interface file */
long long length=lseek(alarm->poll.fd,0,SEEK_END);
if (interface->offset>=length)
{
count=0;
for(i=0;i<overlay_interface_count;i++)
{
struct sockaddr src_addr;
unsigned int addrlen=sizeof(src_addr);
unsigned char transaction_id[8];
if (overlay_interfaces[i].fileP) {
dummys++;
/* Read from dummy interface file */
overlay_last_interface_number=i;
long long length=lseek(overlay_interfaces[i].fd,0,SEEK_END);
if (overlay_interfaces[i].offset>=length)
{
if (debug&DEBUG_OVERLAYINTERFACES)
fprintf(stderr,"At end of input on dummy interface #%d\n",i);
}
else
{
lseek(overlay_interfaces[i].fd,overlay_interfaces[i].offset,SEEK_SET);
if (debug&DEBUG_OVERLAYINTERFACES)
fprintf(stderr,"Reading from interface #%d log at offset %d, end of file at %lld.\n",i,
overlay_interfaces[i].offset,length);
if (read(overlay_interfaces[i].fd,&packet[0],2048)==2048)
{
overlay_interfaces[i].offset+=2048;
plen=2048-128;
plen=packet[110]+(packet[111]<<8);
if (plen>(2048-128)) plen=-1;
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,
"Read from dummy interface",
&packet[128],plen);
fflush(stderr);
}
bzero(&transaction_id[0],8);
bzero(&src_addr,sizeof(src_addr));
if ((plen>=0)&&(packet[0]==0x01)&&!(packet[1]|packet[2]|packet[3])) {
{ if (packetOk(i,&packet[128],plen,transaction_id,
-1 /* fake TTL */,
&src_addr,addrlen,1))
WHY("Malformed or unsupported packet from dummy interface (packetOK() failed)"); } }
else WHY("Invalid packet version in dummy interface");
}
else {
if (debug&DEBUG_IO) fprintf(stderr,"Read NOTHING from dummy interface\n");
c[i]=0; count--;
}
}
} else {
}
}
/* Don't sit here forever, or else we will never send any packets */
if (overlay_gettime_ms()>(now+10)) break;
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("At end of input on dummy interface %s\n",interface->name);
}
/* Stop watching dummy nets if there are none active */
if (!dummys) fd_setalarm(overlay_dummy_poll,0,0);
else
{
lseek(alarm->poll.fd,interface->offset,SEEK_SET);
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Reading from interface %s log at offset %d, end of file at %lld.\n",interface->name,
interface->offset,length);
if (read(alarm->poll.fd,&packet[0],2048)==2048)
{
interface->offset+=2048;
plen=2048-128;
plen=packet[110]+(packet[111]<<8);
if (plen>(2048-128)) plen=-1;
if (debug&DEBUG_PACKETRX) {
fflush(stdout);
serval_packetvisualise(stderr,
"Read from dummy interface",
&packet[128],plen);
fflush(stderr);
}
bzero(&transaction_id[0],8);
bzero(&src_addr,sizeof(src_addr));
if ((plen>=0)&&(packet[0]==0x01)&&!(packet[1]|packet[2]|packet[3])) {
{ if (packetOk(interface,&packet[128],plen,transaction_id,
-1 /* fake TTL */,
&src_addr,addrlen,1))
WHY("Malformed or unsupported packet from dummy interface (packetOK() failed)"); } }
else WHY("Invalid packet version in dummy interface");
}
else {
if (debug&DEBUG_IO) fprintf(stderr,"Read NOTHING from dummy interface\n");
}
}
alarm->alarm = overlay_gettime_ms()+10;
schedule(alarm);
return ;
}
@ -509,7 +479,7 @@ int overlay_broadcast_ensemble(int interface_number,
bzero(&buf[128+len],2048-(128+len));
bcopy(bytes,&buf[128],len);
if (write(overlay_interfaces[interface_number].fd,buf,2048)!=2048)
if (write(overlay_interfaces[interface_number].alarm.poll.fd,buf,2048)!=2048)
{
WHY_perror("write");
return WHY("write() failed");
@ -519,7 +489,8 @@ int overlay_broadcast_ensemble(int interface_number,
}
else
{
if(sendto(overlay_interfaces[interface_number].fd, bytes, len, 0, (struct sockaddr *)&s, sizeof(struct sockaddr_in)) != len)
if(sendto(overlay_interfaces[interface_number].alarm.poll.fd,
bytes, len, 0, (struct sockaddr *)&s, sizeof(struct sockaddr_in)) != len)
{
/* Failed to send */
WHY_perror("sendto(c)");
@ -593,8 +564,9 @@ overlay_interface_register(char *name,
overlay_interfaces[i].broadcast_address.sin_addr.s_addr,
local.sin_addr.s_addr,
broadcast.sin_addr.s_addr);
fd_teardown(overlay_interfaces[i].fd);
overlay_interfaces[i].fd = -1;
unwatch(&overlay_interfaces[i].alarm);
close(overlay_interfaces[i].alarm.poll.fd);
overlay_interfaces[i].alarm.poll.fd = -1;
if (overlay_interface_init_socket(i, local, broadcast))
INFOF("Could not reinitialise changed interface %s", name);
}
@ -610,7 +582,7 @@ overlay_interface_register(char *name,
return 0;
}
void overlay_interface_discover(void) {
void overlay_interface_discover(struct sched_ent *alarm){
int no_route, i;
struct interface_rules *r;
struct sockaddr_in dummyaddr;
@ -664,6 +636,8 @@ void overlay_interface_discover(void) {
FATAL("Unable to get any interface information");
}
alarm->alarm = overlay_gettime_ms()+5000;
schedule(alarm);
return;
}
@ -866,7 +840,7 @@ int overlay_tick_interface(int i, long long now)
/* Add advertisements for ROUTES not Rhizome bundles.
Rhizome bundle advertisements are lower priority */
overlay_route_add_advertisements(i,e);
overlay_route_add_advertisements(e);
ob_limitsize(e,overlay_interfaces[i].mtu);
@ -971,9 +945,7 @@ int overlay_tick_interface(int i, long long now)
}
}
void overlay_check_ticks(void) {
void overlay_check_ticks(struct sched_ent *alarm) {
/* Check if any interface(s) are due for a tick */
int i;
@ -1000,16 +972,17 @@ void overlay_check_ticks(void) {
}
/* Update interval until next tick */
fd_setalarm(overlay_check_ticks,overlay_time_until_next_tick(),500);
alarm->alarm = overlay_next_tick();
schedule(alarm);
return;
}
long long overlay_time_until_next_tick()
long long overlay_next_tick()
{
/* By default only tick once per day */
long long nexttick=86400*1000;
long long now=overlay_gettime_ms();
long long nexttick=86400*1000;
int i;
if (debug&DEBUG_VERBOSE_IO)fprintf(stderr,"Tick-check on %d interfaces at %lldms\n",overlay_interface_count,now);
@ -1032,7 +1005,7 @@ long long overlay_time_until_next_tick()
}
if (0) WHYF("Next tick required in %lldms",nexttick);
return nexttick;
return now + nexttick;
}
long long parse_quantity(char *q)

View File

@ -19,8 +19,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include <sys/stat.h>
int mdp_abstract_socket=-1;
int mdp_named_socket=-1;
struct sched_ent mdp_abstract;
struct sched_ent mdp_named;
int overlay_mdp_setup_sockets()
{
struct sockaddr_un name;
@ -31,9 +32,9 @@ int overlay_mdp_setup_sockets()
#ifndef HAVE_LINUX_IF_H
/* Abstrack name space (i.e., non-file represented) unix domain sockets are a
linux-only thing. */
mdp_abstract_socket = -1;
mdp_abstract.poll.fd = -1;
#else
if (mdp_abstract_socket==-1) {
if (mdp_abstract.function==NULL) {
/* Abstract name space unix sockets is a special Linux thing, which is
convenient for us because Android is Linux, but does not have a shared
writable path that is on a UFS partition, so we cannot use traditional
@ -46,17 +47,17 @@ int overlay_mdp_setup_sockets()
confValueGet("mdp.socket",DEFAULT_MDP_SOCKET_NAME));
len = 1+strlen(&name.sun_path[1]) + sizeof(name.sun_family);
mdp_abstract_socket = socket(AF_UNIX, SOCK_DGRAM, 0);
if (mdp_abstract_socket>-1) {
mdp_abstract.poll.fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (mdp_abstract.poll.fd>-1) {
int dud=0;
int reuseP=1;
if(setsockopt( mdp_abstract_socket, SOL_SOCKET, SO_REUSEADDR,
if(setsockopt( mdp_abstract.poll.fd, SOL_SOCKET, SO_REUSEADDR,
&reuseP, sizeof(reuseP)) < 0)
{
WARN_perror("setsockopt");
WARN("Could not indicate reuse addresses. Not necessarily a problem (yet)");
}
int r=bind(mdp_abstract_socket, (struct sockaddr *)&name, len);
int r=bind(mdp_abstract.poll.fd, (struct sockaddr *)&name, len);
if (r) {
WARN_perror("bind");
dud=1;
@ -64,49 +65,56 @@ int overlay_mdp_setup_sockets()
WARN("bind() of abstract name space socket failed (not an error on non-linux systems");
}
if (dud) {
close(mdp_abstract_socket);
mdp_abstract_socket=-1;
close(mdp_abstract.poll.fd);
mdp_abstract.poll.fd=-1;
WHY("Could not open abstract name-space socket (only a problem on Linux).");
}
int send_buffer_size=64*1024;
int res = setsockopt(mdp_abstract_socket, SOL_SOCKET, SO_SNDBUF,
int res = setsockopt(mdp_abstract.poll.fd, SOL_SOCKET, SO_SNDBUF,
&send_buffer_size, sizeof(send_buffer_size));
fd_watch(mdp_abstract_socket,overlay_mdp_poll,POLLIN);
mdp_abstract.function = overlay_mdp_poll;
mdp_abstract.stats.name = "overlay_mdp_poll";
mdp_abstract.poll.events = POLLIN;
watch(&mdp_abstract);
}
}
#endif
if (mdp_named_socket==-1) {
if (mdp_named.function==NULL) {
if (!form_serval_instance_path(&name.sun_path[0], 100, "mdp.socket")) {
return WHY("Cannot construct name of unix domain socket.");
}
unlink(&name.sun_path[0]);
len = 0+strlen(&name.sun_path[0]) + sizeof(name.sun_family)+1;
mdp_named_socket = socket(AF_UNIX, SOCK_DGRAM, 0);
if (mdp_named_socket>-1) {
mdp_named.poll.fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (mdp_named.poll.fd>-1) {
int dud=0;
int reuseP=1;
if(setsockopt( mdp_named_socket, SOL_SOCKET, SO_REUSEADDR,
if(setsockopt( mdp_named.poll.fd, SOL_SOCKET, SO_REUSEADDR,
&reuseP, sizeof(reuseP)) < 0)
{
WARN_perror("setsockopt");
WARN("Could not indicate reuse addresses. Not necessarily a problem (yet)");
}
int r=bind(mdp_named_socket, (struct sockaddr *)&name, len);
int r=bind(mdp_named.poll.fd, (struct sockaddr *)&name, len);
if (r) { dud=1; r=0; WHY("bind() of named unix domain socket failed"); }
if (dud) {
close(mdp_named_socket);
mdp_named_socket=-1;
close(mdp_named.poll.fd);
mdp_named.poll.fd=-1;
WHY("Could not open named unix domain socket.");
}
int send_buffer_size=64*1024;
int res = setsockopt(mdp_named_socket, SOL_SOCKET, SO_RCVBUF,
int res = setsockopt(mdp_named.poll.fd, SOL_SOCKET, SO_RCVBUF,
&send_buffer_size, sizeof(send_buffer_size));
if (res)
WHY_perror("setsockopt");
fd_watch(mdp_named_socket,overlay_mdp_poll,POLLIN);
mdp_named.function = overlay_mdp_poll;
mdp_named.stats.name = "overlay_mdp_poll";
mdp_named.poll.events = POLLIN;
watch(&mdp_named);
}
}
@ -367,7 +375,7 @@ unsigned char *overlay_mdp_decrypt(overlay_frame *f,overlay_mdp_frame *mdp,
RETURN(b);
}
int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long now)
int overlay_saw_mdp_containing_frame(overlay_frame *f,long long now)
{
IN();
/* Take frame source and destination and use them to populate mdp->in->{src,dst}
@ -404,7 +412,7 @@ int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long no
bcopy(&b[10],&mdp.in.payload[0],mdp.in.payload_length);
/* and do something with it! */
RETURN(overlay_saw_mdp_frame(interface,&mdp,now));
RETURN(overlay_saw_mdp_frame(&mdp,now));
}
int overlay_mdp_swap_src_dst(overlay_mdp_frame *mdp)
@ -416,7 +424,7 @@ int overlay_mdp_swap_src_dst(overlay_mdp_frame *mdp)
return 0;
}
int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
int overlay_saw_mdp_frame(overlay_mdp_frame *mdp,long long now)
{
IN();
int i;
@ -480,7 +488,7 @@ int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now)
addr.sun_family=AF_UNIX;
errno=0;
int len=overlay_mdp_relevant_bytes(mdp);
int r=sendto(mdp_named_socket,mdp,len,0,(struct sockaddr*)&addr,sizeof(addr));
int r=sendto(mdp_named.poll.fd,mdp,len,0,(struct sockaddr*)&addr,sizeof(addr));
if (r==overlay_mdp_relevant_bytes(mdp)) {
RETURN(0);
}
@ -713,7 +721,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
if (overlay_mdp_sanitytest_sourceaddr(&mdp->out.src,userGeneratedFrameP,
recvaddr,recvaddrlen))
RETURN(overlay_mdp_reply_error
(mdp_named_socket,
(mdp_named.poll.fd,
(struct sockaddr_un *)recvaddr,
recvaddrlen,8,
"Source address is invalid (you must bind to a source address before"
@ -724,8 +732,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
if (overlay_address_is_local(mdp->out.dst.sid)||broadcast)
{
/* Packet is addressed such that we should process it. */
overlay_saw_mdp_frame(-1 /* not received on a network interface */,
mdp,overlay_gettime_ms());
overlay_saw_mdp_frame(mdp,overlay_gettime_ms());
if (!broadcast) {
/* Is local, and is not broadcast, so shouldn't get sent out
on the wire. */
@ -738,7 +745,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
NaCl cryptobox keys can be used for signing. */
if (broadcast) {
if (!(mdp->packetTypeAndFlags&MDP_NOCRYPT))
RETURN(overlay_mdp_reply_error(mdp_named_socket,
RETURN(overlay_mdp_reply_error(mdp_named.poll.fd,
recvaddr,recvaddrlen,5,
"Broadcast packets cannot be encrypted ")); }
@ -951,7 +958,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP,
}
}
void overlay_mdp_poll()
void overlay_mdp_poll(struct sched_ent *alarm)
{
unsigned char buffer[16384];
int ttl;
@ -960,122 +967,114 @@ void overlay_mdp_poll()
socklen_t recvaddrlen=sizeof(recvaddrbuffer);
struct sockaddr_un *recvaddr_un=NULL;
if (mdp_named_socket>-1) {
ttl=-1;
bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer));
SET_NONBLOCKING(mdp_named_socket);
int len = recvwithttl(mdp_named_socket,buffer,sizeof(buffer),&ttl,
recvaddr,&recvaddrlen);
SET_BLOCKING(mdp_named_socket);
recvaddr_un=(struct sockaddr_un *)recvaddr;
ttl=-1;
bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer));
int len = recvwithttl(alarm->poll.fd,buffer,sizeof(buffer),&ttl,
recvaddr,&recvaddrlen);
recvaddr_un=(struct sockaddr_un *)recvaddr;
if (len>0) {
/* Look at overlay_mdp_frame we have received */
overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0];
if (len>0) {
/* Look at overlay_mdp_frame we have received */
overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0];
switch(mdp->packetTypeAndFlags&MDP_TYPE_MASK) {
case MDP_GOODBYE:
overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
return;
case MDP_VOMPEVENT:
vomp_mdp_event(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_NODEINFO:
overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_GETADDRS:
{
overlay_mdp_frame mdpreply;
/* Work out which SIDs to get ... */
int sid_num=mdp->addrlist.first_sid;
int max_sid=mdp->addrlist.last_sid;
int max_sids=mdp->addrlist.frame_sid_count;
/* ... and constrain list for sanity */
if (sid_num<0) sid_num=0;
if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST;
if (max_sids<0) max_sids=0;
/* Prepare reply packet */
mdpreply.packetTypeAndFlags=MDP_ADDRLIST;
mdpreply.addrlist.first_sid=sid_num;
mdpreply.addrlist.last_sid=max_sid;
mdpreply.addrlist.frame_sid_count=max_sids;
/* Populate with SIDs */
int i=0;
int count=0;
if (mdp->addrlist.selfP) {
/* from self */
int cn=0,in=0,kp=0;
while(keyring_next_identity(keyring,&cn,&in,&kp)) {
if (count>=sid_num&&(i<max_sids))
bcopy(keyring->contexts[cn]->identities[in]
->keypairs[kp]->public_key,
mdpreply.addrlist.sids[i++],SID_SIZE);
in++; kp=0;
count++;
if (i>=max_sids) break;
}
} else {
/* from peer list */
int bin,slot;
i=0;
count=0;
for(bin=0;bin<overlay_bin_count;bin++)
for(slot=0;slot<overlay_bin_size;slot++)
{
if ((!overlay_nodes[bin][slot].sid[0])
||(overlay_nodes[bin][slot].best_link_score<1))
{
continue; }
if ((count>=sid_num)&&(i<max_sids)) {
bcopy(overlay_nodes[bin][slot].sid,
mdpreply.addrlist.sids[i++],SID_SIZE);
}
count++;
}
switch(mdp->packetTypeAndFlags&MDP_TYPE_MASK) {
case MDP_GOODBYE:
overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
return;
case MDP_VOMPEVENT:
vomp_mdp_event(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_NODEINFO:
overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_GETADDRS:
{
overlay_mdp_frame mdpreply;
/* Work out which SIDs to get ... */
int sid_num=mdp->addrlist.first_sid;
int max_sid=mdp->addrlist.last_sid;
int max_sids=mdp->addrlist.frame_sid_count;
/* ... and constrain list for sanity */
if (sid_num<0) sid_num=0;
if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST;
if (max_sids<0) max_sids=0;
/* Prepare reply packet */
mdpreply.packetTypeAndFlags=MDP_ADDRLIST;
mdpreply.addrlist.first_sid=sid_num;
mdpreply.addrlist.last_sid=max_sid;
mdpreply.addrlist.frame_sid_count=max_sids;
/* Populate with SIDs */
int i=0;
int count=0;
if (mdp->addrlist.selfP) {
/* from self */
int cn=0,in=0,kp=0;
while(keyring_next_identity(keyring,&cn,&in,&kp)) {
if (count>=sid_num&&(i<max_sids))
bcopy(keyring->contexts[cn]->identities[in]
->keypairs[kp]->public_key,
mdpreply.addrlist.sids[i++],SID_SIZE);
in++; kp=0;
count++;
if (i>=max_sids) break;
}
mdpreply.addrlist.frame_sid_count=i;
mdpreply.addrlist.last_sid=sid_num+i-1;
mdpreply.addrlist.server_sid_count=count;
/* Send back to caller */
overlay_mdp_reply(mdp_named_socket,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
return;
} else {
/* from peer list */
int bin,slot;
i=0;
count=0;
for(bin=0;bin<overlay_bin_count;bin++)
for(slot=0;slot<overlay_bin_size;slot++)
{
if ((!overlay_nodes[bin][slot].sid[0])
||(overlay_nodes[bin][slot].best_link_score<1))
{
continue; }
if ((count>=sid_num)&&(i<max_sids)) {
bcopy(overlay_nodes[bin][slot].sid,
mdpreply.addrlist.sids[i++],SID_SIZE);
}
count++;
}
}
break;
case MDP_TX: /* Send payload (and don't treat it as system privileged) */
overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
mdpreply.addrlist.frame_sid_count=i;
mdpreply.addrlist.last_sid=sid_num+i-1;
mdpreply.addrlist.server_sid_count=count;
/* Send back to caller */
overlay_mdp_reply(alarm->poll.fd,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
return;
break;
case MDP_BIND: /* Bind to port */
overlay_mdp_process_bind_request(mdp_named_socket,mdp,
recvaddr_un,recvaddrlen);
return;
break;
default:
/* Client is not allowed to send any other frame type */
WHY("Illegal frame type.");
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=2;
snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND.");
int len=4+4+strlen(mdp->error.message)+1;
errno=0;
/* We ignore the result of the following, because it is just sending an
error message back to the client. If this fails, where would we report
the error to? My point exactly. */
SET_NONBLOCKING(mdp_named_socket);
sendto(mdp_named_socket,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen);
SET_BLOCKING(mdp_named_socket);
}
break;
case MDP_TX: /* Send payload (and don't treat it as system privileged) */
overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
return;
break;
case MDP_BIND: /* Bind to port */
overlay_mdp_process_bind_request(alarm->poll.fd,mdp,
recvaddr_un,recvaddrlen);
return;
break;
default:
/* Client is not allowed to send any other frame type */
WHY("Illegal frame type.");
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=2;
snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND.");
int len=4+4+strlen(mdp->error.message)+1;
errno=0;
/* We ignore the result of the following, because it is just sending an
error message back to the client. If this fails, where would we report
the error to? My point exactly. */
sendto(alarm->poll.fd,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen);
}
}
return;
}

View File

@ -26,7 +26,7 @@ struct sockaddr_in loopback = {
.sin_addr.s_addr=0x0100007f
};
int packetOkOverlay(int interface,unsigned char *packet,int len,
int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet,int len,
unsigned char *transaction_id,int recvttl,
struct sockaddr *recvaddr,int recvaddrlen,int parseP)
{
@ -106,7 +106,7 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
if (recvaddr->sa_family==AF_INET)
f.recvaddr=recvaddr;
else {
if (overlay_interfaces[interface].fileP) {
if (interface->fileP) {
/* dummy interface, so tell to use 0.0.0.0 */
f.recvaddr=(struct sockaddr *)&loopback;
} else
@ -128,7 +128,7 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
f.type=packet[ofs]&OF_TYPE_BITS;
f.modifiers=packet[ofs]&OF_MODIFIER_BITS;
if (debug&DEBUG_PACKETFORMATS) fprintf(stderr,"f.type=0x%02x, f.modifiers=0x%02x, ofs=%d\n",
if (debug&DEBUG_PACKETFORMATS) DEBUGF("f.type=0x%02x, f.modifiers=0x%02x, ofs=%d\n",
f.type,f.modifiers,ofs);
switch(packet[ofs]&OF_TYPE_BITS)
@ -167,7 +167,7 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
/* Decode length of remainder of frame */
f.rfs=rfs_decode(packet,&ofs);
if (debug&DEBUG_PACKETFORMATS) fprintf(stderr,"f.rfs=%d, ofs=%d\n",f.rfs,ofs);
if (debug&DEBUG_PACKETFORMATS) DEBUGF("f.rfs=%d, ofs=%d\n",f.rfs,ofs);
if (!f.rfs) {
/* Zero length -- assume we fell off the end of the packet */
@ -177,10 +177,10 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
/* Now extract the next hop address */
int alen=0;
int offset=ofs;
f.nexthop_address_status=overlay_abbreviate_expand_address(interface,packet,&offset,f.nexthop,&alen);
f.nexthop_address_status=overlay_abbreviate_expand_address(packet,&offset,f.nexthop,&alen);
if (debug&DEBUG_PACKETFORMATS) {
if (f.nexthop_address_status==OA_RESOLVED)
fprintf(stderr,"next hop address is %s\n",
DEBUGF("next hop address is %s\n",
overlay_render_sid(f.nexthop));
}
@ -193,7 +193,7 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
if (f.bytecount<0) {
f.bytecount=0;
WHY("negative residual byte count after extracting addresses from frame header");
if (debug&DEBUG_PACKETFORMATS) fprintf(stderr,"f.rfs=%d, offset=%d, ofs=%d\n",
if (debug&DEBUG_PACKETFORMATS) DEBUGF("f.rfs=%d, offset=%d, ofs=%d\n",
f.rfs,offset,ofs);
return WHY("negative residual byte count after extracting addresses from frame header");
}
@ -215,16 +215,16 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,
return 0;
}
int overlay_frame_resolve_addresses(int interface,overlay_frame *f)
int overlay_frame_resolve_addresses(overlay_frame *f)
{
/* Get destination and source addresses and set pointers to payload appropriately */
int alen=0;
int offset=0;
overlay_abbreviate_set_most_recent_address(f->nexthop);
f->destination_address_status=overlay_abbreviate_expand_address(interface,f->bytes,&offset,f->destination,&alen);
f->destination_address_status=overlay_abbreviate_expand_address(f->bytes,&offset,f->destination,&alen);
alen=0;
f->source_address_status=overlay_abbreviate_expand_address(interface,f->bytes,&offset,f->source,&alen);
f->source_address_status=overlay_abbreviate_expand_address(f->bytes,&offset,f->source,&alen);
if (debug&DEBUG_OVERLAYABBREVIATIONS)
{
fprintf(stderr,"Wrote %d bytes into source address: \n",alen);

View File

@ -694,7 +694,7 @@ overlay_neighbour *overlay_route_get_neighbour_structure(unsigned char *packed_s
int overlay_route_i_can_hear_node(unsigned char *who,int sender_interface,
unsigned int s1,unsigned int s2,
int receiver_interface,long long now)
long long now)
{
if (0) WHYF("I can hear node %s (but I really only care who can hear me)",
overlay_render_sid(who));
@ -789,7 +789,7 @@ int overlay_print_address(FILE *f,char *prefix,unsigned char *s,char *suffix)
}
int overlay_route_saw_selfannounce(int interface,overlay_frame *f,long long now)
int overlay_route_saw_selfannounce(overlay_frame *f,long long now)
{
if (overlay_address_is_local(f->source)) return 0;
@ -818,7 +818,7 @@ int overlay_route_saw_selfannounce(int interface,overlay_frame *f,long long now)
dump("Payload",&f->payload->bytes[0],f->payload->length);
}
overlay_route_i_can_hear_node(f->source,sender_interface,s1,s2,interface,now);
overlay_route_i_can_hear_node(f->source,sender_interface,s1,s2,now);
/* Ignore self-announcements from ourself. */
if (overlay_address_is_local(&f->source[0]))
@ -1059,7 +1059,7 @@ char *overlay_render_sid_prefix(unsigned char *sid,int l)
These link scores should get stored in our node list as compared to our neighbour list,
with the node itself listed as the nexthop that the score is associated with.
*/
int overlay_route_saw_selfannounce_ack(int interface,overlay_frame *f,long long now)
int overlay_route_saw_selfannounce_ack(overlay_frame *f,long long now)
{
if (0) WHYF("processing selfannounce ack (payload length=%d)",f->payload->length);
if (!overlay_neighbours) {
@ -1259,14 +1259,14 @@ int overlay_route_tick_next_neighbour_id=0;
int overlay_route_tick_neighbour_bundle_size=1;
int overlay_route_tick_next_node_bin_id=0;
int overlay_route_tick_node_bundle_size=1;
void overlay_route_tick()
void overlay_route_tick(struct sched_ent *alarm)
{
int n;
long long start_time=overlay_gettime_ms();
if (debug&DEBUG_OVERLAYROUTING)
fprintf(stderr,"Neighbours: %d@%d, Nodes: %d@%d\n",
DEBUGF("Neighbours: %d@%d, Nodes: %d@%d\n",
overlay_route_tick_neighbour_bundle_size,overlay_route_tick_next_neighbour_id,
overlay_route_tick_node_bundle_size,overlay_route_tick_next_node_bin_id);
@ -1328,8 +1328,8 @@ void overlay_route_tick()
if (debug&DEBUG_OVERLAYROUTING) fprintf(stderr,"route tick interval = %dms (%d ticks per 5sec, neigh=%lldms, node=%lldms)\n",interval,ticks,neighbour_time,node_time);
/* Update callback interval based on how much work we have to do */
fd_setalarm(overlay_route_tick,interval,interval);
alarm->alarm = overlay_gettime_ms()+interval;
schedule(alarm);
return;
}
@ -1502,7 +1502,5 @@ int overlay_route_node_info(overlay_mdp_frame *mdp,
}
}
return overlay_mdp_reply(mdp_named_socket,addr,addrlen,mdp);
return 0;
return overlay_mdp_reply(mdp_named.poll.fd,addr,addrlen,mdp);
}

View File

@ -59,7 +59,7 @@ int process_packet(unsigned char *packet,int len,
return 0;
}
int packetOk(int interface,unsigned char *packet,int len,
int packetOk(struct overlay_interface *interface, unsigned char *packet,int len,
unsigned char *transaction_id,int ttl,
struct sockaddr *recvaddr,int recvaddrlen,int parseP)
{
@ -72,7 +72,7 @@ int packetOk(int interface,unsigned char *packet,int len,
if (packet[0]==0x4F&&packet[1]==0x10)
{
if (interface>-1)
if (interface!=NULL)
{
return packetOkOverlay(interface,packet,len,transaction_id,ttl,
recvaddr,recvaddrlen,parseP);

130
performance_timing.c Normal file
View File

@ -0,0 +1,130 @@
/*
Serval Distributed Numbering Architecture (DNA)
Copyright (C) 2012 Paul Gardner-Stephen
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "serval.h"
struct callback_stats *stats_head=NULL;
struct call_stats *current_call=NULL;
void fd_clearstat(struct callback_stats *s){
s->max_time = 0;
s->total_time = 0;
s->calls = 0;
}
void fd_update_stats(struct callback_stats *s,long long elapsed)
{
s->total_time+=elapsed;
if (elapsed>s->max_time) s->max_time=elapsed;
s->calls++;
}
int fd_tallystats(struct callback_stats *total,struct callback_stats *a)
{
total->total_time+=a->total_time;
total->calls+=a->calls;
if (a->max_time>total->max_time) total->max_time=a->max_time;
return 0;
}
int fd_showstat(struct callback_stats *total, struct callback_stats *a)
{
INFOF("%lldms (%2.1f%%) in %d calls (max %lldms, avg %.1fms) : %s",
a->total_time,a->total_time*100.0/total->total_time,
a->calls,
a->max_time,a->total_time*1.00/a->calls,
a->name);
return 0;
}
int fd_clearstats()
{
struct callback_stats *stats = stats_head;
while(stats!=NULL){
fd_clearstat(stats);
stats = stats->_next;
}
return 0;
}
int fd_showstats()
{
struct callback_stats total={NULL, 0, "Total", 0,0,0};
struct callback_stats *stats = stats_head;
while(stats!=NULL){
/* Get total time spent doing everything */
fd_tallystats(&total,stats);
stats = stats->_next;
}
INFOF("servald time usage stats:");
stats = stats_head;
while(stats!=NULL){
/* Get total time spent doing everything */
if (stats->calls)
fd_showstat(&total,stats);
stats = stats->_next;
}
fd_showstat(&total,&total);
return 0;
}
void fd_periodicstats(struct sched_ent *alarm)
{
fd_showstats();
fd_clearstats();
alarm->alarm = overlay_gettime_ms()+3000;
schedule(alarm);
}
int fd_func_enter(struct call_stats *this_call)
{
this_call->enter_time=overlay_gettime_ms();
this_call->child_time=0;
this_call->prev = current_call;
current_call = this_call;
return 0;
}
int fd_func_exit(struct call_stats *this_call, struct callback_stats *aggregate_stats)
{
if (current_call != this_call)
WHYF("stack mismatch, exited through %s()",aggregate_stats->name);
long long now = overlay_gettime_ms();
long long elapsed=now - this_call->enter_time;
current_call = this_call->prev;
if (current_call)
current_call->child_time+=elapsed;
fd_update_stats(aggregate_stats, (elapsed - this_call->child_time));
if (!aggregate_stats->_initialised){
aggregate_stats->_initialised=1;
aggregate_stats->_next = stats_head;
stats_head = aggregate_stats;
}
return 0;
}

View File

@ -37,56 +37,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
extern long long rhizome_voice_timeout;
typedef struct rhizome_http_request {
int socket;
long long last_activity; /* time of last activity in ms */
long long initiate_time; /* time connection was initiated */
/* The HTTP request as currently received */
int request_length;
#define RHIZOME_HTTP_REQUEST_MAXLEN 1024
char request[RHIZOME_HTTP_REQUEST_MAXLEN];
/* Nature of the request */
int request_type;
#define RHIZOME_HTTP_REQUEST_RECEIVING -1
#define RHIZOME_HTTP_REQUEST_FROMBUFFER 1
#define RHIZOME_HTTP_REQUEST_FILE 2
#define RHIZOME_HTTP_REQUEST_SUBSCRIBEDGROUPLIST 4
#define RHIZOME_HTTP_REQUEST_ALLGROUPLIST 8
#define RHIZOME_HTTP_REQUEST_BUNDLESINGROUP 16
// manifests are small enough to send from a buffer
// #define RHIZOME_HTTP_REQUEST_BUNDLEMANIFEST 32
// for anything too big, we can just use a blob
#define RHIZOME_HTTP_REQUEST_BLOB 64
#define RHIZOME_HTTP_REQUEST_FAVICON 128
/* Local buffer of data to be sent.
If a RHIZOME_HTTP_REQUEST_FROMBUFFER, then the buffer is sent, and when empty
the request is closed.
Else emptying the buffer triggers a request to fetch more data. Only if no
more data is provided do we then close the request. */
unsigned char *buffer;
int buffer_size; // size
int buffer_length; // number of bytes loaded into buffer
int buffer_offset; // where we are between [0,buffer_length)
/* The source specification data which are used in different ways by different
request types */
char source[1024];
long long source_index;
long long source_count;
int source_record_size;
unsigned int source_flags;
sqlite3_blob *blob;
/* source_index used for offset in blob */
long long blob_end;
} rhizome_http_request;
#define RHIZOME_SERVER_MAX_LIVE_REQUESTS 32
#define RHIZOME_PRIORITY_HIGHEST RHIZOME_PRIORITY_SERVAL_CORE
#define RHIZOME_PRIORITY_SERVAL_CORE 5
#define RHIZOME_PRIORITY_SUBSCRIBED 4
@ -95,6 +45,8 @@ typedef struct rhizome_http_request {
#define RHIZOME_PRIORITY_SERVAL_BULK 1
#define RHIZOME_PRIORITY_NOTINTERESTED 0
#define RHIZOME_IDLE_TIMEOUT 10000
typedef struct rhizome_signature {
unsigned char signature[crypto_sign_edwards25519sha512batch_BYTES
+crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES+1];
@ -240,11 +192,6 @@ int rhizome_add_manifest(rhizome_manifest *m_in,int ttl);
void rhizome_bytes_to_hex_upper(unsigned const char *in, char *out, int byteCount);
int rhizome_find_privatekey(rhizome_manifest *m);
rhizome_signature *rhizome_sign_hash(rhizome_manifest *m, const unsigned char *authorSid);
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_close_http_request(int i);
int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r);
int rhizome_server_parse_http_request(int rn,rhizome_http_request *r);
int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char *response);
int sqlite_prepare(sqlite3_stmt **statement, const strbuf stmt);
int sqlite_prepare_loglevel(int log_level, sqlite3_stmt **statement, const strbuf stmt);
int sqlite_exec_void(const char *sqlformat,...);
@ -252,9 +199,6 @@ int sqlite_exec_void_loglevel(int log_level, const char *sqlformat, ...);
int sqlite_exec_void_strbuf_loglevel(int log_level, const strbuf stmt);
int sqlite_exec_int64(long long *result, const char *sqlformat,...);
int sqlite_exec_strbuf(strbuf sb, const char *sqlformat,...);
int rhizome_server_http_response_header(rhizome_http_request *r,int result,
char *mime_type,unsigned long long bytes);
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r, char *table, char *column);
double rhizome_manifest_get_double(rhizome_manifest *m,char *var,double default_value);
int rhizome_manifest_extract_signature(rhizome_manifest *m,int *ofs);
int rhizome_update_file_priority(const char *fileid);
@ -295,6 +239,4 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m,
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m,
struct sockaddr_in *peerip);
void rhizome_enqueue_suggestions();
void rhizome_fetch_poll(int fd);

View File

@ -26,7 +26,7 @@ extern int sigIoFlag;
typedef struct rhizome_file_fetch_record {
int socket; /* if non-zero this is the socket to read from */
struct sched_ent alarm;
rhizome_manifest *manifest;
char fileid[RHIZOME_FILEHASH_STRLEN + 1];
FILE *file;
@ -39,7 +39,6 @@ typedef struct rhizome_file_fetch_record {
long long file_ofs;
int state;
int last_action;
#define RHIZOME_FETCH_CONNECTING 1
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
@ -506,7 +505,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m,
RETURN(0);
}
void rhizome_enqueue_suggestions()
void rhizome_enqueue_suggestions(struct sched_ent *alarm)
{
int i;
for(i=0;i<candidate_count;i++)
@ -528,7 +527,8 @@ void rhizome_enqueue_suggestions()
bcopy(&candidates[i],&candidates[0],bytes);
candidate_count-=i;
}
alarm->alarm = overlay_gettime_ms() + 3000;
schedule(alarm);
return;
}
@ -651,7 +651,7 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
*q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest = m;
*manifest_kept = 1;
q->socket=sock;
q->alarm.poll.fd=sock;
strncpy(q->fileid, m->fileHexHash, RHIZOME_FILEHASH_STRLEN + 1);
snprintf(q->request,1024,"GET /rhizome/file/%s HTTP/1.0\r\n\r\n",
q->fileid);
@ -660,9 +660,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
q->state=RHIZOME_FETCH_CONNECTING;
q->file_len=-1;
q->file_ofs=0;
q->last_action=time(0);
/* XXX Don't forget to implement resume */
#define RHIZOME_IDLE_TIMEOUT 10
/* XXX We should stream file straight into the database */
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL) {
@ -686,8 +685,13 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
return -1;
}
/* Watch for activity on the socket */
fd_watch(q->socket,rhizome_fetch_poll,POLLIN|POLLOUT);
fd_setalarm(rhizome_check_connections,50,500);
q->alarm.function=rhizome_fetch_poll;
q->alarm.stats.name="rhizome_fetch_poll";
q->alarm.poll.events=POLLIN|POLLOUT;
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
rhizome_file_fetch_queue_count++;
if (1||debug&DEBUG_RHIZOME) DEBUGF("Queued file for fetching into %s (%d in queue)",
@ -718,21 +722,19 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
return 0;
}
int rhizome_fetch_close(int i){
int rhizome_fetch_close(rhizome_file_fetch_record *q){
/* Free ephemeral data */
if (file_fetch_queue[i].file) fclose(file_fetch_queue[i].file);
file_fetch_queue[i].file=NULL;
if (file_fetch_queue[i].manifest)
rhizome_manifest_free(file_fetch_queue[i].manifest);
file_fetch_queue[i].manifest=NULL;
if (q->file) fclose(q->file);
q->file=NULL;
if (q->manifest)
rhizome_manifest_free(q->manifest);
q->manifest=NULL;
/* close socket and stop watching it */
fd_teardown(file_fetch_queue[i].socket);
/* reshuffle higher numbered slot down if required */
if (i<(rhizome_file_fetch_queue_count-1))
bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1],
&file_fetch_queue[i],sizeof(rhizome_file_fetch_record));
unwatch(&q->alarm);
unschedule(&q->alarm);
close(q->alarm.poll.fd);
q->alarm.poll.fd=-1;
/* Reduce count of open connections */
rhizome_file_fetch_queue_count--;
@ -740,14 +742,19 @@ int rhizome_fetch_close(int i){
if (debug&DEBUG_RHIZOME)
DEBUGF("Released rhizome fetch slot (%d used)",
rhizome_file_fetch_queue_count);
return 0;
}
void rhizome_fetch_write(rhizome_file_fetch_record *q, int i){
void rhizome_fetch_write(rhizome_file_fetch_record *q){
int bytes;
bytes=write(q->socket,&q->request[q->request_ofs],
bytes=write(q->alarm.poll.fd,&q->request[q->request_ofs],
q->request_len-q->request_ofs);
if (bytes>0) {
q->last_action=time(0);
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
q->request_ofs+=bytes;
if (q->request_ofs>=q->request_len) {
@ -755,22 +762,31 @@ void rhizome_fetch_write(rhizome_file_fetch_record *q, int i){
*/
q->request_len=0; q->request_ofs=0;
q->state=RHIZOME_FETCH_RXHTTPHEADERS;
fd_watch(q->socket,rhizome_fetch_poll,POLLIN);
q->alarm.poll.events=POLLIN;
watch(&q->alarm);
}else if(q->state==RHIZOME_FETCH_CONNECTING)
q->state = RHIZOME_FETCH_SENDINGHTTPREQUEST;
} else if (errno!=EAGAIN) {
WHY("Got error while sending HTTP request. Closing.");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
}
}
void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
void rhizome_fetch_poll(struct sched_ent *alarm)
{
rhizome_file_fetch_record *q=(rhizome_file_fetch_record *)alarm;
if (alarm->poll.revents==0){
// timeout, close the socket
rhizome_fetch_close(q);
return;
}
switch(q->state)
{
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(q, i);
rhizome_fetch_write(q);
break;
case RHIZOME_FETCH_RXFILE:
/* Keep reading until we have the promised amount of data */
@ -780,25 +796,29 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
errno=0;
char buffer[8192];
int bytes=read(q->socket,buffer,8192);
int bytes=read(q->alarm.poll.fd,buffer,8192);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
q->last_action=time(0);
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME) DEBUGF("Failed writing %d bytes to file. @ offset %d",bytes,q->file_ofs);
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
q->file_ofs+=bytes;
} else if (bytes==0) {
WHY("Got zero bytes, assume socket dead.");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
if (q->file_ofs>=q->file_len)
@ -833,7 +853,7 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
rhizome_manifest_free(q->manifest);
q->manifest=NULL;
}
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
break;
@ -842,14 +862,19 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
sigPipeFlag=0;
errno=0;
bytes=read(q->socket,&q->request[q->request_len],
bytes=read(q->alarm.poll.fd,&q->request[q->request_len],
1024-q->request_len-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
int lfcount=0;
int i=q->request_len-160;
q->last_action=time(0);
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
if (i<0) i=0;
q->request_len+=bytes;
if (q->request_len<1024)
@ -881,13 +906,13 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
char *s=strstr(q->request,"HTTP/1.0 ");
if (!s) {
if (debug&DEBUG_RHIZOME) DEBUGF("HTTP response lacked HTTP/1.0 response code.");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
int http_response_code=strtoll(&s[9],NULL,10);
if (http_response_code!=200) {
if (debug&DEBUG_RHIZOME) DEBUGF("Rhizome web server returned %d != 200 OK",http_response_code);
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
/* Get content length */
@ -895,7 +920,7 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
if (!s) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Missing Content-Length: header.");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
q->file_len=strtoll(&s[16],NULL,10);
@ -903,7 +928,7 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
if (q->file_len<0) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Illegal file size (%d).",q->file_len);
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
@ -918,7 +943,7 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
if (debug&DEBUG_RHIZOME)
DEBUGF("Failed writing initial %d bytes to file.",
fileRxBytes);
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
q->file_ofs=fileRxBytes;
@ -933,48 +958,15 @@ void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to sigpipe");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
break;
default:
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(i);
rhizome_fetch_close(q);
return;
}
return;
}
void rhizome_fetch_poll(int fd)
{
int rn;
for(rn=0;rn<rhizome_file_fetch_queue_count;rn++)
{
int bytes;
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
if (q->socket==fd){
rhizome_fetch_handle(q, rn);
return;
}
}
}
int rhizome_check_connections(){
int i;
for(i=rhizome_file_fetch_queue_count-1;i>=0;i--)
{
if (time(0) - file_fetch_queue[i].last_action > RHIZOME_IDLE_TIMEOUT) {
if (debug&DEBUG_RHIZOME)
DEBUG("Closing connection due to inactivity timeout.");
rhizome_fetch_close(i);
continue;
}
}
if (rhizome_file_fetch_queue_count==0)
fd_setalarm(rhizome_check_connections,0,0);
}

View File

@ -25,6 +25,64 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include "rhizome.h"
typedef struct rhizome_http_request {
struct sched_ent alarm;
long long initiate_time; /* time connection was initiated */
/* The HTTP request as currently received */
int request_length;
#define RHIZOME_HTTP_REQUEST_MAXLEN 1024
char request[RHIZOME_HTTP_REQUEST_MAXLEN];
/* Nature of the request */
int request_type;
#define RHIZOME_HTTP_REQUEST_RECEIVING -1
#define RHIZOME_HTTP_REQUEST_FROMBUFFER 1
#define RHIZOME_HTTP_REQUEST_FILE 2
#define RHIZOME_HTTP_REQUEST_SUBSCRIBEDGROUPLIST 4
#define RHIZOME_HTTP_REQUEST_ALLGROUPLIST 8
#define RHIZOME_HTTP_REQUEST_BUNDLESINGROUP 16
// manifests are small enough to send from a buffer
// #define RHIZOME_HTTP_REQUEST_BUNDLEMANIFEST 32
// for anything too big, we can just use a blob
#define RHIZOME_HTTP_REQUEST_BLOB 64
#define RHIZOME_HTTP_REQUEST_FAVICON 128
/* Local buffer of data to be sent.
If a RHIZOME_HTTP_REQUEST_FROMBUFFER, then the buffer is sent, and when empty
the request is closed.
Else emptying the buffer triggers a request to fetch more data. Only if no
more data is provided do we then close the request. */
unsigned char *buffer;
int buffer_size; // size
int buffer_length; // number of bytes loaded into buffer
int buffer_offset; // where we are between [0,buffer_length)
/* The source specification data which are used in different ways by different
request types */
char source[1024];
long long source_index;
long long source_count;
int source_record_size;
unsigned int source_flags;
sqlite3_blob *blob;
/* source_index used for offset in blob */
long long blob_end;
} rhizome_http_request;
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_http_send_bytes(rhizome_http_request *r);
int rhizome_server_parse_http_request(rhizome_http_request *r);
int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char *response);
int rhizome_server_http_response_header(rhizome_http_request *r,int result,
char *mime_type,unsigned long long bytes);
int rhizome_server_sql_query_fill_buffer(rhizome_http_request *r, char *table, char *column);
#define RHIZOME_SERVER_MAX_LIVE_REQUESTS 32
/*
HTTP server and client code for rhizome transfers.
@ -32,9 +90,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
int rhizome_server_socket=-1;
rhizome_http_request *rhizome_live_http_requests[RHIZOME_SERVER_MAX_LIVE_REQUESTS];
int rhizome_server_live_request_count=0;
// Format icon data using:
// od -vt u1 ~/Downloads/favicon.ico | cut -c9- | sed 's/ */,/g'
unsigned char favicon_bytes[]={
@ -60,6 +115,8 @@ unsigned char favicon_bytes[]={
,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
int favicon_len=318;
struct sched_ent server_alarm;
int rhizome_server_start()
{
if (rhizome_server_socket>-1) return 0;
@ -89,8 +146,8 @@ int rhizome_server_start()
if (bind(rhizome_server_socket, (struct sockaddr *) &address,
sizeof(address)) < 0)
{
fd_teardown(rhizome_server_socket);
rhizome_server_socket=-1000;
close(rhizome_server_socket);
rhizome_server_socket=-1;
if (debug&DEBUG_RHIZOME) WHY("bind() failed starting rhizome http server");
return -1;
}
@ -99,95 +156,99 @@ int rhizome_server_start()
if (rc < 0)
{
perror("ioctl() failed");
fd_teardown(rhizome_server_socket);
close(rhizome_server_socket);
rhizome_server_socket=-1;
exit(-1);
}
if (listen(rhizome_server_socket,20))
{
fd_teardown(rhizome_server_socket);
close(rhizome_server_socket);
rhizome_server_socket=-1;
return WHY("listen() failed starting rhizome http server");
}
/* Add Rhizome HTTPd server to list of file descriptors to watch */
fd_watch(rhizome_server_socket,rhizome_server_poll,POLLIN);
server_alarm.function = rhizome_server_poll;
server_alarm.stats.name="rhizome_server_poll";
server_alarm.poll.fd = rhizome_server_socket;
server_alarm.poll.events = POLLIN;
watch(&server_alarm);
return 0;
}
void rhizome_client_poll(int fd)
void rhizome_client_poll(struct sched_ent *alarm)
{
int rn;
for(rn=0;rn<rhizome_server_live_request_count;rn++)
{
rhizome_http_request *r=rhizome_live_http_requests[rn];
if (r->socket!=fd) continue;
switch(r->request_type)
{
case RHIZOME_HTTP_REQUEST_RECEIVING:
/* Keep reading until we have two CR/LFs in a row */
r->request[r->request_length]=0;
sigPipeFlag=0;
errno=0;
int bytes=read(r->socket,&r->request[r->request_length],
RHIZOME_HTTP_REQUEST_MAXLEN-r->request_length-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
int i=r->request_length-160;
int lfcount=0;
if (i<0) i=0;
r->request_length+=bytes;
if (r->request_length<RHIZOME_HTTP_REQUEST_MAXLEN)
r->request[r->request_length]=0;
if (0)
dump("request",(unsigned char *)r->request,r->request_length);
for(;i<(r->request_length+bytes);i++)
{
switch(r->request[i]) {
case '\n': lfcount++; break;
case '\r': /* ignore CR */ break;
case 0: /* ignore NUL (telnet inserts them) */ break;
default: lfcount=0; break;
}
if (lfcount==2) break;
}
if (lfcount==2) {
/* We have the request. Now parse it to see if we can respond to it */
rhizome_server_parse_http_request(rn,r);
}
r->request_length+=bytes;
}
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
WHY("Closing connection due to sigpipe");
rhizome_server_close_http_request(rn);
continue;
}
break;
default:
/* Socket already has request -- so just try to send some data. */
rhizome_server_http_send_bytes(rn,r);
break;
}
/* We have processed the connection that has activity, so we can return
immediately */
return;
}
rhizome_http_request *r=(rhizome_http_request *)alarm;
if (alarm->poll.revents==0){
rhizome_server_free_http_request(r);
return;
}
switch(r->request_type)
{
case RHIZOME_HTTP_REQUEST_RECEIVING:
/* Keep reading until we have two CR/LFs in a row */
r->request[r->request_length]=0;
sigPipeFlag=0;
errno=0;
int bytes=read(r->alarm.poll.fd,&r->request[r->request_length],
RHIZOME_HTTP_REQUEST_MAXLEN-r->request_length-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
// reset inactivity timer
r->alarm.alarm = overlay_gettime_ms()+RHIZOME_IDLE_TIMEOUT;
unschedule(&r->alarm);
schedule(&r->alarm);
int i=r->request_length-160;
int lfcount=0;
if (i<0) i=0;
r->request_length+=bytes;
if (r->request_length<RHIZOME_HTTP_REQUEST_MAXLEN)
r->request[r->request_length]=0;
if (0)
dump("request",(unsigned char *)r->request,r->request_length);
for(;i<(r->request_length+bytes);i++)
{
switch(r->request[i]) {
case '\n': lfcount++; break;
case '\r': /* ignore CR */ break;
case 0: /* ignore NUL (telnet inserts them) */ break;
default: lfcount=0; break;
}
if (lfcount==2) break;
}
if (lfcount==2) {
/* We have the request. Now parse it to see if we can respond to it */
rhizome_server_parse_http_request(r);
}
r->request_length+=bytes;
}
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
WHY("Closing connection due to sigpipe");
rhizome_server_free_http_request(r);
return;
}
break;
default:
/* Socket already has request -- so just try to send some data. */
rhizome_server_http_send_bytes(r);
break;
}
return;
}
void rhizome_server_poll(int ignored_file_descriptor)
void rhizome_server_poll(struct sched_ent *alarm)
{
struct sockaddr addr;
unsigned int addr_len=0;
@ -195,36 +256,30 @@ void rhizome_server_poll(int ignored_file_descriptor)
/* Deal with any new requests */
while ((rhizome_server_live_request_count<RHIZOME_SERVER_MAX_LIVE_REQUESTS)
&&((sock=accept(rhizome_server_socket,&addr,&addr_len))>-1))
while ((sock=accept(rhizome_server_socket,&addr,&addr_len))>-1)
{
rhizome_http_request *request = calloc(sizeof(rhizome_http_request),1);
request->socket=sock;
/* We are now trying to read the HTTP request */
request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING;
rhizome_live_http_requests[rhizome_server_live_request_count++]=request;
/* Watch for input */
fd_watch(request->socket,rhizome_client_poll,POLLIN);
request->alarm.function = rhizome_client_poll;
request->alarm.stats.name="rhizome_client_poll";
request->alarm.poll.fd=sock;
request->alarm.poll.events=POLLIN;
request->alarm.alarm = overlay_gettime_ms()+RHIZOME_IDLE_TIMEOUT;
// watch for the incoming http request
watch(&request->alarm);
// set an inactivity timeout to close the connection
schedule(&request->alarm);
}
}
int rhizome_server_close_http_request(int i)
{
fd_teardown(rhizome_live_http_requests[i]->socket);
rhizome_server_free_http_request(rhizome_live_http_requests[i]);
/* Make it null, so that if we are the list in the list, the following
assignment still yields the correct behaviour */
rhizome_live_http_requests[i]=NULL;
rhizome_live_http_requests[i]=
rhizome_live_http_requests[rhizome_server_live_request_count-1];
rhizome_server_live_request_count--;
return 0;
}
int rhizome_server_free_http_request(rhizome_http_request *r)
{
unwatch(&r->alarm);
unschedule(&r->alarm);
close(r->alarm.poll.fd);
if (r->buffer&&r->buffer_size) free(r->buffer);
if (r->blob) sqlite3_blob_close(r->blob);
free(r);
@ -240,7 +295,7 @@ void hexFilter(char *s)
*t = '\0';
}
int rhizome_server_sql_query_http_response(int rn,rhizome_http_request *r,
int rhizome_server_sql_query_http_response(rhizome_http_request *r,
char *column,char *table,char *query_body,
int bytes_per_row,int dehexP)
{
@ -298,10 +353,10 @@ int rhizome_server_sql_query_http_response(int rn,rhizome_http_request *r,
DEBUGF("buffer_length=%d",r->buffer_length);
/* Populate spare space in buffer with rows of data */
return rhizome_server_sql_query_fill_buffer(rn, r, table, column);
return rhizome_server_sql_query_fill_buffer(r, table, column);
}
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r, char *table, char *column)
int rhizome_server_sql_query_fill_buffer(rhizome_http_request *r, char *table, char *column)
{
unsigned char blob_value[r->source_record_size*2+1];
@ -399,12 +454,13 @@ int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r, char *t
}
int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
int rhizome_server_parse_http_request(rhizome_http_request *r)
{
char id[1024];
/* Switching to writing, so update the call-back */
fd_watch(r->socket,rhizome_client_poll,POLLOUT);
r->alarm.poll.events=POLLOUT;
watch(&r->alarm);
/* Clear request type flags */
r->request_type=0;
@ -422,21 +478,21 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
{
/* Return the list of known groups */
WHYF("get /rhizome/groups (list of groups)");
rhizome_server_sql_query_http_response(rn,r,"id","groups","from groups",32,1);
rhizome_server_sql_query_http_response(r,"id","groups","from groups",32,1);
}
else if (!strncasecmp(r->request,"GET /rhizome/files HTTP/1.",
strlen("GET /rhizome/files HTTP/1.")))
{
/* Return the list of known files */
WHYF("get /rhizome/files (list of files)");
rhizome_server_sql_query_http_response(rn,r,"id","files","from files",32,1);
rhizome_server_sql_query_http_response(r,"id","files","from files",32,1);
}
else if (!strncasecmp(r->request,"GET /rhizome/bars HTTP/1.",
strlen("GET /rhizome/bars HTTP/1.")))
{
/* Return the list of known files */
WHYF("get /rhizome/bars (list of BARs)");
rhizome_server_sql_query_http_response(rn,r,"bar","manifests","from manifests",32,0);
rhizome_server_sql_query_http_response(r,"bar","manifests","from manifests",32,0);
}
else if (sscanf(r->request,"GET /rhizome/file/%s HTTP/1.", id)==1)
{
@ -485,7 +541,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
rhizome_server_simple_http_response(r,400,"<html><h1>Sorry, your request was too long.</h1></html>\r\n");
/* Try sending data immediately. */
rhizome_server_http_send_bytes(rn,r);
rhizome_server_http_send_bytes(r);
return 0;
}
@ -509,10 +565,10 @@ char *httpResultString(int id) {
int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char *response)
{
if (r->buffer) free(r->buffer);
r->buffer_size=strlen(response)+strlen("HTTP/1.0 000 \r\n\r\n")+strlen(httpResultString(A_VALUE_GREATER_THAN_FOUR))+100;
r->buffer_size=strlen(response)+strlen("HTTP/1.0 000 \r\n\r\nContent-type: text/html\r\nContent-length: 0000\r\n\r\n")+strlen(httpResultString(result))+strlen(response)+100;
r->buffer=(unsigned char *)malloc(r->buffer_size);
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d %s\r\nContent-type: text/html\r\nContent-length: %lld\r\n\r\n%s",result,httpResultString(result),(int)strlen(response),response);
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d %s\r\nContent-type: text/html\r\nContent-length: %d\r\n\r\n%s",result,httpResultString(result),(int)strlen(response),response);
r->buffer_size=strlen((char *)r->buffer)+1;
r->buffer_length=r->buffer_size-1;
@ -528,10 +584,8 @@ int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char
0: connection finished.
<0: an error occurred.
*/
int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
int rhizome_server_http_send_bytes(rhizome_http_request *r)
{
if (debug&DEBUG_RHIZOME) WHYF("Request #%d, type=0x%x\n",rn,r->request_type);
// keep writing until the write would block or we run out of data
while(r->request_type){
@ -540,7 +594,7 @@ int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
if (r->request_type&RHIZOME_HTTP_REQUEST_FROMBUFFER)
{
int bytes=r->buffer_length-r->buffer_offset;
bytes=write(r->socket,&r->buffer[r->buffer_offset],bytes);
bytes=write(r->alarm.poll.fd,&r->buffer[r->buffer_offset],bytes);
if (bytes<=0){
// stop writing when the tcp buffer is full
// TODO errors?
@ -551,6 +605,11 @@ int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
dump("bytes written",&r->buffer[r->buffer_offset],bytes);
r->buffer_offset+=bytes;
// reset inactivity timer
r->alarm.alarm = overlay_gettime_ms()+RHIZOME_IDLE_TIMEOUT;
unschedule(&r->alarm);
schedule(&r->alarm);
if (r->buffer_offset>=r->buffer_length) {
/* Buffer's cleared */
r->request_type&=~RHIZOME_HTTP_REQUEST_FROMBUFFER;
@ -626,7 +685,7 @@ int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
break;
}
}
if (!r->request_type) return rhizome_server_close_http_request(rn);
if (!r->request_type) return rhizome_server_free_http_request(r);
return 1;
}

626
serval.h
View File

@ -114,15 +114,6 @@ struct in_addr {
/* bzero(3) is deprecated in favour of memset(3). */
#define bzero(addr,len) memset((addr), 0, (len))
/* @PGS/20120615 */
#ifdef DO_TIMING_CHECKS
#define TIMING_CHECK() _TIMING_CHECK(__FILE__,__FUNCTION__,__LINE__)
void _TIMING_CHECK(const char *file,const char *func,int line);
void TIMING_PAUSE();
#else
#define TIMING_CHECK()
#endif
/* UDP Port numbers for various Serval services.
The overlay mesh works over DNA */
#define PORT_DNA 4110
@ -150,6 +141,7 @@ void TIMING_PAUSE();
/* Limit packet payloads to minimise packet loss of big packets in mesh networks */
#define MAX_DATA_BYTES 256
extern int debug;
extern int dnatimeout;
extern int hlr_size;
@ -401,6 +393,284 @@ extern struct mphlr_variable vars[];
extern int sock;
#define OVERLAY_MAX_INTERFACES 16
typedef struct overlay_address_table {
unsigned char epoch;
char sids[256][SID_SIZE];
/* 0x00 = not set, which thus limits us to using only 255 (0x01-0xff) of the indexes for
storing addresses.
By spending an extra 256 bytes we reduce, but not eliminate the problem of collisions.
Will think about a complete solution later.
*/
unsigned char byfirstbyte[256][2];
/* next free entry in sid[] */
unsigned char next_free;
} overlay_address_table;
typedef struct sid {
unsigned char b[SID_SIZE];
} sid;
typedef struct overlay_address_cache {
int size;
int shift; /* Used to calculat lookup function, which is (b[0].b[1].b[2]>>shift) */
sid *sids; /* one entry per bucket, to keep things simple. */
/* XXX Should have a means of changing the hash function so that naughty people can't try
to force our cache to flush with duplicate addresses?
But we must use only the first 24 bits of the address due to abbreviation policies,
so our options are limited.
For now the hash will be the first k bits.
*/
} overlay_address_cache;
extern sid overlay_abbreviate_current_sender;
typedef struct overlay_frame {
struct overlay_frame *prev;
struct overlay_frame *next;
unsigned int type;
unsigned int modifiers;
unsigned char ttl;
unsigned char dequeue;
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent
exactly once on each interface */
int isBroadcast;
unsigned char broadcast_sent_via[OVERLAY_MAX_INTERFACES];
unsigned char nexthop[32];
int nexthop_address_status;
int nexthop_interface; /* which interface the next hop should be attempted on */
unsigned char destination[32];
int destination_address_status;
unsigned char source[32];
int source_address_status;
/* IPv4 node frame was received from (if applicable) */
struct sockaddr *recvaddr;
/* Frame content from destination address onwards */
int bytecount;
unsigned char *bytes;
/* Actual payload */
struct overlay_buffer *payload;
int rfs; /* remainder of frame size */
long long enqueued_at;
} overlay_frame;
#define CRYPT_CIPHERED 1
#define CRYPT_SIGNED 2
#define CRYPT_PUBLIC 4
struct call_stats{
long long enter_time;
long long child_time;
struct call_stats *prev;
};
struct callback_stats {
struct callback_stats *_next;
int _initialised;
const char *name;
long long max_time;
long long total_time;
int calls;
};
struct sched_ent;
typedef void (*ALARM_FUNCP) (struct sched_ent *alarm);
struct sched_ent{
struct sched_ent *_next;
struct sched_ent *_prev;
ALARM_FUNCP function;
void *context;
struct pollfd poll;
long long alarm;
struct callback_stats stats;
int _poll_index;
};
extern int overlayMode;
#define OVERLAY_INTERFACE_UNKNOWN 0
#define OVERLAY_INTERFACE_ETHERNET 1
#define OVERLAY_INTERFACE_WIFI 2
#define OVERLAY_INTERFACE_PACKETRADIO 3
typedef struct overlay_interface {
struct sched_ent alarm;
char name[80];
int offset;
int fileP;
int bits_per_second;
int port;
int type;
/* Number of milli-seconds per tick for this interface, which is basically related to the
the typical TX range divided by the maximum expected speed of nodes in the network.
This means that short-range communications has a higher bandwidth requirement than
long-range communications because the tick interval has to be shorter to still allow
fast-convergence time to allow for mobility.
For wifi (nominal range 100m) it is usually 500ms.
For ~100K ISM915MHz (nominal range 1000m) it will probably be about 5000ms.
For ~10K ISM915MHz (nominal range ~3000m) it will probably be about 15000ms.
These figures will be refined over time, and we will allow people to set them per-interface.
*/
int tick_ms; /* milliseconds per tick */
/* The time of the last tick on this interface in milli seconds */
long long last_tick_ms;
/* How many times have we abbreviated our address since we last announced it in full? */
int ticks_since_sent_full_address;
/* sequence number of last packet sent on this interface.
Used to allow NACKs that can request retransmission of recent packets.
*/
int sequence_number;
/* XXX need recent packet buffers to support the above */
/* Broadcast address and netmask, if known
We really only case about distinct broadcast addresses on interfaces.
Also simplifies aliases on interfaces. */
struct sockaddr_in broadcast_address;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
potentially two quite different values. */
int mtu;
/* If the interface still exists on the local machine.
If not, it we keep track of it for a few seconds before purging it, incase of flapping, e.g.,
due to DHCP renewal */
int observed;
} overlay_interface;
/* Maximum interface count is rather arbitrary.
Memory consumption is O(n) with respect to this parameter, so let's not make it too big for now.
*/
extern overlay_interface overlay_interfaces[OVERLAY_MAX_INTERFACES];
extern int overlay_last_interface_number; // used to remember where a packet came from
extern unsigned int overlay_sequence_number;
/*
For each peer we need to keep track of the routes that we know to reach it.
We want to use static sized data structures as much as we can to keep things efficient by
allowing computed memory address lookups instead of following linked lists and other
non-deterministic means.
The tricky part of doing all this is that each interface may have a different maximum number
of peers based on the bandwidth of the link, as we do not want mesh traffic to consume all
available bandwidth. In particular, we need to reserve at least enough bandwidth for one
call.
Related to this, if we are in a mesh larger than the per-interface limit allows, then we need to
only track the highest-scoring peers. This sounds simple, but how to we tell when to replace a
low-scoring peer with another one which has a better reachability score, if we are not tracking
the reachability score of that node?
The answer to this is that we track as many nodes as we can, but only announce the highest
scoring nodes on each interface as bandwidth allows.
This also keeps our memory usage fixed.
XXX - At present we are setting OVERLAY_MAX_PEERS at compile time.
With a bit of work we can change this to be a run-time option.
Memory consumption of OVERLAY_MAX_PEERS=n is O(n^2).
XXX We could and should improve this down the track by only monitoring the top k routes, and replacing the worst route
option when a better one comes along. This would get the memory usage down to O(n).
*/
#define OVERLAY_MAX_PEERS 500
typedef struct overlay_peer {
unsigned char address[SIDDIDFIELD_LEN];
/* Scores and score update times for reaching this node via various interfaces */
int known_routes[OVERLAY_MAX_INTERFACES];
unsigned short scores[OVERLAY_MAX_INTERFACES][OVERLAY_MAX_PEERS];
/* last_regeneration is the time that this peer was created/replaced with another peer.
lastupdate[] indicates the time that another peer's reachability report
caused us to update our score to reach via that peer.
If lastupdate[x][y] is older than last_regeneration[y], then we must
ignore the entry, because the lastupdate[x][y] entry references a previous
generation of that peer, i.e., not to the peer we think it does.
This slight convolution allows us to replace peers without having to touch the
records of every other peer in our list.
*/
int last_regeneration;
unsigned int lastupdate[OVERLAY_MAX_INTERFACES][OVERLAY_MAX_PEERS];
} overlay_peer;
extern overlay_peer overlay_peers[OVERLAY_MAX_PEERS];
typedef struct overlay_buffer {
unsigned char *bytes;
int length;
int allocSize;
int checkpointLength;
int sizeLimit;
int var_length_offset;
int var_length_bytes;
} overlay_buffer;
int ob_unlimitsize(overlay_buffer *b);
typedef struct overlay_txqueue {
struct overlay_frame *first;
struct overlay_frame *last;
int length; /* # frames in queue */
int maxLength; /* max # frames in queue before we consider ourselves congested */
/* Latency target in ms for this traffic class.
Frames older than the latency target will get dropped. */
int latencyTarget;
/* XXX Need to initialise these:
Real-time queue for voice (<200ms ?)
Real-time queue for video (<200ms ?) (lower priority than voice)
Ordinary service queue (<3 sec ?)
Rhizome opportunistic queue (infinity)
(Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information)
*/
} overlay_txqueue;
#define OQ_ISOCHRONOUS_VOICE 0
#define OQ_MESH_MANAGEMENT 1
#define OQ_ISOCHRONOUS_VIDEO 2
#define OQ_ORDINARY 3
#define OQ_OPPORTUNISTIC 4
#define OQ_MAX 5
extern overlay_txqueue overlay_tx[OQ_MAX];
#define LOG_LEVEL_SILENT (-1)
#define LOG_LEVEL_DEBUG (0)
#define LOG_LEVEL_INFO (1)
#define LOG_LEVEL_WARN (2)
#define LOG_LEVEL_ERROR (3)
#define LOG_LEVEL_FATAL (4)
const char *confValueGet(const char *var, const char *defaultValue);
int confValueGetBoolean(const char *var, int defaultValue);
void confSetDebugFlags();
@ -430,7 +700,6 @@ long long gettime_ms();
int server_pid();
void server_save_argv(int argc, const char *const *argv);
int server(char *backing_file);
void server_shutdown_check();
int server_create_stopfile();
int server_remove_stopfile();
int server_check_stopfile();
@ -438,7 +707,7 @@ void serverCleanUp();
int isTransactionInCache(unsigned char *transaction_id);
void insertTransactionInCache(unsigned char *transaction_id);
int packetOk(int interface,unsigned char *packet,int len,
int packetOk(struct overlay_interface *interface,unsigned char *packet,int len,
unsigned char *transaction_id, int recvttl,
struct sockaddr *recvaddr,int recvaddrlen,int parseP);
int process_packet(unsigned char *packet,int len,
@ -499,7 +768,7 @@ int runCommand(char *cmd);
int asteriskObtainGateway(char *requestor_sid,char *did,char *uri_out);
int packetOkDNA(unsigned char *packet,int len,unsigned char *transaction_id,
int recvttl,struct sockaddr *recvaddr,int recvaddrlen,int parseP);
int packetOkOverlay(int interface,unsigned char *packet,int len,
int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet,int len,
unsigned char *transaction_id,int recvttl,
struct sockaddr *recvaddr,int recvaddrlen,int parseP);
int prepareGateway(char *gatewayspec);
@ -508,261 +777,8 @@ int packetSendRequest(int method,unsigned char *packet,int packet_len,int batchP
struct response_set *responses);
int readArpTable(struct in_addr peers[],int *peer_count,int peer_max);
#define OVERLAY_MAX_INTERFACES 16
typedef struct overlay_address_table {
unsigned char epoch;
char sids[256][SID_SIZE];
/* 0x00 = not set, which thus limits us to using only 255 (0x01-0xff) of the indexes for
storing addresses.
By spending an extra 256 bytes we reduce, but not eliminate the problem of collisions.
Will think about a complete solution later.
*/
unsigned char byfirstbyte[256][2];
/* next free entry in sid[] */
unsigned char next_free;
} overlay_address_table;
typedef struct sid {
unsigned char b[SID_SIZE];
} sid;
typedef struct overlay_address_cache {
int size;
int shift; /* Used to calculat lookup function, which is (b[0].b[1].b[2]>>shift) */
sid *sids; /* one entry per bucket, to keep things simple. */
/* XXX Should have a means of changing the hash function so that naughty people can't try
to force our cache to flush with duplicate addresses?
But we must use only the first 24 bits of the address due to abbreviation policies,
so our options are limited.
For now the hash will be the first k bits.
*/
} overlay_address_cache;
extern sid overlay_abbreviate_current_sender;
typedef struct overlay_frame {
struct overlay_frame *prev;
struct overlay_frame *next;
unsigned int type;
unsigned int modifiers;
unsigned char ttl;
unsigned char dequeue;
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent
exactly once on each interface */
int isBroadcast;
unsigned char broadcast_sent_via[OVERLAY_MAX_INTERFACES];
unsigned char nexthop[32];
int nexthop_address_status;
int nexthop_interface; /* which interface the next hop should be attempted on */
unsigned char destination[32];
int destination_address_status;
unsigned char source[32];
int source_address_status;
/* IPv4 node frame was received from (if applicable) */
struct sockaddr *recvaddr;
/* Frame content from destination address onwards */
int bytecount;
unsigned char *bytes;
/* Actual payload */
struct overlay_buffer *payload;
int rfs; /* remainder of frame size */
long long enqueued_at;
} overlay_frame;
int overlay_frame_process(int interface,overlay_frame *f);
int overlay_frame_resolve_addresses(int interface,overlay_frame *f);
#define CRYPT_CIPHERED 1
#define CRYPT_SIGNED 2
#define CRYPT_PUBLIC 4
extern int overlayMode;
#define OVERLAY_INTERFACE_UNKNOWN 0
#define OVERLAY_INTERFACE_ETHERNET 1
#define OVERLAY_INTERFACE_WIFI 2
#define OVERLAY_INTERFACE_PACKETRADIO 3
typedef struct overlay_interface {
char name[80];
int fd;
int offset;
int fileP;
int bits_per_second;
int port;
int type;
/* Number of milli-seconds per tick for this interface, which is basically related to the
the typical TX range divided by the maximum expected speed of nodes in the network.
This means that short-range communications has a higher bandwidth requirement than
long-range communications because the tick interval has to be shorter to still allow
fast-convergence time to allow for mobility.
For wifi (nominal range 100m) it is usually 500ms.
For ~100K ISM915MHz (nominal range 1000m) it will probably be about 5000ms.
For ~10K ISM915MHz (nominal range ~3000m) it will probably be about 15000ms.
These figures will be refined over time, and we will allow people to set them per-interface.
*/
int tick_ms; /* milliseconds per tick */
/* The time of the last tick on this interface in milli seconds */
long long last_tick_ms;
/* How many times have we abbreviated our address since we last announced it in full? */
int ticks_since_sent_full_address;
/* sequence number of last packet sent on this interface.
Used to allow NACKs that can request retransmission of recent packets.
*/
int sequence_number;
/* XXX need recent packet buffers to support the above */
/* Broadcast address and netmask, if known
We really only case about distinct broadcast addresses on interfaces.
Also simplifies aliases on interfaces. */
struct sockaddr_in broadcast_address;
/* Not necessarily the real MTU, but the largest frame size we are willing to TX on this interface.
For radio links the actual maximum and the maximum that is likely to be delivered reliably are
potentially two quite different values. */
int mtu;
/* If the interface still exists on the local machine.
If not, it we keep track of it for a few seconds before purging it, incase of flapping, e.g.,
due to DHCP renewal */
int observed;
} overlay_interface;
/* Maximum interface count is rather arbitrary.
Memory consumption is O(n) with respect to this parameter, so let's not make it too big for now.
*/
extern overlay_interface overlay_interfaces[OVERLAY_MAX_INTERFACES];
extern int overlay_last_interface_number; // used to remember where a packet came from
extern unsigned int overlay_sequence_number;
/* Has someone sent us an abbreviation of an unknown type recently? If so remind them
that we don't accept these.
XXX - This method assumes bidirectional links. We should consider sending direct
to the perpetuator. We will deal with that in time, the main thing is that we have
a message type that can be used for the purpose.
*/
extern int overlay_interface_repeat_abbreviation_policy[OVERLAY_MAX_INTERFACES];
/*
For each peer we need to keep track of the routes that we know to reach it.
We want to use static sized data structures as much as we can to keep things efficient by
allowing computed memory address lookups instead of following linked lists and other
non-deterministic means.
The tricky part of doing all this is that each interface may have a different maximum number
of peers based on the bandwidth of the link, as we do not want mesh traffic to consume all
available bandwidth. In particular, we need to reserve at least enough bandwidth for one
call.
Related to this, if we are in a mesh larger than the per-interface limit allows, then we need to
only track the highest-scoring peers. This sounds simple, but how to we tell when to replace a
low-scoring peer with another one which has a better reachability score, if we are not tracking
the reachability score of that node?
The answer to this is that we track as many nodes as we can, but only announce the highest
scoring nodes on each interface as bandwidth allows.
This also keeps our memory usage fixed.
XXX - At present we are setting OVERLAY_MAX_PEERS at compile time.
With a bit of work we can change this to be a run-time option.
Memory consumption of OVERLAY_MAX_PEERS=n is O(n^2).
XXX We could and should improve this down the track by only monitoring the top k routes, and replacing the worst route
option when a better one comes along. This would get the memory usage down to O(n).
*/
#define OVERLAY_MAX_PEERS 500
typedef struct overlay_peer {
unsigned char address[SIDDIDFIELD_LEN];
/* Scores and score update times for reaching this node via various interfaces */
int known_routes[OVERLAY_MAX_INTERFACES];
unsigned short scores[OVERLAY_MAX_INTERFACES][OVERLAY_MAX_PEERS];
/* last_regeneration is the time that this peer was created/replaced with another peer.
lastupdate[] indicates the time that another peer's reachability report
caused us to update our score to reach via that peer.
If lastupdate[x][y] is older than last_regeneration[y], then we must
ignore the entry, because the lastupdate[x][y] entry references a previous
generation of that peer, i.e., not to the peer we think it does.
This slight convolution allows us to replace peers without having to touch the
records of every other peer in our list.
*/
int last_regeneration;
unsigned int lastupdate[OVERLAY_MAX_INTERFACES][OVERLAY_MAX_PEERS];
} overlay_peer;
extern overlay_peer overlay_peers[OVERLAY_MAX_PEERS];
typedef struct overlay_buffer {
unsigned char *bytes;
int length;
int allocSize;
int checkpointLength;
int sizeLimit;
int var_length_offset;
int var_length_bytes;
} overlay_buffer;
int ob_unlimitsize(overlay_buffer *b);
typedef struct overlay_txqueue {
struct overlay_frame *first;
struct overlay_frame *last;
int length; /* # frames in queue */
int maxLength; /* max # frames in queue before we consider ourselves congested */
/* Latency target in ms for this traffic class.
Frames older than the latency target will get dropped. */
int latencyTarget;
/* XXX Need to initialise these:
Real-time queue for voice (<200ms ?)
Real-time queue for video (<200ms ?) (lower priority than voice)
Ordinary service queue (<3 sec ?)
Rhizome opportunistic queue (infinity)
(Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information)
*/
} overlay_txqueue;
#define OQ_ISOCHRONOUS_VOICE 0
#define OQ_MESH_MANAGEMENT 1
#define OQ_ISOCHRONOUS_VIDEO 2
#define OQ_ORDINARY 3
#define OQ_OPPORTUNISTIC 4
#define OQ_MAX 5
extern overlay_txqueue overlay_tx[OQ_MAX];
#define LOG_LEVEL_SILENT (-1)
#define LOG_LEVEL_DEBUG (0)
#define LOG_LEVEL_INFO (1)
#define LOG_LEVEL_WARN (2)
#define LOG_LEVEL_ERROR (3)
#define LOG_LEVEL_FATAL (4)
int overlay_frame_process(struct overlay_interface *interface,overlay_frame *f);
int overlay_frame_resolve_addresses(overlay_frame *f);
int start_logging();
void logMessage(int level, const char *file, unsigned int line, const char *function, const char *fmt, ...);
@ -823,10 +839,9 @@ long long parse_quantity(char *q);
int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockaddr_in broadcast,
int speed_in_bits,int port,int type);
int overlay_interface_init_socket(int i,struct sockaddr_in src_addr,struct sockaddr_in broadcast);
void overlay_interface_discover();
long long overlay_time_until_next_tick();
int overlay_rx_messages();
void overlay_check_ticks();
int overlay_add_selfannouncement();
int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b);
int overlay_interface_args(const char *arg);
@ -896,7 +911,7 @@ extern unsigned char *overlay_local_identities[OVERLAY_MAX_LOCAL_IDENTITIES];
int overlay_abbreviate_address(unsigned char *in,unsigned char *out,int *ofs);
int overlay_abbreviate_append_address(overlay_buffer *b,unsigned char *a);
int overlay_abbreviate_expand_address(int interface,unsigned char *in,int *inofs,unsigned char *out,int *ofs);
int overlay_abbreviate_expand_address(unsigned char *in,int *inofs,unsigned char *out,int *ofs);
int overlay_abbreviate_cache_address(unsigned char *sid);
int overlay_abbreviate_cache_lookup(unsigned char *in,unsigned char *out,int *ofs,
int prefix_bytes,int index_bytes);
@ -1008,10 +1023,10 @@ extern overlay_neighbour *overlay_neighbours;
long long overlay_gettime_ms();
int overlay_route_init(int mb_ram);
int overlay_route_saw_selfannounce_ack(int interface,overlay_frame *f,long long now);
int overlay_route_saw_selfannounce_ack(overlay_frame *f,long long now);
int overlay_route_recalc_node_metrics(overlay_node *n,long long now);
int overlay_route_recalc_neighbour_metrics(overlay_neighbour *n,long long now);
int overlay_route_saw_selfannounce(int interface,overlay_frame *f,long long now);
int overlay_route_saw_selfannounce(overlay_frame *f,long long now);
overlay_node *overlay_route_find_node(unsigned char *sid,int prefixLen,int createP);
unsigned int overlay_route_hash_sid(unsigned char *sid);
int overlay_route_init(int mb_ram);
@ -1036,10 +1051,9 @@ int overlay_route_record_link(long long now,unsigned char *to,
unsigned char *via,int sender_interface,
unsigned int s1,unsigned int s2,int score,int gateways_en_route);
int overlay_route_dump();
void overlay_route_tick();
int overlay_route_tick_neighbour(int neighbour_id,long long now);
int overlay_route_tick_node(int bin,int slot,long long now);
int overlay_route_add_advertisements(int interface,overlay_buffer *e);
int overlay_route_add_advertisements(overlay_buffer *e);
int ovleray_route_please_advertise(overlay_node *n);
int overlay_abbreviate_set_current_sender(unsigned char *in);
@ -1052,9 +1066,8 @@ int overlay_route_saw_advertisements(int i,overlay_frame *f, long long now);
int overlay_rhizome_saw_advertisements(int i,overlay_frame *f, long long now);
int overlay_route_please_advertise(overlay_node *n);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
void rhizome_server_poll(int ignored_file_descriptor);
int rhizome_saw_voice_traffic();
int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long now);
int overlay_saw_mdp_containing_frame(overlay_frame *f,long long now);
#include "nacl.h"
@ -1091,8 +1104,6 @@ int overlay_address_is_broadcast(unsigned char *a);
int overlay_broadcast_generate_address(unsigned char *a);
int overlay_abbreviate_unset_current_sender();
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
void rhizome_fetch_poll(int fd);
int rhizome_check_connections();
int rhizome_opendb();
typedef struct dna_identity_status {
@ -1156,12 +1167,11 @@ int mkdirsn(const char *path, size_t len, mode_t mode);
#define FORM_SERVAL_INSTANCE_PATH(buf, path) (form_serval_instance_path(buf, sizeof(buf), (path)))
int overlay_mdp_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
void overlay_mdp_poll();
int overlay_mdp_reply_error(int sock,
struct sockaddr_un *recvaddr,int recvaddrlen,
int error_number,char *message);
extern int mdp_abstract_socket;
extern int mdp_named_socket;
extern struct sched_ent mdp_abstract;
extern struct sched_ent mdp_named;
typedef struct sockaddr_mdp {
@ -1317,7 +1327,7 @@ int overlay_mdp_recv(overlay_mdp_frame *mdp,int *ttl);
int overlay_mdp_send(overlay_mdp_frame *mdp,int flags,int timeout_ms);
/* Server-side MDP functions */
int overlay_saw_mdp_frame(int interface, overlay_mdp_frame *mdp,long long now);
int overlay_saw_mdp_frame(overlay_mdp_frame *mdp,long long now);
int overlay_mdp_swap_src_dst(overlay_mdp_frame *mdp);
int overlay_mdp_reply(int sock,struct sockaddr_un *recvaddr,int recvaddrlen,
overlay_mdp_frame *mdpreply);
@ -1427,7 +1437,6 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
int vomp_mdp_received(overlay_mdp_frame *mdp);
char *vomp_describe_state(int state);
char *vomp_describe_codec(int c);
void vomp_tick();
int vomp_tick_interval();
int vomp_sample_size(int c);
int vomp_codec_timespan(int c);
@ -1476,8 +1485,6 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_setup_sockets();
void monitor_poll(int ignored_fd);
void monitor_client_poll(int ignored_fd);
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_call_status(vomp_call_state *call);
int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio);
@ -1540,31 +1547,40 @@ void sigIoHandler(int signal);
#define WRITE_STR(fd, str) write(fd, str, strlen(str))
int rhizome_server_start();
void rhizome_enqueue_suggestions();
int overlay_mdp_setup_sockets();
void overlay_interface_poll(int fd);
void overlay_dummy_poll();
void rhizome_client_poll(int fd);
/* Event queue handling functions */
int schedule(struct sched_ent *alarm);
int unschedule(struct sched_ent *alarm);
int watch(struct sched_ent *alarm);
int unwatch(struct sched_ent *alarm);
int fd_poll();
int fd_checkalarms();
int fd_setalarm(void (*func),long long first_alarm_in,int repeat_every);
int fd_teardown(int fd);
int fd_watch(int fd,void (*func)(int fd),int events);
int fd_list();
char *fd_funcname(void *addr);
int fd_showstats();
void fd_periodicstats();
int fd_next_funcid();
int fd_func_exit(int funcid);
int fd_func_enter(int funcid);
int fd_next_funcid(const char *funcname);
#define IN() static int _func_id=-1; if (_func_id<0) _func_id=fd_next_funcid(__FUNCTION__); fd_func_enter(_func_id);
#define OUT() fd_func_exit(_func_id);
void overlay_interface_discover(struct sched_ent *alarm);
void overlay_check_ticks(struct sched_ent *alarm);
void overlay_dummy_poll(struct sched_ent *alarm);
void overlay_route_tick(struct sched_ent *alarm);
void rhizome_enqueue_suggestions(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
void fd_periodicstats(struct sched_ent *alarm);
void vomp_tick(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
void monitor_client_poll(struct sched_ent *alarm);
void monitor_poll(struct sched_ent *alarm);
void overlay_interface_poll(struct sched_ent *alarm);
void rhizome_client_poll(struct sched_ent *alarm);
void rhizome_fetch_poll(struct sched_ent *alarm);
void rhizome_server_poll(struct sched_ent *alarm);
/* function timing routines */
int fd_checkalarms();
int fd_func_exit(struct call_stats *this_call, struct callback_stats *call_stats);
int fd_func_enter(struct call_stats *this_call);
#define IN() static struct callback_stats _aggregate_stats={NULL,0,__FUNCTION__,0,0,0}; struct call_stats _this_call; fd_func_enter(&_this_call);
#define OUT() fd_func_exit(&_this_call, &_aggregate_stats);
#define RETURN(X) { OUT() return(X); }
#define SET_NONBLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)|O_NONBLOCK);
#define SET_BLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)&(~O_NONBLOCK));

View File

@ -185,20 +185,7 @@ int server(char *backing_file)
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
if (overlayMode)
{
/* Now find and initialise all the suitable network interfaces, i.e.,
those running IPv4.
Packet radio dongles will get discovered later as the interfaces get probed.
This will setup the sockets for the server to communicate on each interface.
XXX - Problems may persist where the same address is used on multiple interfaces,
but otherwise hopefully it will allow us to bridge multiple networks.
*/
overlay_interface_discover();
}
else
if (!overlayMode)
{
/* Create a simple socket for listening on if we are not in overlay mesh mode. */
createServerSocket();
@ -228,7 +215,7 @@ int server(char *backing_file)
/* Called periodically by the server process in its main loop.
*/
void server_shutdown_check()
void server_shutdown_check(struct sched_ent *alarm)
{
if (servalShutdown) {
WHY("Shutdown flag set -- terminating with cleanup");
@ -252,6 +239,10 @@ void server_shutdown_check()
exit(1);
}
}
if (alarm){
alarm->alarm = overlay_gettime_ms()+1000;
schedule(alarm);
}
}
int server_create_stopfile()
@ -430,7 +421,7 @@ void signal_handler(int signal)
/* Terminate the server process. The shutting down should be done from the main-line code
rather than here, so we first try to tell the mainline code to do so. If, however, this is
not the first time we have been asked to shut down, then we will do it here. */
server_shutdown_check();
server_shutdown_check(NULL);
WHY("Asking Serval process to shutdown cleanly");
servalShutdown = 1;
return;
@ -440,8 +431,8 @@ void signal_handler(int signal)
if (sock>-1) close(sock);
int i;
for(i=0;i<overlay_interface_count;i++)
if (overlay_interfaces[i].fd>-1)
close(overlay_interfaces[i].fd);
if (overlay_interfaces[i].alarm.poll.fd>-1)
close(overlay_interfaces[i].alarm.poll.fd);
execv(exec_args[0],exec_args);
/* Quit if the exec() fails */
exit(-3);

44
vomp.c
View File

@ -321,7 +321,7 @@ int vomp_send_status(vomp_call_state *call,int flags,overlay_mdp_frame *arg)
long long now=overlay_gettime_ms();
for(i=0;i<vomp_interested_usock_count;i++)
if (vomp_interested_expiries[i]>=now) {
overlay_mdp_reply(mdp_named_socket,
overlay_mdp_reply(mdp_named.poll.fd,
vomp_interested_usocks[i],
vomp_interested_usock_lengths[i],
&mdp);
@ -475,7 +475,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
if (!memcmp(recvaddr->sun_path,
vomp_interested_usocks[i],recvaddrlen))
/* found it -- so we are already monitoring this one */
return overlay_mdp_reply_error(mdp_named_socket,recvaddr,recvaddrlen,
return overlay_mdp_reply_error(mdp_named.poll.fd,recvaddr,recvaddrlen,
0,"Success");
if (vomp_interested_expiries[i]<now) candidate=i;
}
@ -488,7 +488,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
}
vomp_interested_usocks[i]=malloc(recvaddrlen);
if (!vomp_interested_usocks[i])
return overlay_mdp_reply_error(mdp_named_socket, recvaddr,recvaddrlen,
return overlay_mdp_reply_error(mdp_named.poll.fd, recvaddr,recvaddrlen,
4002,"Out of memory");
bcopy(recvaddr,vomp_interested_usocks[i],
recvaddrlen);
@ -506,10 +506,10 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
}
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,0,"Success");
(mdp_named.poll.fd,recvaddr,recvaddrlen,0,"Success");
} else {
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,
(mdp_named.poll.fd,recvaddr,recvaddrlen,
4003,"Too many listeners (try again in a minute?)");
}
}
@ -537,12 +537,12 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
}
vomp_interested_usock_count--;
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,
(mdp_named.poll.fd,recvaddr,recvaddrlen,
0,"Success. You have been removed.");
}
}
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,
(mdp_named.poll.fd,recvaddr,recvaddrlen,
0,"Success. You were never listening.");
}
break;
@ -586,7 +586,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
}
}
return overlay_mdp_reply(mdp_named_socket,recvaddr,recvaddrlen,&mdpreply);
return overlay_mdp_reply(mdp_named.poll.fd,recvaddr,recvaddrlen,&mdpreply);
}
break;
case VOMPEVENT_DIAL:
@ -594,12 +594,12 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
These need to be passed to the node being called to provide caller id,
and potentially handle call-routing, e.g., if it is a gateway.
*/
fprintf(stderr,"DIAL Request!\n");
DEBUG("DIAL Request!\n");
{
/* Populate call structure */
if (vomp_call_count>=VOMP_MAX_CALLS)
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,4004,
(mdp_named.poll.fd,recvaddr,recvaddrlen,4004,
"All call slots in use");
int slot=vomp_call_count++;
vomp_call_state *call=&vomp_call_states[slot];
@ -619,10 +619,10 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
{
if (urandombytes((unsigned char *)&call->local.session,sizeof(int)))
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,4005,
(mdp_named.poll.fd,recvaddr,recvaddrlen,4005,
"Insufficient entropy");
call->local.session&=VOMP_SESSION_MASK;
printf("session=0x%08x\n",call->local.session);
DEBUGF("session=0x%08x\n",call->local.session);
int i;
for(i=0;i<vomp_call_count;i++)
if (i!=slot)
@ -639,7 +639,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
WHY("sending MDP reply back");
dump("recvaddr",(unsigned char *)recvaddr,recvaddrlen);
int result= overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,0, "Success");
(mdp_named.poll.fd,recvaddr,recvaddrlen,0, "Success");
if (result) WHY("Failed to send MDP reply");
return result;
}
@ -651,12 +651,12 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
=vomp_find_call_by_session(mdp->vompevent.call_session_token);
if (!call)
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,4006,
(mdp_named.poll.fd,recvaddr,recvaddrlen,4006,
"No such call");
if (call->local.state==VOMP_STATE_INCALL) vomp_call_stop_audio(call);
call->local.state=VOMP_STATE_CALLENDED;
monitor_call_status(call);
overlay_mdp_reply_error(mdp_named_socket,
overlay_mdp_reply_error(mdp_named.poll.fd,
recvaddr,recvaddrlen,0,"Success");
return vomp_send_status(call,VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
}
@ -668,7 +668,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
=vomp_find_call_by_session(mdp->vompevent.call_session_token);
if (!call)
return overlay_mdp_reply_error
(mdp_named_socket,recvaddr,recvaddrlen,4006,
(mdp_named.poll.fd,recvaddr,recvaddrlen,4006,
"No such call");
if (call->local.state==VOMP_STATE_RINGINGIN) {
call->local.state=VOMP_STATE_INCALL;
@ -676,11 +676,11 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
call->ringing=0;
/* state machine does job of starting audio stream, just tell everyone about
the changed state. */
overlay_mdp_reply_error(mdp_named_socket,
overlay_mdp_reply_error(mdp_named.poll.fd,
recvaddr,recvaddrlen,0,"Success");
return vomp_send_status(call,VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
} else {
overlay_mdp_reply_error(mdp_named_socket,
overlay_mdp_reply_error(mdp_named.poll.fd,
recvaddr,recvaddrlen,4009,
"Call is not RINGINGIN, so cannot be picked up");
}
@ -700,7 +700,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
break;
default:
/* didn't understand it, so respond with an error */
return overlay_mdp_reply_error(mdp_named_socket,
return overlay_mdp_reply_error(mdp_named.poll.fd,
recvaddr,recvaddrlen,4001,
"Invalid VOMPEVENT request (use DIAL,HANGUP,CALLREJECT,AUDIOSTREAMING,REGISTERINTERST,WITHDRAWINTERST only)");
@ -1458,7 +1458,7 @@ int app_vomp_monitor(int argc, const char *const *argv, struct command_line_opti
return overlay_mdp_client_done();
}
void vomp_tick()
void vomp_tick(struct sched_ent *alarm)
{
/* Send any reminder packets for call state, and also process any audio. */
unsigned long long now=overlay_gettime_ms();
@ -1508,7 +1508,9 @@ void vomp_tick()
i--;
break;
}
}
}
alarm->alarm = overlay_gettime_ms()+1000;
schedule(alarm);
return;
}