Rhizome web server progress. Still not working, but poll() now is

doing what it should, and we are detecting closed sockets.
Now to add some parsing etc.
This commit is contained in:
gardners 2011-12-22 21:58:18 +10:30
parent 16fb3a3f61
commit ad88045ca6
3 changed files with 188 additions and 24 deletions

View File

@ -856,5 +856,7 @@ extern overlay_node **overlay_nodes;
int overlay_route_saw_advertisements(int i,overlay_frame *f, long long now); int overlay_route_saw_advertisements(int i,overlay_frame *f, long long now);
int overlay_route_please_advertise(overlay_node *n); int overlay_route_please_advertise(overlay_node *n);
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int rhizome_server_poll();
#include "nacl.h" #include "nacl.h"

View File

@ -19,6 +19,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <signal.h>
#include "mphlr.h" #include "mphlr.h"
#include "rhizome.h" #include "rhizome.h"
@ -29,6 +30,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
int rhizome_server_socket=-1; int rhizome_server_socket=-1;
int sigPipeFlag=0;
int sigIoFlag=0;
typedef struct rhizome_http_request { typedef struct rhizome_http_request {
int socket; int socket;
@ -56,8 +59,9 @@ typedef struct rhizome_http_request {
Else emptying the buffer triggers a request to fetch more data. Only if no Else emptying the buffer triggers a request to fetch more data. Only if no
more data is provided do we then close the request. */ more data is provided do we then close the request. */
unsigned char *buffer; unsigned char *buffer;
int buffer_length; int buffer_size; // size
int buffer_offset; int buffer_length; // number of bytes loaded into buffer
int buffer_offset; // where we are between [0,buffer_length)
/* The source specification data which are used in different ways by different /* The source specification data which are used in different ways by different
request types */ request types */
@ -66,29 +70,73 @@ typedef struct rhizome_http_request {
} rhizome_http_request; } rhizome_http_request;
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_close_http_request(int i);
#define RHIZOME_SERVER_MAX_LIVE_REQUESTS 32 #define RHIZOME_SERVER_MAX_LIVE_REQUESTS 32
rhizome_http_request *rhizome_live_http_requests[RHIZOME_SERVER_MAX_LIVE_REQUESTS]; rhizome_http_request *rhizome_live_http_requests[RHIZOME_SERVER_MAX_LIVE_REQUESTS];
int rhizome_server_live_request_count=0; int rhizome_server_live_request_count=0;
void sigPipeHandler(int signal)
{
sigPipeFlag++;
return;
}
void sigIoHandler(int signal)
{
printf("sigio\n");
sigIoFlag++;
return;
}
int rhizome_server_start() int rhizome_server_start()
{ {
if (rhizome_server_socket>-1) return 0; if (rhizome_server_socket>-1) return 0;
struct sockaddr_in address; struct sockaddr_in address;
int on=1;
/* Catch broken pipe signals */
signal(SIGPIPE,sigPipeHandler);
signal(SIGIO,sigIoHandler);
rhizome_server_socket=socket(AF_INET,SOCK_STREAM,0); rhizome_server_socket=socket(AF_INET,SOCK_STREAM,0);
if (rhizome_server_socket<0) if (rhizome_server_socket<0)
return WHY("socket() failed starting rhizome http server"); return WHY("socket() failed starting rhizome http server");
setsockopt(rhizome_server_socket, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on));
bzero((char *) &address, sizeof(address)); bzero((char *) &address, sizeof(address));
address.sin_family = AF_INET; address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY; address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(RHIZOME_HTTP_PORT); address.sin_port = htons(RHIZOME_HTTP_PORT);
if (bind(rhizome_server_socket, (struct sockaddr *) &address, if (bind(rhizome_server_socket, (struct sockaddr *) &address,
sizeof(address)) < 0) sizeof(address)) < 0)
{
close(rhizome_server_socket);
rhizome_server_socket=-1;
return WHY("bind() failed starting rhizome http server\n"); return WHY("bind() failed starting rhizome http server\n");
}
int rc = ioctl(rhizome_server_socket, FIONBIO, (char *)&on);
if (rc < 0)
{
perror("ioctl() failed");
close(rhizome_server_socket);
exit(-1);
}
if (listen(rhizome_server_socket,20)) if (listen(rhizome_server_socket,20))
{
close(rhizome_server_socket);
rhizome_server_socket=-1;
return WHY("listen() failed starting rhizome http server\n"); return WHY("listen() failed starting rhizome http server\n");
}
printf("server socket = %d\n",rhizome_server_socket);
return 0; return 0;
} }
@ -100,17 +148,59 @@ int rhizome_server_poll()
int sock; int sock;
int i; int i;
printf("checking on rhizome server connections (and possibly accepting new connections)\n");
/* Having the starting of the server here is helpful in that
if the port is taken by someone else, we will grab it fairly
swiftly once it becomes available. */
if (rhizome_server_socket<0) rhizome_server_start();
if (rhizome_server_socket<0) return 0;
/* Process the existing requests. /* Process the existing requests.
XXX - should use poll or select here */ XXX - should use poll or select here */
if (debug) printf("Checking %d active connections\n",
rhizome_server_live_request_count);
for(i=0;i<rhizome_server_live_request_count;i++) for(i=0;i<rhizome_server_live_request_count;i++)
{ {
rhizome_http_request *r=rhizome_live_http_requests[i];
switch(r->request_type) {
case RHIZOME_HTTP_REQUEST_RECEIVING:
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag=0;
/* Make socket non-blocking */
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)|O_NONBLOCK);
errno=0;
int bytes=read(r->socket,&r->request[r->request_length],
RHIZOME_HTTP_REQUEST_MAXLEN-r->request_length);
printf("Read %d bytes, errno=%d\n",bytes,errno);
/* Make socket blocking again for poll()/select() */
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)&(~O_NONBLOCK));
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
WHY("Closing connection due to sigpipe");
rhizome_server_close_http_request(i);
continue;
}
break;
}
WHY("Processing live HTTP requests not implemented."); WHY("Processing live HTTP requests not implemented.");
} }
/* Deal with any new requests */ /* Deal with any new requests */
/* Make socket non-blocking */
fcntl(rhizome_server_socket,F_SETFL,
fcntl(rhizome_server_socket, F_GETFL, NULL)|O_NONBLOCK);
while ((rhizome_server_live_request_count<RHIZOME_SERVER_MAX_LIVE_REQUESTS) while ((rhizome_server_live_request_count<RHIZOME_SERVER_MAX_LIVE_REQUESTS)
&&((sock=accept(rhizome_server_socket,&addr,&addr_len))>-1)) &&((sock=accept(rhizome_server_socket,&addr,&addr_len))>-1))
{ {
printf("accepting connection.\n");
rhizome_http_request *request = calloc(sizeof(rhizome_http_request),1); rhizome_http_request *request = calloc(sizeof(rhizome_http_request),1);
request->socket=sock; request->socket=sock;
/* We are now trying to read the HTTP request */ /* We are now trying to read the HTTP request */
@ -118,5 +208,51 @@ int rhizome_server_poll()
rhizome_live_http_requests[rhizome_server_live_request_count++]=request; rhizome_live_http_requests[rhizome_server_live_request_count++]=request;
} }
fcntl(rhizome_server_socket,F_SETFL,
fcntl(rhizome_server_socket, F_GETFL, NULL)&(~O_NONBLOCK));
printf("done rhizome checking.\n");
return 0;
}
int rhizome_server_close_http_request(int i)
{
rhizome_server_free_http_request(rhizome_live_http_requests[i]);
/* Make it null, so that if we are the list in the list, the following
assignment still yields the correct behaviour */
rhizome_live_http_requests[i]=NULL;
rhizome_live_http_requests[i]=
rhizome_live_http_requests[rhizome_server_live_request_count-1];
rhizome_server_live_request_count--;
return 0;
}
int rhizome_server_free_http_request(rhizome_http_request *r)
{
if (r->buffer&&r->buffer_size) free(r->buffer);
free(r);
return 0;
}
int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
{
int i;
if ((*fdcount)>=fdmax) return -1;
if (rhizome_server_socket>-1)
{
fds[*fdcount].fd=rhizome_server_socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
for(i=0;i<rhizome_server_live_request_count;i++)
{
if ((*fdcount)>=fdmax) return -1;
fds[*fdcount].fd=rhizome_live_http_requests[i]->socket;
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
return 0; return 0;
} }

View File

@ -511,21 +511,46 @@ int createServerSocket()
return 0; return 0;
} }
extern int sigIoFlag;
extern int rhizome_server_socket;
int simpleServerMode() int simpleServerMode()
{ {
while(1) { while(1) {
unsigned char buffer[16384]; unsigned char buffer[16384];
struct sockaddr recvaddr; struct sockaddr recvaddr;
socklen_t recvaddrlen=sizeof(recvaddr); socklen_t recvaddrlen=sizeof(recvaddr);
struct pollfd fds; struct pollfd fds[128];
int fdcount;
int len; int len;
int r;
bzero((void *)&recvaddr,sizeof(recvaddr)); bzero((void *)&recvaddr,sizeof(recvaddr));
fds.fd=sock; fds.events=POLLIN;
/* Wait patiently for packets to arrive */ /* Get rhizome server started BEFORE populating fd list so that
while (poll(&fds,1,1000)<1) sleep(0); the server's listen socket is in the list for poll() */
if (rhizome_datastore_path) rhizome_server_poll();
/* Get list of file descripters to watch */
fds[0].fd=sock; fds[0].events=POLLIN;
fdcount=1;
rhizome_server_get_fds(fds,&fdcount,128);
printf("poll()ing file descriptors:");
{ int i;
for(i=0;i<fdcount;i++) { printf(" %d",fds[i].fd); } }
printf("\n");
/* Wait patiently for packets to arrive. */
if (rhizome_datastore_path) rhizome_server_poll();
printf("polling %d fds\n",fdcount);
while ((r=poll(fds,fdcount,100000))<1) {
printf("poll returned %d\n",r);
if (sigIoFlag) { sigIoFlag=0; break; }
sleep(0);
}
printf("poll ended: %d fds\n",r);
if (rhizome_datastore_path) rhizome_server_poll();
if (fds[0].revents&POLLIN) {
len=recvfrom(sock,buffer,sizeof(buffer),0,&recvaddr,&recvaddrlen); len=recvfrom(sock,buffer,sizeof(buffer),0,&recvaddr,&recvaddrlen);
client_port=((struct sockaddr_in*)&recvaddr)->sin_port; client_port=((struct sockaddr_in*)&recvaddr)->sin_port;
@ -544,5 +569,6 @@ int simpleServerMode()
} }
if (debug>1) fprintf(stderr,"Finished processing packet, waiting for next one.\n"); if (debug>1) fprintf(stderr,"Finished processing packet, waiting for next one.\n");
} }
}
return 0; return 0;
} }