From 5dc6d122a5eb0251427f6865d227b32d61e8f6d6 Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 22 Jun 2012 13:25:41 +0930 Subject: [PATCH] significant progress towards clean callback scheduler for poll() events and timed callbacks. --- Android.mk | 1 + Makefile.in | 1 + monitor.c | 33 ++++++---- overlay.c | 156 +++++++------------------------------------- overlay_interface.c | 113 +++++++++++++++++--------------- overlay_mdp.c | 63 ++++++------------ overlay_route.c | 8 ++- rhizome.h | 4 +- rhizome_fetch.c | 76 +++++---------------- rhizome_http.c | 103 +++++++++-------------------- serval.h | 29 ++++++-- server.c | 70 +------------------- vomp.c | 5 -- 13 files changed, 202 insertions(+), 460 deletions(-) diff --git a/Android.mk b/Android.mk index eb6a0577..c919f93e 100644 --- a/Android.mk +++ b/Android.mk @@ -44,6 +44,7 @@ SERVALD_SRC_FILES = \ serval-dna/lsif.c \ serval-dna/dna_helper.c \ serval-dna/sighandlers.c \ + serval-dna/fdqueue.c \ serval-dna/monitor.c \ serval-dna/monitor-cli.c \ serval-dna/codecs.c \ diff --git a/Makefile.in b/Makefile.in index 0796bd36..73007987 100644 --- a/Makefile.in +++ b/Makefile.in @@ -47,6 +47,7 @@ SRCS= main.c \ monitor-cli.c \ dna_helper.c \ sighandlers.c \ + fdqueue.c \ codecs.c \ audiodevices.c \ audio_msm_g1.c \ diff --git a/monitor.c b/monitor.c index a6db2bf1..ee216f3d 100644 --- a/monitor.c +++ b/monitor.c @@ -127,7 +127,7 @@ int monitor_setup_sockets() return 0; error: - close(monitor_named_socket); + fd_teardown(monitor_named_socket); monitor_named_socket=-1; return -1; } @@ -170,7 +170,7 @@ int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax) return 0; } -int monitor_poll() +void monitor_poll(int ignored_fd) { int s; unsigned char buffer[1024]; @@ -214,7 +214,7 @@ int monitor_poll() ) { if (0) WHYF("ignored_length=%d",ignored_length); int res = fcntl(s,F_SETFL, O_NONBLOCK); - if (res) { close(s); continue; } + if (res) { fd_teardown(s); continue; } #if defined(HAVE_LINUX_STRUCT_UCRED) struct ucred ucred; #elif defined(HAVE_BSD_STRUCT_UCRED) @@ -230,7 +230,7 @@ int monitor_poll() } if (res) { WHY("Failed to read credentials of monitor.socket client"); - close(s); continue; } + fd_teardown(s); continue; } #if defined(HAVE_LINUX_STRUCT_UCRED) otheruid = ucred.uid; #elif defined(HAVE_BSD_STRUCT_UCRED) @@ -240,12 +240,12 @@ int monitor_poll() if (0) WHYF("monitor.socket client has wrong uid (%d versus %d)", otheruid,getuid()); write(s,"\nCLOSE:Incorrect UID\n",strlen("\nCLOSE:Incorrect UID\n")); - close(s); continue; + fd_teardown(s); continue; } else if (monitor_socket_count>=MAX_MONITOR_SOCKETS ||monitor_socket_count<0) { write(s,"\nCLOSE:All sockets busy\n",strlen("\nCLOSE:All sockets busy\n")); - close(s); + fd_teardown(s); } else { struct monitor_context *c=&monitor_sockets[monitor_socket_count]; c->socket=s; @@ -255,6 +255,7 @@ int monitor_poll() write(s,"\nMONITOR:You are talking to servald\n", strlen("\nMONITOR:You are talking to servald\n")); WHYF("Got %d clients",monitor_socket_count); + fd_watch(c->socket,monitor_client_poll,POLL_IN); } ignored_length=sizeof(ignored_address); @@ -262,7 +263,10 @@ int monitor_poll() fcntl(monitor_named_socket, F_GETFL, NULL)|O_NONBLOCK); } if (errno != EAGAIN) WHY_perror("accept"); +} +void monitor_client_poll(int fd) +{ /* Read from any open connections */ int i; for(i=0;isocket!=fd) continue; fcntl(c->socket,F_SETFL, fcntl(c->socket, F_GETFL, NULL)|O_NONBLOCK); switch(c->state) { @@ -299,7 +304,7 @@ int monitor_poll() /* all other errors; close socket */ WHYF("Tearing down monitor client #%d due to errno=%d (%s)", i,errno,strerror(errno)?strerror(errno):""); - close(c->socket); + fd_teardown(c->socket); if (i==monitor_socket_count-1) { monitor_socket_count--; continue; @@ -338,7 +343,7 @@ int monitor_poll() /* all other errors; close socket */ WHYF("Tearing down monitor client #%d due to errno=%d", i,errno); - close(c->socket); + fd_teardown(c->socket); if (i==monitor_socket_count-1) { monitor_socket_count--; continue; @@ -366,7 +371,7 @@ int monitor_poll() } } - return 0; + return; } int monitor_process_command(int index,char *cmd) @@ -580,7 +585,7 @@ int monitor_announce_bundle(rhizome_manifest *m) /* error sending update, so kill monitor socket */ WHYF("Tearing down monitor client #%d due to errno=%d", i,errno); - close(monitor_sockets[i].socket); + fd_teardown(monitor_sockets[i].socket); if (i==monitor_socket_count-1) { monitor_socket_count--; continue; @@ -628,7 +633,7 @@ int monitor_call_status(vomp_call_state *call) /* error sending update, so kill monitor socket */ WHYF("Tearing down monitor client #%d due to errno=%d", i,errno); - close(monitor_sockets[i].socket); + fd_teardown(monitor_sockets[i].socket); if (i==monitor_socket_count-1) { monitor_socket_count--; continue; @@ -699,7 +704,7 @@ int monitor_tell_clients(unsigned char *msg,int msglen,int mask) /* error sending update, so kill monitor socket */ WHYF("Tearing down monitor client #%d due to errno=%d", i,errno); - close(monitor_sockets[i].socket); + fd_teardown(monitor_sockets[i].socket); if (i==monitor_socket_count-1) { monitor_socket_count--; continue; @@ -735,7 +740,7 @@ int server_probe(int *pid) if (0) DEBUGF("last char='%c' %02x\n",p[len-1],p[len-1]); if (connect(fd, (struct sockaddr*)&addr, len) == -1) { - close(fd); + fd_teardown(fd); return SERVER_NOTRUNNING; } @@ -775,7 +780,7 @@ int server_probe(int *pid) char buff[1024]; int bytes=read(fd,buff,1024); - close(fd); + fd_teardown(fd); if (bytes<0) { return SERVER_NOTRESPONDING; } else if (bytes==0) { diff --git a/overlay.c b/overlay.c index b1f6f9e4..41291091 100644 --- a/overlay.c +++ b/overlay.c @@ -144,155 +144,43 @@ int overlayServerMode() of wifi latency anyway, so we'll live with it. Larger values will affect voice transport, and smaller values would affect CPU and energy use, and make the simulation less realistic. */ - struct pollfd fds[128]; - int fdcount; - /* Create structures to use 1MB of RAM for testing */ overlay_route_init(1); + /* Setup up MDP unix domain sockets */ + overlay_mdp_setup_sockets(); + /* Get rhizome server started BEFORE populating fd list so that the server's listen socket is in the list for poll() */ - if (rhizome_enabled()) rhizome_server_poll(); + if (rhizome_enabled()) rhizome_server_start(); + /* Pick next rhizome files to grab every few seconds + from the priority list continuously being built from observed + bundle announcements */ + fd_setalarm(rhizome_enqueue_suggestions,3000,3000); + + /* Periodically check for new interfaces */ + fd_setalarm(overlay_interface_discover,1,5000); + + /* Periodically check for server shut down */ + fd_setalarm(server_shutdown_check,1,1000); + + /* Periodically update route table. + (Alarm interval is dynamically updated by overlay_route_tick() + based on load/route table size etc) */ + fd_setalarm(overlay_route_tick,1000,1000); while(1) { - TIMING_CHECK(); - - server_shutdown_check(); - - TIMING_CHECK(); + /* Check for activitiy and respond to it */ + fd_list(); + fd_poll(); /* Work out how long we can wait before we need to tick */ - long long ms=overlay_time_until_next_tick(); memabuseCheck(); - TIMING_CHECK(); - //int filesPresent=0; - fds[0].fd=sock; fds[0].events=POLLIN; - fdcount=1; - rhizome_server_get_fds(fds,&fdcount,128); - TIMING_CHECK(); - rhizome_fetching_get_fds(fds,&fdcount,128); - TIMING_CHECK(); - overlay_mdp_get_fds(fds,&fdcount,128); - TIMING_CHECK(); - monitor_get_fds(fds,&fdcount,128); - TIMING_CHECK(); - for(i=0;i5) ms=5; - } - } - TIMING_CHECK(); - - /* Progressively update link scores to neighbours etc, and find out how long before - we should next tick the route table. - Basically the faster the CPU and the sparser the route table, the less often we - will need to tick in order to keep each tick nice and fast. */ - int route_tick_interval=overlay_route_tick(); - if (ms>route_tick_interval) ms=route_tick_interval; - int vomp_tick_time=vomp_tick_interval(); - if (ms>vomp_tick_time) ms=vomp_tick_time; - - TIMING_CHECK(); - if (debug&DEBUG_VERBOSE_IO) - DEBUGF("Waiting via poll() for up to %lldms", ms); - TIMING_PAUSE(); - /* Sanity check maximum poll timeout */ - if (ms<1) ms=1; - if (ms>15000) ms=15000; - int r = poll(fds, fdcount, ms); - TIMING_CHECK(); - if (r == -1) - WHY_perror("poll"); - else if (debug&DEBUG_VERBOSE_IO) { - DEBUGF("poll() says %d file descriptors are ready", r); - int i; - for(i=0;i 0) { - /* We have data, so try to receive it */ - if (debug&DEBUG_IO) { - fprintf(stderr,"poll() reports %d fds ready\n",r); - int i; - for(i=0;i') { I(fileP)=1; + fd_setalarm(overlay_dummy_poll,10,10); char dummyfile[1024]; if (name[1]=='/') { /* Absolute path */ @@ -297,7 +299,47 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd return 0; } -int overlay_rx_messages() +void overlay_interface_poll(int fd) +{ + int i; + int plen=0; + unsigned char packet[16384]; + + for(i=0;i0); count+=c[i]; } + int dummys=0; - /* Grab packets from interfaces in round-robin fashion until all have been grabbed, - or until we have spent too long (maybe 10ms?) */ + /* Check for input on any dummy interfaces that are attached to ordinary + files. We have to do it this way, because poll() says that ordinary + files are always ready for reading, even if at EOF. + + Also, make sure we don't spend too much time here */ int now = overlay_gettime_ms(); while(count>0) { @@ -322,16 +365,12 @@ int overlay_rx_messages() { struct sockaddr src_addr; unsigned int addrlen=sizeof(src_addr); - unsigned char transaction_id[8]; - - overlay_last_interface_number=i; - - /* Set socket non-blocking before we try to read from it */ - fcntl(overlay_interfaces[i].fd, F_SETFL, - fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK); + unsigned char transaction_id[8]; if (overlay_interfaces[i].fileP) { + dummys++; /* Read from dummy interface file */ + overlay_last_interface_number=i; long long length=lseek(overlay_interfaces[i].fd,0,SEEK_END); if (overlay_interfaces[i].offset>=length) { @@ -372,34 +411,16 @@ int overlay_rx_messages() } } } else { - /* Read from UDP socket */ - int recvttl=1; - plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet), - &recvttl,&src_addr,&addrlen); - if (plen<0) { - c[i]=0; count--; - } else { - /* We have a frame from this interface */ - if (debug&DEBUG_PACKETRX) { - fflush(stdout); - serval_packetvisualise(stderr,"Read from real interface", - packet,plen); - fflush(stderr); - } - if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name); - - if (packetOk(i,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) { - WHY("Malformed packet"); - serval_packetvisualise(stderr,"Malformed packet", packet,plen); - } - } } } /* Don't sit here forever, or else we will never send any packets */ if (overlay_gettime_ms()>(now+10)) break; } - return 0; + /* Stop watching dummy nets if there are none active */ + if (!dummys) fd_setalarm(overlay_dummy_poll,0,0); + + return ; } int overlay_tx_messages() @@ -565,7 +586,7 @@ overlay_interface_register(char *name, overlay_interfaces[i].broadcast_address.sin_addr.s_addr, local.sin_addr.s_addr, broadcast.sin_addr.s_addr); - close(overlay_interfaces[i].fd); + fd_teardown(overlay_interfaces[i].fd); overlay_interfaces[i].fd = -1; if (overlay_interface_init_socket(i, local, broadcast)) INFOF("Could not reinitialise changed interface %s", name); @@ -582,25 +603,11 @@ overlay_interface_register(char *name, return 0; } -static time_t overlay_last_interface_discover_time = 0; -int -overlay_interface_discover(void) { +void overlay_interface_discover(void) { int no_route, i; - time_t now; struct interface_rules *r; struct sockaddr_in dummyaddr; - /* Don't waste too much time and effort on interface discovery, - especially if we can't attach to a given interface for some reason. */ - now = time(NULL); - if (overlay_last_interface_discover_time > now) - overlay_last_interface_discover_time = now; - - if ((now - overlay_last_interface_discover_time) < 2) - return 0; - - overlay_last_interface_discover_time = now; - /* Mark all interfaces as not observed, so that we know if we need to cull any */ for(i = 0; i < overlay_interface_count; i++) overlay_interfaces[i].observed = 0; @@ -650,7 +657,7 @@ overlay_interface_discover(void) { FATAL("Unable to get any interface information"); } - return 0; + return; } int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,overlay_frame *pax[],int *frame_pax,int frame_max_pax) diff --git a/overlay_mdp.c b/overlay_mdp.c index f20ebb85..f7437781 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -72,6 +72,8 @@ int overlay_mdp_setup_sockets() int send_buffer_size=64*1024; int res = setsockopt(mdp_abstract_socket, SOL_SOCKET, SO_SNDBUF, &send_buffer_size, sizeof(send_buffer_size)); + + fd_watch(mdp_abstract_socket,overlay_mdp_poll,POLL_IN); } } #endif @@ -104,6 +106,7 @@ int overlay_mdp_setup_sockets() &send_buffer_size, sizeof(send_buffer_size)); if (res) WHY_perror("setsockopt"); + fd_watch(mdp_named_socket,overlay_mdp_poll,POLL_IN); } } @@ -111,38 +114,6 @@ int overlay_mdp_setup_sockets() } -int overlay_mdp_get_fds(struct pollfd *fds,int *fdcount,int fdmax) -{ - /* Make sure sockets are open */ - overlay_mdp_setup_sockets(); - - if ((*fdcount)>=fdmax) return -1; - if (mdp_abstract_socket>-1) - { - if (debug&DEBUG_IO) { - fprintf(stderr,"MDP abstract name space socket is poll() slot #%d (fd %d)\n", - *fdcount,mdp_abstract_socket); - } - fds[*fdcount].fd=mdp_abstract_socket; - fds[*fdcount].events=POLLIN; - (*fdcount)++; - } - if ((*fdcount)>=fdmax) return -1; - if (mdp_named_socket>-1) - { - if (debug&DEBUG_IO) { - fprintf(stderr,"MDP named unix domain socket is poll() slot #%d (fd %d)\n", - *fdcount,mdp_named_socket); - } - fds[*fdcount].fd=mdp_named_socket; - fds[*fdcount].events=POLLIN; - (*fdcount)++; - } - - - return 0; -} - #define MDP_MAX_BINDINGS 100 #define MDP_MAX_SOCKET_NAME_LEN 110 int mdp_bindings_initialised=0; @@ -948,7 +919,7 @@ int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP, } } -int overlay_mdp_poll() +void overlay_mdp_poll() { unsigned char buffer[16384]; int ttl; @@ -972,11 +943,14 @@ int overlay_mdp_poll() switch(mdp->packetTypeAndFlags&MDP_TYPE_MASK) { case MDP_GOODBYE: - return overlay_mdp_releasebindings(recvaddr_un,recvaddrlen); + overlay_mdp_releasebindings(recvaddr_un,recvaddrlen); + return; case MDP_VOMPEVENT: - return vomp_mdp_event(mdp,recvaddr_un,recvaddrlen); + vomp_mdp_event(mdp,recvaddr_un,recvaddrlen); + return; case MDP_NODEINFO: - return overlay_route_node_info(mdp,recvaddr_un,recvaddrlen); + overlay_route_node_info(mdp,recvaddr_un,recvaddrlen); + return; case MDP_GETADDRS: { overlay_mdp_frame mdpreply; @@ -1035,17 +1009,20 @@ int overlay_mdp_poll() mdpreply.addrlist.server_sid_count=count; /* Send back to caller */ - return overlay_mdp_reply(mdp_named_socket, - (struct sockaddr_un *)recvaddr,recvaddrlen, - &mdpreply); + overlay_mdp_reply(mdp_named_socket, + (struct sockaddr_un *)recvaddr,recvaddrlen, + &mdpreply); + return; } break; case MDP_TX: /* Send payload (and don't treat it as system privileged) */ - return overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen); + overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen); + return; break; case MDP_BIND: /* Bind to port */ - return overlay_mdp_process_bind_request(mdp_named_socket,mdp, - recvaddr_un,recvaddrlen); + overlay_mdp_process_bind_request(mdp_named_socket,mdp, + recvaddr_un,recvaddrlen); + return; break; default: /* Client is not allowed to send any other frame type */ @@ -1066,7 +1043,7 @@ int overlay_mdp_poll() fcntl(mdp_named_socket, F_GETFL, NULL)&(~O_NONBLOCK)); } - return 0; + return; } int overlay_mdp_relevant_bytes(overlay_mdp_frame *mdp) diff --git a/overlay_route.c b/overlay_route.c index 5a47a8b2..425f078e 100644 --- a/overlay_route.c +++ b/overlay_route.c @@ -1259,7 +1259,7 @@ int overlay_route_tick_next_neighbour_id=0; int overlay_route_tick_neighbour_bundle_size=1; int overlay_route_tick_next_node_bin_id=0; int overlay_route_tick_node_bundle_size=1; -int overlay_route_tick() +void overlay_route_tick() { int n; @@ -1326,7 +1326,11 @@ int overlay_route_tick() int interval=5000/ticks; if (debug&DEBUG_OVERLAYROUTING) fprintf(stderr,"route tick interval = %dms (%d ticks per 5sec, neigh=%lldms, node=%lldms)\n",interval,ticks,neighbour_time,node_time); - return interval; + + /* Update callback interval based on how much work we have to do */ + fd_setalarm(overlay_route_tick,interval,interval); + + return; } /* Ticking neighbours is easy; we just pretend we have heard from them again, diff --git a/rhizome.h b/rhizome.h index 42e87c8d..9e5a946a 100644 --- a/rhizome.h +++ b/rhizome.h @@ -299,4 +299,6 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip); -int rhizome_enqueue_suggestions(); +void rhizome_enqueue_suggestions(); +void rhizome_fetch_poll(int fd); + diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 398e031f..dadc6660 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -485,7 +485,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, return 0; } -int rhizome_enqueue_suggestions() +void rhizome_enqueue_suggestions() { int i; for(i=0;isocket,rhizome_fetch_poll,POLL_OUT); rhizome_file_fetch_queue_count++; if (1||debug&DEBUG_RHIZOME) DEBUGF("Queued file for fetching into %s (%d in queue)", q->manifest->dataFileName, rhizome_file_fetch_queue_count); @@ -693,65 +695,15 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri return 0; } -long long rhizome_last_fetch=0; -int rhizome_poll_fetchP=0; -int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax) -{ - /* Don't fetch quickly during voice calls */ - rhizome_poll_fetchP=0; - long long now=overlay_gettime_ms(); - if (nownow) - return 0; - } - rhizome_last_fetch=now; - rhizome_poll_fetchP=1; - - int i; - if ((*fdcount)>=fdmax) return -1; - - for(i=0;i=fdmax) return -1; - if (debug&DEBUG_RHIZOMESYNC) { - DEBUGF("rhizome file fetch request #%d is poll() slot #%d (fd %d)", - i,*fdcount,file_fetch_queue[i].socket); } - fds[*fdcount].fd=file_fetch_queue[i].socket; - switch(file_fetch_queue[i].state) { - case RHIZOME_FETCH_SENDINGHTTPREQUEST: - fds[*fdcount].events=POLLOUT; break; - case RHIZOME_FETCH_RXHTTPHEADERS: - case RHIZOME_FETCH_RXFILE: - default: - fds[*fdcount].events=POLLIN; break; - } - (*fdcount)++; - } - return 0; -} - -long long rhizome_last_fetch_enqueue_time=0; - -int rhizome_fetch_poll() +void rhizome_fetch_poll(int fd) { int rn; - if (!rhizome_poll_fetchP) return 0; - - if (rhizome_last_fetch_enqueue_timesocket!=fd) continue; /* Make socket non-blocking */ fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)|O_NONBLOCK); @@ -775,6 +727,7 @@ int rhizome_fetch_poll() } q->request_len=0; q->request_ofs=0; q->state=RHIZOME_FETCH_RXHTTPHEADERS; + fd_watch(q->socket,rhizome_fetch_poll,POLL_IN); } } else if (errno!=EAGAIN) { WHY("Got error while sending HTTP request. Closing."); @@ -825,12 +778,12 @@ int rhizome_fetch_poll() fclose(q->file); q->file=NULL; const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0); if (id == NULL) - return WHY("Manifest missing ID"); + { WHY("Manifest missing ID"); return; } if (create_rhizome_import_dir() == -1) - return -1; + return; char filename[1024]; if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id)) - return -1; + return; /* Do really write the manifest unchanged */ if (debug&DEBUG_RHIZOME) { DEBUGF("manifest has %d signatories",q->manifest->sig_count); @@ -977,6 +930,9 @@ int rhizome_fetch_poll() rhizome_manifest_free(file_fetch_queue[i].manifest); file_fetch_queue[i].manifest=NULL; + /* close socket and stop watching it */ + fd_teardown(file_fetch_queue[i].socket); + /* reshuffle higher numbered slot down if required */ if (i<(rhizome_file_fetch_queue_count-1)) bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1], @@ -990,7 +946,5 @@ int rhizome_fetch_poll() rhizome_file_fetch_queue_count); } } - - - return 0; + return; } diff --git a/rhizome_http.c b/rhizome_http.c index 55bb058c..bb404012 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -93,7 +93,7 @@ int rhizome_server_start() if (bind(rhizome_server_socket, (struct sockaddr *) &address, sizeof(address)) < 0) { - close(rhizome_server_socket); + fd_teardown(rhizome_server_socket); rhizome_server_socket=-1000; if (debug&DEBUG_RHIZOME) WHY("bind() failed starting rhizome http server"); return -1; @@ -103,44 +103,32 @@ int rhizome_server_start() if (rc < 0) { perror("ioctl() failed"); - close(rhizome_server_socket); + fd_teardown(rhizome_server_socket); + rhizome_server_socket=-1; exit(-1); } if (listen(rhizome_server_socket,20)) { - close(rhizome_server_socket); + fd_teardown(rhizome_server_socket); rhizome_server_socket=-1; return WHY("listen() failed starting rhizome http server"); } + /* Add Rhizome HTTPd server to list of file descriptors to watch */ + fd_watch(rhizome_server_socket,rhizome_server_poll,POLL_IN); + return 0; } -int rhizome_poll_httpP=0; - -int rhizome_server_poll() +void rhizome_client_poll(int fd) { - struct sockaddr addr; - unsigned int addr_len=0; - int sock; int rn; - - if (!rhizome_poll_httpP) return 0; - /* Having the starting of the server here is helpful in that - if the port is taken by someone else, we will grab it fairly - swiftly once it becomes available. */ - if (rhizome_server_socket<0) rhizome_server_start(); - if (rhizome_server_socket<0) return 0; - - /* Process the existing requests. - XXX - should use poll or select here */ - if (0&&debug&DEBUG_RHIZOME) WHYF("Checking %d active connections", - rhizome_server_live_request_count); for(rn=0;rnsocket!=fd) continue; switch(r->request_type) { case RHIZOME_HTTP_REQUEST_RECEIVING: @@ -198,9 +186,23 @@ int rhizome_server_poll() /* Socket already has request -- so just try to send some data. */ rhizome_server_http_send_bytes(rn,r); break; - } + } + /* We have processed the connection that has activity, so we can return + immediately */ + return; } + + return; +} + + +void rhizome_server_poll(int ignored_file_descriptor) +{ + struct sockaddr addr; + unsigned int addr_len=0; + int sock; + /* Deal with any new requests */ /* Make socket non-blocking */ fcntl(rhizome_server_socket,F_SETFL, @@ -214,17 +216,18 @@ int rhizome_server_poll() /* We are now trying to read the HTTP request */ request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING; rhizome_live_http_requests[rhizome_server_live_request_count++]=request; + /* Watch for input */ + fd_watch(request->socket,rhizome_client_poll,POLL_IN); } fcntl(rhizome_server_socket,F_SETFL, fcntl(rhizome_server_socket, F_GETFL, NULL)&(~O_NONBLOCK)); - - return 0; } int rhizome_server_close_http_request(int i) { - close(rhizome_live_http_requests[i]->socket); + fd_teardown(rhizome_live_http_requests[i]->socket); + rhizome_server_free_http_request(rhizome_live_http_requests[i]); /* Make it null, so that if we are the list in the list, the following assignment still yields the correct behaviour */ @@ -245,52 +248,6 @@ int rhizome_server_free_http_request(rhizome_http_request *r) return 0; } -long long rhizome_last_http_send=0; -int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax) -{ - int i; - if ((*fdcount)>=fdmax) return -1; - rhizome_poll_httpP=0; - /* Don't send quickly during voice calls */ - long long now=overlay_gettime_ms(); - if (nownow) - return 0; - } - rhizome_last_http_send=now; - rhizome_poll_httpP=1; - - if (rhizome_server_socket>-1) - { - if (debug&DEBUG_IO) { - WHYF("rhizome http server is poll() slot #%d (fd %d)", - *fdcount,rhizome_server_socket); - } - fds[*fdcount].fd=rhizome_server_socket; - fds[*fdcount].events=POLLIN; - (*fdcount)++; - } - - for(i=0;i=fdmax) return -1; - if (debug&DEBUG_IO) { - WHYF("rhizome http request #%d is poll() slot #%d (fd %d)", - i,*fdcount,rhizome_live_http_requests[i]->socket); } - fds[*fdcount].fd=rhizome_live_http_requests[i]->socket; - switch(rhizome_live_http_requests[i]->request_type) { - case RHIZOME_HTTP_REQUEST_RECEIVING: - fds[*fdcount].events=POLLIN; break; - default: - fds[*fdcount].events=POLLOUT; break; - } - (*fdcount)++; - } - return 0; -} - void hexFilter(char *s) { char *t; @@ -466,6 +423,9 @@ int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r) int rhizome_server_parse_http_request(int rn,rhizome_http_request *r) { char id[1024]; + + /* Switching to writing, so update the call-back */ + fd_watch(r->socket,rhizome_client_poll,POLL_OUT); /* Clear request type flags */ r->request_type=0; @@ -502,6 +462,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r) else if (sscanf(r->request,"GET /rhizome/file/%s HTTP/1.", id)==1) { /* Stream the specified file */ + int dud=0; int i; hexFilter(id); diff --git a/serval.h b/serval.h index 11809438..cb9de47d 100755 --- a/serval.h +++ b/serval.h @@ -824,8 +824,7 @@ long long parse_quantity(char *q); int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockaddr_in broadcast, int speed_in_bits,int port,int type); int overlay_interface_init_socket(int i,struct sockaddr_in src_addr,struct sockaddr_in broadcast); -int overlay_interface_discover(); -int overlay_interface_discover(); +void overlay_interface_discover(); long long overlay_time_until_next_tick(); int overlay_rx_messages(); int overlay_check_ticks(); @@ -1038,7 +1037,7 @@ int overlay_route_record_link(long long now,unsigned char *to, unsigned char *via,int sender_interface, unsigned int s1,unsigned int s2,int score,int gateways_en_route); int overlay_route_dump(); -int overlay_route_tick(); +void overlay_route_tick(); int overlay_route_tick_neighbour(int neighbour_id,long long now); int overlay_route_tick_node(int bin,int slot,long long now); int overlay_route_add_advertisements(int interface,overlay_buffer *e); @@ -1054,7 +1053,7 @@ int overlay_route_saw_advertisements(int i,overlay_frame *f, long long now); int overlay_rhizome_saw_advertisements(int i,overlay_frame *f, long long now); int overlay_route_please_advertise(overlay_node *n); int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax); -int rhizome_server_poll(); +void rhizome_server_poll(int ignored_file_descriptor); int rhizome_saw_voice_traffic(); int overlay_saw_mdp_containing_frame(int interface,overlay_frame *f,long long now); @@ -1093,7 +1092,7 @@ int overlay_address_is_broadcast(unsigned char *a); int overlay_broadcast_generate_address(unsigned char *a); int overlay_abbreviate_unset_current_sender(); int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax); -int rhizome_fetch_poll(); +void rhizome_fetch_poll(int fd); int rhizome_opendb(); typedef struct dna_identity_status { @@ -1157,7 +1156,7 @@ int mkdirsn(const char *path, size_t len, mode_t mode); #define FORM_SERVAL_INSTANCE_PATH(buf, path) (form_serval_instance_path(buf, sizeof(buf), (path))) int overlay_mdp_get_fds(struct pollfd *fds,int *fdcount,int fdmax); -int overlay_mdp_poll(); +void overlay_mdp_poll(); int overlay_mdp_reply_error(int sock, struct sockaddr_un *recvaddr,int recvaddrlen, int error_number,char *message); @@ -1477,7 +1476,8 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax); int monitor_setup_sockets(); -int monitor_poll(); +void monitor_poll(int ignored_fd); +void monitor_client_poll(int ignored_fd); int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax); int monitor_call_status(vomp_call_state *call); int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio); @@ -1536,3 +1536,18 @@ void sigIoHandler(int signal); #define DEFAULT_MONITOR_SOCKET_NAME "org.servalproject.servald.monitor.socket" #define DEFAULT_MDP_SOCKET_NAME "org.servalproject.servald.mdp.socket" + +/* Event queue handling functions */ +int fd_poll(); +int fd_checkalarms(); +int fd_setalarm(void (*func),long long first_alarm_in,int repeat_every); +int fd_teardown(int fd); +int fd_watch(int fd,void (*func)(int fd),int events); +int fd_list(); + +int rhizome_server_start(); +void rhizome_enqueue_suggestions(); +int overlay_mdp_setup_sockets(); +void overlay_interface_poll(int fd); +void overlay_dummy_poll(); +void rhizome_client_poll(int fd); diff --git a/server.c b/server.c index 17234599..96b4fac0 100644 --- a/server.c +++ b/server.c @@ -209,8 +209,7 @@ int server(char *backing_file) fprintf(f,"%d\n", server_getpid); fclose(f); - if (!overlayMode) simpleServerMode(); - else overlayServerMode(); + overlayServerMode(); return 0; } @@ -810,73 +809,6 @@ int createServerSocket() return 0; } -extern int sigIoFlag; -extern int rhizome_server_socket; -int simpleServerMode() -{ - while(1) { - struct sockaddr recvaddr; - socklen_t recvaddrlen=sizeof(recvaddr); - struct pollfd fds[128]; - int fdcount; - int len; - int r; - - server_shutdown_check(); - - bzero((void *)&recvaddr,sizeof(recvaddr)); - - /* Get rhizome server started BEFORE populating fd list so that - the server's listen socket is in the list for poll() */ - if (rhizome_enabled()) rhizome_server_poll(); - - /* Get list of file descripters to watch */ - fds[0].fd=sock; fds[0].events=POLLIN; - fdcount=1; - rhizome_server_get_fds(fds,&fdcount,128); - if (debug&DEBUG_IO) { - printf("poll()ing file descriptors:"); - { int i; - for(i=0;isin_port; - client_addr=((struct sockaddr_in*)&recvaddr)->sin_addr; - - if (debug&DEBUG_DNAREQUESTS) fprintf(stderr,"Received packet from %s:%d (len=%d).\n",inet_ntoa(client_addr),client_port,len); - if (debug&DEBUG_PACKETRX) dump("recvaddr",(unsigned char *)&recvaddr,recvaddrlen); - if (debug&DEBUG_PACKETRX) dump("packet",(unsigned char *)buffer,len); - if (dropPacketP(len)) { - if (debug&DEBUG_SIMULATION) DEBUG("Simulation mode: Dropped packet due to simulated link parameters."); - continue; - } - /* Simple server mode doesn't really use interface numbers, so lie and say interface -1 */ - if (packetOk(-1,buffer,len,NULL,ttl,&recvaddr,recvaddrlen,1)) { - if (debug&DEBUG_PACKETFORMATS) DEBUG("Ignoring invalid packet"); - } - if (debug&DEBUG_PACKETRX) DEBUG("Finished processing packet, waiting for next one."); - } - } - return 0; -} - #ifdef DEBUG_MEM_ABUSE unsigned char groundzero[65536]; int memabuseInitP=0; diff --git a/vomp.c b/vomp.c index 4c745617..01b3691c 100644 --- a/vomp.c +++ b/vomp.c @@ -1511,8 +1511,3 @@ int vomp_tick() return 0; } -int vomp_tick_interval() -{ - /* Work out the number of milliseconds until the next vomp tick is required. */ - return 1000; -}