Various debug fiddles and conversion from select() to poll()

(although poll() still doesn't let us monitor ordinary files
which is silly.)
This commit is contained in:
gardners 2012-01-12 16:47:24 +10:30
parent aeff9a4682
commit bf9f8559f1
8 changed files with 242 additions and 25 deletions

View File

@ -435,6 +435,9 @@ typedef struct overlay_frame {
unsigned char source[32]; unsigned char source[32];
int source_address_status; int source_address_status;
/* IPv4 node frame was received from (if applicable) */
struct sockaddr *recvaddr;
/* Frame content from destination address onwards */ /* Frame content from destination address onwards */
int bytecount; int bytecount;
unsigned char *bytes; unsigned char *bytes;

View File

@ -96,8 +96,9 @@ int overlayServerMode()
5ms between checks if we have a dummy interface running. This is a reasonable simulation 5ms between checks if we have a dummy interface running. This is a reasonable simulation
of wifi latency anyway, so we'll live with it. Larger values will affect voice transport, of wifi latency anyway, so we'll live with it. Larger values will affect voice transport,
and smaller values would affect CPU and energy use, and make the simulation less realistic. */ and smaller values would affect CPU and energy use, and make the simulation less realistic. */
fd_set read_fds;
int maxfd=-1; struct pollfd fds[128];
int fdcount;
/* Create structures to use 1MB of RAM for testing */ /* Create structures to use 1MB of RAM for testing */
overlay_route_init(1); overlay_route_init(1);
@ -116,21 +117,41 @@ int overlayServerMode()
if (nextHlr(hlr,&ofs)) break; if (nextHlr(hlr,&ofs)) break;
} }
/* Get rhizome server started BEFORE populating fd list so that
the server's listen socket is in the list for poll() */
if (rhizome_datastore_path) rhizome_server_poll();
while(1) { while(1) {
/* Work out how long we can wait before we need to tick */ /* Work out how long we can wait before we need to tick */
long long ms=overlay_time_until_next_tick(); long long ms=overlay_time_until_next_tick();
struct timeval waittime;
int filesPresent=0; int filesPresent=0;
FD_ZERO(&read_fds); fds[0].fd=sock; fds[0].events=POLLIN;
fdcount=1;
rhizome_server_get_fds(fds,&fdcount,128);
for(i=0;i<overlay_interface_count;i++) for(i=0;i<overlay_interface_count;i++)
{ {
if (!overlay_interfaces[i].fileP) /* Make socket non-blocking so that poll() behaves correctly.
We then set non-blocking before actually reading from it */
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)&(~O_NONBLOCK));
if ((!overlay_interfaces[i].fileP)&&(fdcount<128))
{ {
if (overlay_interfaces[i].fd>maxfd) maxfd=overlay_interfaces[i].fd; if (debug&DEBUG_IO) {
FD_SET(overlay_interfaces[i].fd,&read_fds); fprintf(stderr,"Interface %s is poll() slot #%d (fd %d)\n",
overlay_interfaces[i].name,
fdcount,
overlay_interfaces[i].fd);
} }
else { filesPresent=1; if (ms>5) ms=5; } fds[fdcount].fd=overlay_interfaces[i].fd;
fds[fdcount].events=POLLRDNORM;
fds[fdcount].revents=0;
fdcount++;
}
if (overlay_interfaces[i].fileP)
{ filesPresent=1; if (ms>5) ms=5; }
} }
/* Progressively update link scores to neighbours etc, and find out how long before /* Progressively update link scores to neighbours etc, and find out how long before
@ -140,23 +161,39 @@ int overlayServerMode()
int route_tick_interval=overlay_route_tick(); int route_tick_interval=overlay_route_tick();
if (ms>route_tick_interval) ms=route_tick_interval; if (ms>route_tick_interval) ms=route_tick_interval;
waittime.tv_usec=(ms%1000)*1000; if (debug&DEBUG_VERBOSE_IO)
waittime.tv_sec=ms/1000; fprintf(stderr,"Waiting via poll() for up to %lldms\n",ms);
int r=poll(fds,fdcount,ms);
if (debug&DEBUG_VERBOSE_IO) fprintf(stderr,"Waiting via select() for up to %lldms\n",ms);
int r=select(maxfd+1,&read_fds,NULL,NULL,&waittime);
if (r<0) { if (r<0) {
/* select had a problem */ /* select had a problem */
if (debug&DEBUG_IO) perror("select()"); if (debug&DEBUG_IO) perror("poll()");
WHY("select() complained."); WHY("select() complained.");
} else if (r>0) { } else if (r>0) {
/* We have data, so try to receive it */ /* We have data, so try to receive it */
if (debug&DEBUG_IO) fprintf(stderr,"select() reports packets waiting\n"); if (debug&DEBUG_IO) {
fprintf(stderr,"poll() reports %d fds ready\n",r);
int i;
for(i=0;i<fdcount;i++) {
if (fds[i].revents)
{
fprintf(stderr," #%d (fd %d): %d (",i,fds[i].fd,fds[i].revents);
if ((fds[i].revents&POLL_IN)==POLL_IN) fprintf(stderr,"POLL_IN,");
if ((fds[i].revents&POLLRDNORM)==POLLRDNORM) fprintf(stderr,"POLLRDNORM,");
if ((fds[i].revents&POLL_OUT)==POLL_OUT) fprintf(stderr,"POLL_OUT,");
if ((fds[i].revents&POLL_ERR)==POLL_ERR) fprintf(stderr,"POLL_ERR,");
if ((fds[i].revents&POLL_HUP)==POLL_HUP) fprintf(stderr,"POLL_HUP,");
if ((fds[i].revents&POLLNVAL)==POLLNVAL) fprintf(stderr,"POLL_NVAL,");
fprintf(stderr,")\n");
}
}
}
overlay_rx_messages(); overlay_rx_messages();
} else { } else {
/* No data before tick occurred, so do nothing. /* No data before tick occurred, so do nothing.
Well, for now let's just check anyway. */ Well, for now let's just check anyway. */
if (debug&DEBUG_IO) fprintf(stderr,"select() timeout.\n"); if (debug&DEBUG_IO) fprintf(stderr,"poll() timeout.\n");
overlay_rx_messages(); overlay_rx_messages();
} }
/* Check if we need to trigger any ticks on any interfaces */ /* Check if we need to trigger any ticks on any interfaces */

View File

@ -242,7 +242,7 @@ int overlay_interface_init(char *name,struct sockaddr_in src_addr,struct sockadd
if (name[0]=='>') { if (name[0]=='>') {
I(fileP)=1; I(fileP)=1;
I(fd) = open(&name[1],O_APPEND|O_NONBLOCK|O_RDWR); I(fd) = open(&name[1],O_APPEND|O_RDWR);
if (I(fd)<1) if (I(fd)<1)
return WHY("could not open dummy interface file for append"); return WHY("could not open dummy interface file for append");
/* Seek to end of file as initial reading point */ /* Seek to end of file as initial reading point */
@ -288,10 +288,22 @@ int overlay_rx_messages()
overlay_last_interface_number=i; overlay_last_interface_number=i;
/* Set socket non-blocking before we try to read from it */
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
if (overlay_interfaces[i].fileP) { if (overlay_interfaces[i].fileP) {
/* Read from dummy interface file */ /* Read from dummy interface file */
long long length=lseek(overlay_interfaces[i].fd,0,SEEK_END); long long length=lseek(overlay_interfaces[i].fd,0,SEEK_END);
if (overlay_interfaces[i].offset<length) if (overlay_interfaces[i].offset>=length)
{
if (debug&DEBUG_OVERLAYINTERFACES)
fprintf(stderr,"At end of input on dummy interface #%d\n",i);
char c;
int r=read(overlay_interfaces[i].fd,&c,1);
fprintf(stderr,"r=%d\n",r);
}
else
{ {
lseek(overlay_interfaces[i].fd,overlay_interfaces[i].offset,SEEK_SET); lseek(overlay_interfaces[i].fd,overlay_interfaces[i].offset,SEEK_SET);
if (debug&DEBUG_OVERLAYINTERFACES) if (debug&DEBUG_OVERLAYINTERFACES)
@ -313,18 +325,27 @@ int overlay_rx_messages()
{ if (packetOk(i,&packet[128],plen,transaction_id,&src_addr,addrlen,1)) WHY("Malformed or unsupported packet from dummy interface (packetOK() failed)"); } } { if (packetOk(i,&packet[128],plen,transaction_id,&src_addr,addrlen,1)) WHY("Malformed or unsupported packet from dummy interface (packetOK() failed)"); } }
else WHY("Invalid packet version in dummy interface"); else WHY("Invalid packet version in dummy interface");
} }
else { c[i]=0; count--; } else {
if (debug&DEBUG_IO) fprintf(stderr,"Read NOTHING from dummy interface\n");
c[i]=0; count--;
}
} }
} else { } else {
/* Read from UDP socket */ /* Read from UDP socket */
plen=recvfrom(overlay_interfaces[i].fd,packet,sizeof(packet),MSG_DONTWAIT, plen=recvfrom(overlay_interfaces[i].fd,packet,sizeof(packet),
MSG_DONTWAIT,
&src_addr,&addrlen); &src_addr,&addrlen);
if (plen<0) { c[i]=0; count--; } else { fprintf(stderr,"Interface #%d (%s): plen=%d\n",
i,overlay_interfaces[i].name,plen);
perror("recvfrom");
if (plen<0) {
c[i]=0; count--;
} else {
/* We have a frame from this interface */ /* We have a frame from this interface */
if (debug&DEBUG_PACKETXFER) if (debug&DEBUG_PACKETXFER)
serval_packetvisualise(stderr,"Read from real interface", serval_packetvisualise(stderr,"Read from real interface",
packet,plen); packet,plen);
if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d\n",plen,i); if (debug&DEBUG_OVERLAYINTERFACES)fprintf(stderr,"Received %d bytes on interface #%d (%s)\n",plen,i,overlay_interfaces[i].name);
if (packetOk(i,packet,plen,NULL,&src_addr,addrlen,1)) WHY("Malformed packet"); if (packetOk(i,packet,plen,NULL,&src_addr,addrlen,1)) WHY("Malformed packet");
} }

View File

@ -20,6 +20,12 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "mphlr.h" #include "mphlr.h"
struct sockaddr_in loopback = {
.sin_family=0,
.sin_port=0,
.sin_addr.s_addr=0x0100007f
};
int packetOkOverlay(int interface,unsigned char *packet,int len,unsigned char *transaction_id, int packetOkOverlay(int interface,unsigned char *packet,int len,unsigned char *transaction_id,
struct sockaddr *recvaddr,int recvaddrlen,int parseP) struct sockaddr *recvaddr,int recvaddrlen,int parseP)
{ {
@ -96,6 +102,16 @@ int packetOkOverlay(int interface,unsigned char *packet,int len,unsigned char *t
f.bytes=NULL; f.bytes=NULL;
f.bytecount=0; f.bytecount=0;
f.prev=NULL; f.next=NULL; f.prev=NULL; f.next=NULL;
if (recvaddr->sa_family==AF_INET)
f.recvaddr=recvaddr;
else {
if (overlay_interfaces[interface].fileP) {
/* dummy interface, so tell to use 0.0.0.0 */
f.recvaddr=(struct sockaddr *)&loopback;
} else
/* some other sort of interface, so we can't offer any help here */
f.recvaddr=NULL;
}
overlay_abbreviate_unset_current_sender(); overlay_abbreviate_unset_current_sender();

View File

@ -201,7 +201,7 @@ long long sqlite_exec_int64(char *sqlformat,...);
int rhizome_update_file_priority(char *fileid); int rhizome_update_file_priority(char *fileid);
int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar); int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar);
char nybltochar(int n); char nybltochar(int n);
int rhizome_queue_manifest_import(rhizome_manifest *m); int rhizome_queue_manifest_import(rhizome_manifest *m,struct sockaddr_in *peerip);
#define RHIZOME_DONTVERIFY 0 #define RHIZOME_DONTVERIFY 0
#define RHIZOME_VERIFY 1 #define RHIZOME_VERIFY 1

View File

@ -501,6 +501,7 @@ char *rhizome_safe_encode(unsigned char *in,int len)
r[o++]=in[i]; r[o++]=in[i];
} }
} }
r[128]=0;
return r; return r;
} }

View File

@ -20,7 +20,137 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "mphlr.h" #include "mphlr.h"
#include "rhizome.h" #include "rhizome.h"
int rhizome_queue_manifest_import(rhizome_manifest *m) typedef struct rhizome_file_fetch_record {
int sock; /* if non-zero this is the socket to read from */
rhizome_manifest *manifest;
char fileid[SHA512_DIGEST_STRING_LENGTH];
FILE *file;
} rhizome_file_fetch_record;
/* List of queued transfers */
#define MAX_QUEUED_FILES 4
int rhizome_file_fetch_queue_count=0;
rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES];
/*
Queue a manifest for importing.
There are three main cases that can occur here:
1. The manifest has no associated file (filesize=0);
2. The associated file is already in our database; or
3. The associated file is not already in our database, and so we need
to fetch it before we can import it.
Cases (1) and (2) are more or less identical, and all we need to do is to
import the manifest into the database.
Case (3) requires that we fetch the associated file.
This is where life gets interesting.
First, we need to make sure that we can free up enough space in the database
for the file.
Second, we need to work out how we are going to get the file.
If we are on an IPv4 wifi network, then HTTP is probably the way to go.
If we are not on an IPv4 wifi network, then HTTP is not an option, and we need
to use a Rhizome/Overlay protocol to fetch it. It might even be HTTP over MDP
(Serval Mesh Datagram Protocol) or MTCP (Serval Mesh Transmission Control Protocol
-- yet to be specified).
For efficiency, the MDP transfer protocol should allow multiple listeners to
receive the data. In contrast, it would be nice to have the data auth-crypted, if
only to deal with packet errors (but also naughty people who might want to mess
with the transfer.
For HTTP over IPv4, the biggest problem is that we don't know the IPv4 address of
the sender, or in fact that the link is over IPv4 and thus that HTTP over IPv4 is
an option. We probably need to be passed this information.
*/
int rhizome_queue_manifest_import(rhizome_manifest *m,
struct sockaddr_in *peerip)
{ {
if (rhizome_file_fetch_queue_count>=MAX_QUEUED_FILES) {
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Already busy fetching files");
return -1;
}
char *filehash=rhizome_manifest_get(m,"filehash",NULL,0);
long long filesize=rhizome_manifest_get_ll(m,"filesize");
if (filesize>0&&(filehash!=NULL))
{
if (strlen(filehash)!=SHA512_DIGEST_STRING_LENGTH-1)
{
return WHY("File hash is wrong length");
}
int gotfile=
sqlite_exec_int64("SELECT COUNT(*) FROM FILES WHERE ID='%s';",
rhizome_safe_encode((unsigned char *)filehash,
strlen(filehash)));
if (gotfile!=1) {
/* We need to get the file */
/* Discard request if the same manifest is already queued for reception.
*/
int i,j;
for(i=0;i<rhizome_file_fetch_queue_count;i++)
{
for(j=0;j<crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES;j++)
if (m->cryptoSignPublic[j]
!=file_fetch_queue[i].manifest->cryptoSignPublic[j]) break;
if (j==crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES)
{
/* We are already fetching this manifest */
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Already fetching manifest\n");
return -1;
}
for(j=0;j<SHA512_DIGEST_STRING_LENGTH;j++)
if (filehash[j]!=file_fetch_queue[i].fileid[j]) break;
if (j==SHA512_DIGEST_STRING_LENGTH)
{
/* We are already fetching this file */
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Already fetching file %s\n",
filehash);
return -1;
}
}
if (peerip)
{
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET,SOCK_STREAM,0);
fcntl(sock,F_SETFL, O_NONBLOCK);
struct sockaddr_in peeraddr;
bcopy(peerip,&peeraddr,sizeof(peeraddr));
peeraddr.sin_port=htons(RHIZOME_HTTP_PORT);
int r=connect(sock,(struct sockaddr*)&peeraddr,sizeof(peeraddr));
if (r!=EINPROGRESS||(r!=0)) {
close (sock);
WHY("Failed to open socket to peer's rhizome web server");
return -1;
}
file_fetch_queue[rhizome_file_fetch_queue_count].manifest=m;
strncpy(file_fetch_queue[rhizome_file_fetch_queue_count].fileid,
filehash,SHA512_DIGEST_STRING_LENGTH);
rhizome_file_fetch_queue_count++;
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Queued file for fetching\n");
WHY("Fetch preparation incomplete (socket state recording is needed)");
return 0;
}
else
{
/* Transfer via overlay */
return WHY("Rhizome fetching via overlay not implemented");
}
}
}
/* got file, so now import */
WHY("Actual importing not implemented");
return WHY("Not implemented."); return WHY("Not implemented.");
} }

View File

@ -81,6 +81,8 @@ int rhizome_server_start()
struct sockaddr_in address; struct sockaddr_in address;
int on=1; int on=1;
WHY("Started rhizome server");
/* Catch broken pipe signals */ /* Catch broken pipe signals */
signal(SIGPIPE,sigPipeHandler); signal(SIGPIPE,sigPipeHandler);
signal(SIGIO,sigIoHandler); signal(SIGIO,sigIoHandler);
@ -253,6 +255,10 @@ int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
if (rhizome_server_socket>-1) if (rhizome_server_socket>-1)
{ {
if (debug&DEBUG_IO) {
fprintf(stderr,"rhizome http server is poll() slot #%d (fd %d)\n",
*fdcount,rhizome_server_socket);
}
fds[*fdcount].fd=rhizome_server_socket; fds[*fdcount].fd=rhizome_server_socket;
fds[*fdcount].events=POLLIN; fds[*fdcount].events=POLLIN;
(*fdcount)++; (*fdcount)++;
@ -261,6 +267,9 @@ int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
for(i=0;i<rhizome_server_live_request_count;i++) for(i=0;i<rhizome_server_live_request_count;i++)
{ {
if ((*fdcount)>=fdmax) return -1; if ((*fdcount)>=fdmax) return -1;
if (debug&DEBUG_IO) {
fprintf(stderr,"rhizome http request #%d is poll() slot #%d (fd %d)\n",
i,*fdcount,rhizome_live_http_requests[i]->socket); }
fds[*fdcount].fd=rhizome_live_http_requests[i]->socket; fds[*fdcount].fd=rhizome_live_http_requests[i]->socket;
switch(rhizome_live_http_requests[i]->request_type) { switch(rhizome_live_http_requests[i]->request_type) {
case RHIZOME_HTTP_REQUEST_RECEIVING: case RHIZOME_HTTP_REQUEST_RECEIVING: