Fix rhizome transfers

This commit is contained in:
Jeremy Lakeman 2012-06-27 16:54:42 +09:30
parent baf8543def
commit 29cba17891
12 changed files with 488 additions and 466 deletions

View File

@ -418,7 +418,7 @@ int audio_msm_g1_poll_fds(struct pollfd *fds,int slots)
int count=0;
if (playFd>-1&&slots>0) {
fds[count].fd=playFd;
fds[count].events=POLL_IN;
fds[count].events=POLLIN;
count++; slots--;
}
return count;

View File

@ -88,8 +88,16 @@ int fd_watch(int fd,void (*func)(int fd),int events)
if (fdcount>=MAX_WATCHED_FDS)
return WHYF("Currently watching too many file descriptors. This should never happen; report a bug.");
fds[fdcount].fd=fd;
fds[fdcount++].events=events;
int i;
for(i=0;i<fdcount;i++)
if (fds[i].fd==fd)
break;
fds[i].fd=fd;
fds[i].events=events;
if (i==fdcount)
fdcount++;
if (func!=fd_functions[fd]) {
fd_stats[fd].max_time=0;
fd_stats[fd].total_time=0;
@ -215,7 +223,15 @@ int fd_poll()
for(i=0;i<fdcount;i++)
if (fds[i].revents) {
long long now=overlay_gettime_ms();
/* Make socket non-blocking */
SET_NONBLOCKING(fds[i].fd);
fd_functions[fds[i].fd](fds[i].fd);
/* Make socket blocking again */
SET_BLOCKING(fds[i].fd);
long long elapsed=overlay_gettime_ms()-now;
fd_update_stats(&fd_stats[fds[i].fd],elapsed);
}
@ -249,6 +265,7 @@ func_descriptions func_names[]={
{rhizome_server_poll,"rhizome_server_poll"},
{fd_periodicstats,"fd_periodicstats"},
{vomp_tick,"vomp_tick"},
{rhizome_check_connections,"rhizome_check_connections"},
{NULL,NULL}
};
@ -291,11 +308,11 @@ int fd_list()
INFOF("---------------------------------");
for(i=0;i<fdcount;i++) {
char *eventdesc="<somethinged>";
if ((fds[i].events&POLL_IN)&&(fds[i].events&POLL_OUT))
if ((fds[i].events&POLLIN)&&(fds[i].events&POLLOUT))
eventdesc="read or written";
else if (fds[i].events&POLL_IN)
else if (fds[i].events&POLLIN)
eventdesc="read";
else if (fds[i].events&POLL_OUT)
else if (fds[i].events&POLLOUT)
eventdesc="written";
INFOF("%s() when fd#%d can be %s",
@ -366,8 +383,10 @@ int fd_showstats()
}
for(i=0;i<fdcount;i++) {
char desc[1024];
snprintf(desc,1024,"%s() fd#%d callback",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd);
snprintf(desc,1024,"%s() fd#%d %s%scallback",
fd_funcname(fd_functions[fds[i].fd]),fds[i].fd,
fds[i].events&POLLIN?"IN ":"",
fds[i].events&POLLOUT?"OUT ":"");
fd_showstat(&total,&fd_stats[fds[i].fd],desc);
}
fd_showstat(&total,&total,"TOTAL");

View File

@ -132,11 +132,9 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
if (audev&&audev->poll_fds) fdcount+=audev->poll_fds(&fds[fdcount],128-fdcount);
poll(fds,fdcount,1000);
fcntl(fd,F_SETFL,
fcntl(fd, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(fd);
if (interactiveP)
fcntl(STDIN_FILENO,F_SETFL,
fcntl(STDIN_FILENO, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(STDIN_FILENO);
int bytes;
int i;
@ -184,10 +182,8 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
audioRecordBufferBytes-=audioRecordBufferOffset;
}
fcntl(fd,F_SETFL,
fcntl(fd, F_GETFL, NULL)&~O_NONBLOCK);
fcntl(STDIN_FILENO,F_SETFL,
fcntl(STDIN_FILENO, F_GETFL, NULL)&~O_NONBLOCK);
SET_BLOCKING(fd);
SET_BLOCKING(STDIN_FILENO);
}
return 0;

View File

@ -118,7 +118,7 @@ int monitor_setup_sockets()
WHY_perror("setsockopt");
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) WHY("Monitor server socket setup");
fd_watch(monitor_named_socket,monitor_poll,POLL_IN);
fd_watch(monitor_named_socket,monitor_poll,POLLIN);
return 0;
error:
@ -158,8 +158,6 @@ void monitor_poll(int ignored_fd)
}
/* Check for new connections */
fcntl(monitor_named_socket, F_SETFL,
fcntl(monitor_named_socket, F_GETFL, NULL) | O_NONBLOCK);
/* We don't care about the peer's address */
ignored_length = 0;
while (
@ -186,8 +184,7 @@ void monitor_client_poll(int fd)
int bytes;
struct monitor_context *c=&monitor_sockets[i];
if (c->socket!=fd) continue;
fcntl(c->socket,F_SETFL,
fcntl(c->socket, F_GETFL, NULL) | O_NONBLOCK);
switch(c->state) {
case MONITOR_STATE_COMMAND:
bytes = 1;
@ -293,12 +290,7 @@ static void monitor_new_client(int s) {
uid_t otheruid;
struct monitor_context *c;
#ifndef HAVE_LINUX_IF_H
if ((res = fcntl(s, F_SETFL, O_NONBLOCK)) == -1) {
WHY_perror("fcntl()");
goto error;
}
#endif
SET_NONBLOCKING(s);
#ifdef linux
len = sizeof(ucred);
@ -337,10 +329,7 @@ static void monitor_new_client(int s) {
INFOF("Got %d clients", monitor_socket_count);
}
fcntl(monitor_named_socket,F_SETFL,
fcntl(monitor_named_socket, F_GETFL, NULL)|O_NONBLOCK);
fd_watch(s, monitor_client_poll, POLL_IN);
fd_watch(s, monitor_client_poll, POLLIN);
return;
@ -355,17 +344,17 @@ int monitor_process_command(int index,char *cmd)
char sid[MONITOR_LINE_LENGTH],localDid[MONITOR_LINE_LENGTH];
char remoteDid[MONITOR_LINE_LENGTH],digits[MONITOR_LINE_LENGTH];
overlay_mdp_frame mdp;
IN();
mdp.packetTypeAndFlags=MDP_VOMPEVENT;
struct monitor_context *c=&monitor_sockets[index];
c->line_length=0;
fcntl(c->socket,F_SETFL,
fcntl(c->socket, F_GETFL, NULL)|O_NONBLOCK);
if (strlen(cmd)>MONITOR_LINE_LENGTH) {
WRITE_STR(c->socket,"\nERROR:Command too long\n");
return -1;
RETURN(-1);
}
char msg[1024];
@ -388,7 +377,7 @@ int monitor_process_command(int index,char *cmd)
/* Start getting sample */
c->sample_call_session_token=callSessionToken;
c->sample_codec=sampleType;
return 0;
RETURN(0);
}
}
}
@ -486,13 +475,10 @@ int monitor_process_command(int index,char *cmd)
}
fcntl(c->socket,F_SETFL,
fcntl(c->socket, F_GETFL, NULL)|O_NONBLOCK);
snprintf(msg,1024,"\nMONITORSTATUS:%d\n",c->flags);
WRITE_STR(c->socket,msg);
return 0;
RETURN(0);
}
int monitor_process_data(int index)
@ -506,9 +492,6 @@ int monitor_process_data(int index)
WHYF("Ignoring sample block of incorrect size (expected %d, got %d bytes for codec %d)",
vomp_sample_size(c->sample_codec), c->data_offset, c->sample_codec);
fcntl(c->socket,F_SETFL,
fcntl(c->socket, F_GETFL, NULL)|O_NONBLOCK);
vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
if (!call) {
WRITE_STR(c->socket,"\nERROR:No such call\n");
@ -551,9 +534,12 @@ int monitor_announce_bundle(rhizome_manifest *m)
continue;
nextInSameSlot:
errno=0;
fcntl(monitor_sockets[i].socket,F_SETFL,
fcntl(monitor_sockets[i].socket, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(monitor_sockets[i].socket);
WRITE_STR(monitor_sockets[i].socket,msg);
SET_BLOCKING(monitor_sockets[i].socket);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
@ -579,6 +565,7 @@ int monitor_call_status(vomp_call_state *call)
int i;
char msg[1024];
int show=0;
IN();
if (call->local.state>call->local.last_state) show=1;
if (call->remote.state>call->remote.last_state) show=1;
call->local.last_state=call->local.state;
@ -599,9 +586,9 @@ int monitor_call_status(vomp_call_state *call)
continue;
nextInSameSlot:
errno=0;
fcntl(monitor_sockets[i].socket,F_SETFL,
fcntl(monitor_sockets[i].socket, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(monitor_sockets[i].socket);
WRITE_STR(monitor_sockets[i].socket,msg);
SET_BLOCKING(monitor_sockets[i].socket);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHYF("Tearing down monitor client #%d due to errno=%d",
@ -620,7 +607,7 @@ int monitor_call_status(vomp_call_state *call)
}
}
}
return 0;
RETURN(0);
}
int monitor_announce_peer(unsigned char *sid)
@ -659,15 +646,16 @@ int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio)
int monitor_tell_clients(char *msg, int msglen, int mask)
{
int i;
IN();
for(i=0;i<monitor_socket_count;i++)
{
if (!(monitor_sockets[i].flags&mask))
continue;
nextInSameSlot:
errno=0;
fcntl(monitor_sockets[i].socket,F_SETFL,
fcntl(monitor_sockets[i].socket, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(monitor_sockets[i].socket);
write(monitor_sockets[i].socket, msg, msglen);
SET_BLOCKING(monitor_sockets[i].socket);
// WHYF("Writing AUDIOPACKET to client");
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
@ -686,5 +674,5 @@ int monitor_tell_clients(char *msg, int msglen, int mask)
}
}
}
return 0;
RETURN(0);
}

View File

@ -188,7 +188,7 @@ overlay_interface_init_socket(int interface, struct sockaddr_in src_addr, struct
I(fileP) = 0;
I(fd) = socket(PF_INET,SOCK_DGRAM,0);
fd_watch(I(fd),overlay_interface_poll,POLL_IN);
fd_watch(I(fd),overlay_interface_poll,POLLIN);
WHYF("Watching fd#%d for interface #%d",I(fd),interface);
if (I(fd) < 0) {
WHY_perror("socket()");
@ -318,8 +318,6 @@ void overlay_interface_poll(int fd)
enable stats to accurately count packets received */
// while (plen>0) {
int recvttl=1;
fcntl(overlay_interfaces[i].fd, F_SETFL,
fcntl(overlay_interfaces[i].fd, F_GETFL, NULL)|O_NONBLOCK);
plen=recvwithttl(overlay_interfaces[i].fd,packet,sizeof(packet),
&recvttl,&src_addr,&addrlen);
if (plen<1) {

View File

@ -73,7 +73,7 @@ int overlay_mdp_setup_sockets()
int res = setsockopt(mdp_abstract_socket, SOL_SOCKET, SO_SNDBUF,
&send_buffer_size, sizeof(send_buffer_size));
fd_watch(mdp_abstract_socket,overlay_mdp_poll,POLL_IN);
fd_watch(mdp_abstract_socket,overlay_mdp_poll,POLLIN);
}
}
#endif
@ -106,7 +106,7 @@ int overlay_mdp_setup_sockets()
&send_buffer_size, sizeof(send_buffer_size));
if (res)
WHY_perror("setsockopt");
fd_watch(mdp_named_socket,overlay_mdp_poll,POLL_IN);
fd_watch(mdp_named_socket,overlay_mdp_poll,POLLIN);
}
}
@ -963,10 +963,11 @@ void overlay_mdp_poll()
if (mdp_named_socket>-1) {
ttl=-1;
bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer));
fcntl(mdp_named_socket, F_SETFL,
fcntl(mdp_named_socket, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(mdp_named_socket);
int len = recvwithttl(mdp_named_socket,buffer,sizeof(buffer),&ttl,
recvaddr,&recvaddrlen);
SET_BLOCKING(mdp_named_socket);
recvaddr_un=(struct sockaddr_un *)recvaddr;
if (len>0) {
@ -1067,12 +1068,12 @@ void overlay_mdp_poll()
/* We ignore the result of the following, because it is just sending an
error message back to the client. If this fails, where would we report
the error to? My point exactly. */
SET_NONBLOCKING(mdp_named_socket);
sendto(mdp_named_socket,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen);
SET_BLOCKING(mdp_named_socket);
}
}
fcntl(mdp_named_socket, F_SETFL,
fcntl(mdp_named_socket, F_GETFL, NULL)&(~O_NONBLOCK));
}
return;
@ -1142,10 +1143,10 @@ int overlay_mdp_send(overlay_mdp_frame *mdp,int flags,int timeout_ms)
if (!FORM_SERVAL_INSTANCE_PATH(name.sun_path, "mdp.socket"))
return -1;
fcntl(mdp_client_socket, F_SETFL,
fcntl(mdp_client_socket, F_GETFL, NULL)|O_NONBLOCK);
SET_NONBLOCKING(mdp_client_socket);
int result=sendto(mdp_client_socket, mdp, len, 0,
(struct sockaddr *)&name, sizeof(struct sockaddr_un));
SET_BLOCKING(mdp_client_socket);
if (result<0) {
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=1;
@ -1270,17 +1271,21 @@ int overlay_mdp_client_poll(long long timeout_ms)
int overlay_mdp_recv(overlay_mdp_frame *mdp,int *ttl)
{
char mdp_socket_name[101];
if (!FORM_SERVAL_INSTANCE_PATH(mdp_socket_name, "mdp.socket"))
return WHY("Could not find mdp socket");
/* Check if reply available */
fcntl(mdp_client_socket, F_SETFL, fcntl(mdp_client_socket, F_GETFL, NULL)|O_NONBLOCK);
unsigned char recvaddrbuffer[1024];
struct sockaddr *recvaddr=(struct sockaddr *)recvaddrbuffer;
unsigned int recvaddrlen=sizeof(recvaddrbuffer);
struct sockaddr_un *recvaddr_un;
if (!FORM_SERVAL_INSTANCE_PATH(mdp_socket_name, "mdp.socket"))
return WHY("Could not find mdp socket");
mdp->packetTypeAndFlags=0;
/* Check if reply available */
SET_NONBLOCKING(mdp_client_socket);
int len = recvwithttl(mdp_client_socket,(unsigned char *)mdp,
sizeof(overlay_mdp_frame),ttl,recvaddr,&recvaddrlen);
SET_BLOCKING(mdp_client_socket);
recvaddr_un=(struct sockaddr_un *)recvaddr;
/* Null terminate received address so that the stat() call below can succeed */
if (recvaddrlen<1024) recvaddrbuffer[recvaddrlen]=0;

View File

@ -79,11 +79,9 @@ typedef struct rhizome_http_request {
int source_record_size;
unsigned int source_flags;
char *blob_table;
char *blob_column;
unsigned long long blob_rowid;
sqlite3_blob *blob;
/* source_index used for offset in blob */
unsigned long long blob_end;
long long blob_end;
} rhizome_http_request;
@ -256,7 +254,7 @@ int sqlite_exec_int64(long long *result, const char *sqlformat,...);
int sqlite_exec_strbuf(strbuf sb, const char *sqlformat,...);
int rhizome_server_http_response_header(rhizome_http_request *r,int result,
char *mime_type,unsigned long long bytes);
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r);
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r, char *table, char *column);
double rhizome_manifest_get_double(rhizome_manifest *m,char *var,double default_value);
int rhizome_manifest_extract_signature(rhizome_manifest *m,int *ofs);
int rhizome_update_file_priority(const char *fileid);

View File

@ -130,14 +130,19 @@ int rhizome_opendb()
{
if (rhizome_db) return 0;
if (create_rhizome_datastore_dir() == -1)
return WHY("No Directory");
IN();
if (create_rhizome_datastore_dir() == -1){
RETURN(WHY("No Directory"));
}
char dbpath[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(dbpath, "rhizome.db"))
return WHY("Invalid path");
if (!FORM_RHIZOME_DATASTORE_PATH(dbpath, "rhizome.db")){
RETURN(WHY("Invalid path"));
}
if (sqlite3_open(dbpath,&rhizome_db))
return WHYF("SQLite could not open database %s: %s", dbpath, sqlite3_errmsg(rhizome_db));
if (sqlite3_open(dbpath,&rhizome_db)){
RETURN(WHYF("SQLite could not open database %s: %s", dbpath, sqlite3_errmsg(rhizome_db)));
}
int loglevel = (debug & DEBUG_RHIZOME) ? LOG_LEVEL_DEBUG : LOG_LEVEL_SILENT;
/* Read Rhizome configuration */
@ -156,7 +161,7 @@ int rhizome_opendb()
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS GROUPMEMBERSHIPS(manifestid text not null, groupid text not null);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS VERIFICATIONS(sid text not null, did text, name text, starttime integer, endtime integer, signature blob);") == -1
) {
return WHY("Failed to create schema");
RETURN(WHY("Failed to create schema"));
}
// No easy way to tell if these columns already exist, should probably create some kind of schema
// version table. Running these a second time will fail.
@ -168,7 +173,7 @@ int rhizome_opendb()
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM MANIFESTS WHERE filehash IS NULL;");
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM FILES WHERE NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE MANIFESTS.filehash = FILES.id);");
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM MANIFESTS WHERE NOT EXISTS( SELECT 1 FROM FILES WHERE MANIFESTS.filehash = FILES.id);");
return 0;
RETURN(0);
}
/*

View File

@ -31,20 +31,21 @@ typedef struct rhizome_file_fetch_record {
char fileid[RHIZOME_FILEHASH_STRLEN + 1];
FILE *file;
int close;
char request[1024];
int request_len;
int request_ofs;
int file_len;
int file_ofs;
long long file_len;
long long file_ofs;
int state;
int last_action;
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 1
#define RHIZOME_FETCH_RXHTTPHEADERS 2
#define RHIZOME_FETCH_RXFILE 3
#define RHIZOME_FETCH_CONNECTING 1
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
#define RHIZOME_FETCH_RXHTTPHEADERS 3
#define RHIZOME_FETCH_RXFILE 4
} rhizome_file_fetch_record;
/* List of queued transfers */
@ -633,15 +634,16 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
{
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET,SOCK_STREAM,0);
fcntl(sock,F_SETFL, O_NONBLOCK);
SET_NONBLOCKING(sock);
struct sockaddr_in peeraddr;
bcopy(peerip,&peeraddr,sizeof(peeraddr));
peeraddr.sin_port=htons(RHIZOME_HTTP_PORT);
DEBUG("Initiating HTTP connection for transfer");
int r=connect(sock,(struct sockaddr*)&peeraddr,sizeof(peeraddr));
if ((errno!=EINPROGRESS)&&(r!=0)) {
WHY("Failed to open socket to peer's rhizome web server");
WHY_perror("connect");
close (sock);
if (debug&DEBUG_RHIZOME) DEBUG("Failed to open socket to peer's rhizome web server");
return -1;
}
@ -655,10 +657,9 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
q->fileid);
q->request_len=strlen(q->request);
q->request_ofs=0;
q->state=RHIZOME_FETCH_SENDINGHTTPREQUEST;
q->state=RHIZOME_FETCH_CONNECTING;
q->file_len=-1;
q->file_ofs=0;
q->close=0;
q->last_action=time(0);
/* XXX Don't forget to implement resume */
#define RHIZOME_IDLE_TIMEOUT 10
@ -685,7 +686,9 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
return -1;
}
/* Watch for activity on the socket */
fd_watch(q->socket,rhizome_fetch_poll,POLL_OUT);
fd_watch(q->socket,rhizome_fetch_poll,POLLIN|POLLOUT);
fd_setalarm(rhizome_check_connections,50,500);
rhizome_file_fetch_queue_count++;
if (1||debug&DEBUG_RHIZOME) DEBUGF("Queued file for fetching into %s (%d in queue)",
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
@ -715,256 +718,268 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
return 0;
}
int rhizome_fetch_close(int i){
/* Free ephemeral data */
if (file_fetch_queue[i].file) fclose(file_fetch_queue[i].file);
file_fetch_queue[i].file=NULL;
if (file_fetch_queue[i].manifest)
rhizome_manifest_free(file_fetch_queue[i].manifest);
file_fetch_queue[i].manifest=NULL;
/* close socket and stop watching it */
fd_teardown(file_fetch_queue[i].socket);
/* reshuffle higher numbered slot down if required */
if (i<(rhizome_file_fetch_queue_count-1))
bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1],
&file_fetch_queue[i],sizeof(rhizome_file_fetch_record));
/* Reduce count of open connections */
rhizome_file_fetch_queue_count--;
if (debug&DEBUG_RHIZOME)
DEBUGF("Released rhizome fetch slot (%d used)",
rhizome_file_fetch_queue_count);
}
void rhizome_fetch_write(rhizome_file_fetch_record *q, int i){
int bytes;
bytes=write(q->socket,&q->request[q->request_ofs],
q->request_len-q->request_ofs);
if (bytes>0) {
q->last_action=time(0);
q->request_ofs+=bytes;
DEBUGF("Sent HTTP request (%d of %d): %s",
q->request_ofs,q->request_len,q->request);
if (q->request_ofs>=q->request_len) {
/* Sent all of request. Switch to listening for HTTP response headers.
*/
q->request_len=0; q->request_ofs=0;
q->state=RHIZOME_FETCH_RXHTTPHEADERS;
fd_watch(q->socket,rhizome_fetch_poll,POLLIN);
}else if(q->state==RHIZOME_FETCH_CONNECTING)
q->state = RHIZOME_FETCH_SENDINGHTTPREQUEST;
} else if (errno!=EAGAIN) {
WHY("Got error while sending HTTP request. Closing.");
rhizome_fetch_close(i);
}
}
void rhizome_fetch_handle(rhizome_file_fetch_record *q, int i)
{
switch(q->state)
{
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(q, i);
break;
case RHIZOME_FETCH_RXFILE:
/* Keep reading until we have the promised amount of data */
sigPipeFlag=0;
errno=0;
char buffer[8192];
int bytes=read(q->socket,buffer,8192);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
q->last_action=time(0);
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME) DEBUGF("Failed writing %d bytes to file. @ offset %d",bytes,q->file_ofs);
rhizome_fetch_close(i);
return;
}
q->file_ofs+=bytes;
DEBUGF("Transferred (%lld of %lld)",
q->file_ofs,q->file_len);
} else if (bytes==0) {
WHY("Got zero bytes, assume socket dead.");
rhizome_fetch_close(i);
return;
}
if (q->file_ofs>=q->file_len)
{
/* got all of file */
if (debug&DEBUG_RHIZOME) DEBUGF("Received all of file via rhizome -- now to import it");
{
fclose(q->file); q->file=NULL;
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL)
{ WHY("Manifest missing ID"); return; }
if (create_rhizome_import_dir() == -1)
return;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id))
return;
/* Do really write the manifest unchanged */
if (debug&DEBUG_RHIZOME) {
DEBUGF("manifest has %d signatories",q->manifest->sig_count);
DEBUGF("manifest id = %s, len=%d",
rhizome_manifest_get(q->manifest,"id",NULL,0),
q->manifest->manifest_bytes);
dump("manifest",&q->manifest->manifestdata[0],
q->manifest->manifest_all_bytes);
}
q->manifest->finalised=1;
q->manifest->manifest_bytes=q->manifest->manifest_all_bytes;
if (rhizome_write_manifest_file(q->manifest,filename) != -1) {
rhizome_bundle_import(q->manifest, NULL, id,
q->manifest->ttl - 1 /* TTL */);
}
rhizome_manifest_free(q->manifest);
q->manifest=NULL;
}
rhizome_fetch_close(i);
return;
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS:
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag=0;
errno=0;
bytes=read(q->socket,&q->request[q->request_len],
1024-q->request_len-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
int lfcount=0;
int i=q->request_len-160;
q->last_action=time(0);
if (i<0) i=0;
q->request_len+=bytes;
if (q->request_len<1024)
q->request[q->request_len]=0;
for(;i<(q->request_len+bytes);i++)
{
switch(q->request[i]) {
case '\n': lfcount++; break;
case '\r': /* ignore CR */ break;
case 0: /* ignore NUL (telnet inserts them) */ break;
default: lfcount=0; break;
}
if (lfcount==2) break;
}
DEBUGF("Received headers (%d bytes)", lfcount==2?i:q->request_len);
if (debug&DEBUG_RHIZOME)
dump("http reply headers",(unsigned char *)q->request,lfcount==2?i:q->request_len);
if (lfcount==2) {
/* We have the response headers, so parse.
(we may also have some bytes of content, so we need to be a little
careful) */
/* Terminate string at end of headers */
q->request[i]=0;
/* Get HTTP result code */
char *s=strstr(q->request,"HTTP/1.0 ");
if (!s) {
if (debug&DEBUG_RHIZOME) DEBUGF("HTTP response lacked HTTP/1.0 response code.");
rhizome_fetch_close(i);
return;
}
int http_response_code=strtoll(&s[9],NULL,10);
if (http_response_code!=200) {
if (debug&DEBUG_RHIZOME) DEBUGF("Rhizome web server returned %d != 200 OK",http_response_code);
rhizome_fetch_close(i);
return;
}
/* Get content length */
s=strstr(q->request,"Content-length: ");
if (!s) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Missing Content-Length: header.");
rhizome_fetch_close(i);
return;
}
q->file_len=strtoll(&s[16],NULL,10);
if (q->file_len<0) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Illegal file size (%d).",q->file_len);
rhizome_fetch_close(i);
return;
}
/* Okay, we have both, and are all set.
File is already open, so just write out any initial bytes of the
file we read, and update state flag.
*/
int fileRxBytes=q->request_len-(i+1);
if (fileRxBytes>0)
if (fwrite(&q->request[i+1],fileRxBytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME)
DEBUGF("Failed writing initial %d bytes to file.",
fileRxBytes);
rhizome_fetch_close(i);
return;
}
q->file_ofs=fileRxBytes;
DEBUGF("Transferred (%lld of %lld)",
q->file_ofs,q->file_len);
q->state=RHIZOME_FETCH_RXFILE;
}
}
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to sigpipe");
rhizome_fetch_close(i);
return;
}
break;
default:
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(i);
return;
}
return;
}
void rhizome_fetch_poll(int fd)
{
int rn;
for(rn=0;rn<rhizome_file_fetch_queue_count;rn++)
{
int action=0;
int bytes;
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
if (q->socket!=fd) continue;
/* Make socket non-blocking */
fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)|O_NONBLOCK);
switch(q->state)
{
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
DEBUGF("sending http request (%d of %d bytes sent): %s",
q->request_ofs,q->request_len,q->request);
bytes=write(q->socket,&q->request[q->request_ofs],
q->request_len-q->request_ofs);
if (bytes>0) {
action=1;
q->request_ofs+=bytes;
if (q->request_ofs>=q->request_len) {
/* Sent all of request. Switch to listening for HTTP response headers.
*/
if (debug&DEBUG_RHIZOME) {
DEBUGF("Sent http request to fetch file. (%d of %d bytes)",q->request_ofs,q->request_len);
DEBUGF("sent [%s]",q->request);
}
q->request_len=0; q->request_ofs=0;
q->state=RHIZOME_FETCH_RXHTTPHEADERS;
fd_watch(q->socket,rhizome_fetch_poll,POLL_IN);
}
} else if (errno!=EAGAIN) {
WHY("Got error while sending HTTP request. Closing.");
q->close=1;
}
break;
case RHIZOME_FETCH_RXFILE:
/* Keep reading until we have the promised amount of data */
if (debug&DEBUG_RHIZOME)
DEBUGF("receiving rhizome fetch file body (current offset=%d of %d)",
q->file_ofs,q->file_len);
sigPipeFlag=0;
errno=0;
char buffer[8192];
int bytes=read(q->socket,buffer,8192);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
action=1;
if (debug&DEBUG_RHIZOME)
DEBUGF("Read %d bytes; we now have %d of %d bytes.",
bytes,q->file_ofs+bytes,q->file_len);
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME) DEBUGF("Failed writing %d bytes to file. @ offset %d",bytes,q->file_ofs);
q->close=1;
continue;
}
q->file_ofs+=bytes;
} else if (bytes==0) {
WHY("Got zero bytes, assume socket dead.");
q->close=1;
continue;
}
if (q->file_ofs>=q->file_len)
{
/* got all of file */
q->close=1;
if (debug&DEBUG_RHIZOME) DEBUGF("Received all of file via rhizome -- now to import it");
{
fclose(q->file); q->file=NULL;
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL)
{ WHY("Manifest missing ID"); return; }
if (create_rhizome_import_dir() == -1)
return;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id))
return;
/* Do really write the manifest unchanged */
if (debug&DEBUG_RHIZOME) {
DEBUGF("manifest has %d signatories",q->manifest->sig_count);
DEBUGF("manifest id = %s, len=%d",
rhizome_manifest_get(q->manifest,"id",NULL,0),
q->manifest->manifest_bytes);
dump("manifest",&q->manifest->manifestdata[0],
q->manifest->manifest_all_bytes);
}
q->manifest->finalised=1;
q->manifest->manifest_bytes=q->manifest->manifest_all_bytes;
if (rhizome_write_manifest_file(q->manifest,filename) != -1) {
rhizome_bundle_import(q->manifest, NULL, id,
q->manifest->ttl - 1 /* TTL */);
}
rhizome_manifest_free(q->manifest);
q->manifest=NULL;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS:
/* Keep reading until we have two CR/LFs in a row */
if (debug&DEBUG_RHIZOME) DEBUG("receiving rhizome fetch http headers");
sigPipeFlag=0;
errno=0;
bytes=read(q->socket,&q->request[q->request_len],
1024-q->request_len-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
action=1;
int lfcount=0;
int i=q->request_len-160;
if (i<0) i=0;
q->request_len+=bytes;
if (q->request_len<1024)
q->request[q->request_len]=0;
if (debug&DEBUG_RHIZOME)
dump("http reply headers",(unsigned char *)q->request,q->request_len);
for(;i<(q->request_len+bytes);i++)
{
switch(q->request[i]) {
case '\n': lfcount++; break;
case '\r': /* ignore CR */ break;
case 0: /* ignore NUL (telnet inserts them) */ break;
default: lfcount=0; break;
}
if (lfcount==2) break;
}
if (lfcount==2) {
/* We have the response headers, so parse.
(we may also have some extra bytes, so we need to be a little
careful) */
/* Terminate string at end of headers */
q->request[i]=0;
/* Get HTTP result code */
char *s=strstr(q->request,"HTTP/1.0 ");
if (!s) {
if (debug&DEBUG_RHIZOME) DEBUGF("HTTP response lacked HTTP/1.0 response code.");
q->close=1; continue; }
int http_response_code=strtoll(&s[9],NULL,10);
if (http_response_code!=200) {
if (debug&DEBUG_RHIZOME) DEBUGF("Rhizome web server returned %d != 200 OK",http_response_code);
q->close=1; continue;
}
/* Get content length */
s=strstr(q->request,"Content-length: ");
if (!s) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Missing Content-Length: header.");
q->close=1; continue; }
q->file_len=strtoll(&s[16],NULL,10);
if (q->file_len<0) {
if (debug&DEBUG_RHIZOME)
DEBUGF("Illegal file size (%d).",q->file_len);
q->close=1; continue; }
/* Okay, we have both, and are all set.
File is already open, so just write out any initial bytes of the
file we read, and update state flag.
*/
int fileRxBytes=q->request_len-(i+1);
if (fileRxBytes>0)
if (fwrite(&q->request[i+1],fileRxBytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME)
DEBUGF("Failed writing initial %d bytes to file.",
fileRxBytes);
q->close=1;
continue;
}
q->file_ofs=fileRxBytes;
if (debug&DEBUG_RHIZOME)
DEBUGF("Read %d initial bytes of %d total",
q->file_ofs,q->file_len);
q->state=RHIZOME_FETCH_RXFILE;
}
q->request_len+=bytes;
}
/* Give up fairly quickly if there is no action, because the peer may
have moved out of range. */
if (!action) {
if (time(0)-q->last_action>RHIZOME_IDLE_TIMEOUT) {
if (debug&DEBUG_RHIZOME)
DEBUG("Closing connection due to inactivity timeout.");
q->close=1;
continue;
}
} else q->last_action=time(0);
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to sigpipe");
q->close=1;
continue;
}
break;
default:
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
q->close=1;
break;
}
/* Make socket blocking again for poll()/select() */
fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)&(~O_NONBLOCK));
{
int bytes;
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
if (q->socket==fd){
rhizome_fetch_handle(q, rn);
return;
}
}
}
int rhizome_check_connections(){
int i;
for(i=rhizome_file_fetch_queue_count-1;i>=0;i--)
{
if (file_fetch_queue[i].close) {
/* Free ephemeral data */
if (file_fetch_queue[i].file) fclose(file_fetch_queue[i].file);
file_fetch_queue[i].file=NULL;
if (file_fetch_queue[i].manifest)
rhizome_manifest_free(file_fetch_queue[i].manifest);
file_fetch_queue[i].manifest=NULL;
/* close socket and stop watching it */
fd_teardown(file_fetch_queue[i].socket);
/* reshuffle higher numbered slot down if required */
if (i<(rhizome_file_fetch_queue_count-1))
bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1],
&file_fetch_queue[i],sizeof(rhizome_file_fetch_record));
/* Reduce count of open connections */
rhizome_file_fetch_queue_count--;
if (debug&DEBUG_RHIZOME)
DEBUGF("Released rhizome fetch slot (%d remaining)",
rhizome_file_fetch_queue_count);
}
{
if (time(0) - file_fetch_queue[i].last_action > RHIZOME_IDLE_TIMEOUT) {
if (debug&DEBUG_RHIZOME)
DEBUG("Closing connection due to inactivity timeout.");
rhizome_fetch_close(i);
continue;
}
return;
}
if (rhizome_file_fetch_queue_count==0)
fd_setalarm(rhizome_check_connections,0,0);
}

View File

@ -112,7 +112,7 @@ int rhizome_server_start()
}
/* Add Rhizome HTTPd server to list of file descriptors to watch */
fd_watch(rhizome_server_socket,rhizome_server_poll,POLL_IN);
fd_watch(rhizome_server_socket,rhizome_server_poll,POLLIN);
return 0;
}
@ -130,13 +130,9 @@ void rhizome_client_poll(int fd)
case RHIZOME_HTTP_REQUEST_RECEIVING:
/* Keep reading until we have two CR/LFs in a row */
r->request[r->request_length]=0;
WHYF("http request so far: [%s]",r->request);
sigPipeFlag=0;
/* Make socket non-blocking */
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)|O_NONBLOCK);
errno=0;
int bytes=read(r->socket,&r->request[r->request_length],
RHIZOME_HTTP_REQUEST_MAXLEN-r->request_length-1);
@ -149,7 +145,8 @@ void rhizome_client_poll(int fd)
r->request_length+=bytes;
if (r->request_length<RHIZOME_HTTP_REQUEST_MAXLEN)
r->request[r->request_length]=0;
dump("request",(unsigned char *)r->request,r->request_length);
if (0)
dump("request",(unsigned char *)r->request,r->request_length);
for(;i<(r->request_length+bytes);i++)
{
switch(r->request[i]) {
@ -168,9 +165,6 @@ void rhizome_client_poll(int fd)
r->request_length+=bytes;
}
/* Make socket blocking again for poll()/select() */
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)&(~O_NONBLOCK));
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
WHY("Closing connection due to sigpipe");
@ -200,30 +194,27 @@ void rhizome_server_poll(int ignored_file_descriptor)
int sock;
/* Deal with any new requests */
/* Make socket non-blocking */
fcntl(rhizome_server_socket,F_SETFL,
fcntl(rhizome_server_socket, F_GETFL, NULL)|O_NONBLOCK);
while ((rhizome_server_live_request_count<RHIZOME_SERVER_MAX_LIVE_REQUESTS)
&&((sock=accept(rhizome_server_socket,&addr,&addr_len))>-1))
{
DEBUG("Acceptted HTTP connection");
rhizome_http_request *request = calloc(sizeof(rhizome_http_request),1);
request->socket=sock;
/* We are now trying to read the HTTP request */
request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING;
rhizome_live_http_requests[rhizome_server_live_request_count++]=request;
/* Watch for input */
fd_watch(request->socket,rhizome_client_poll,POLL_IN);
fd_watch(request->socket,rhizome_client_poll,POLLIN);
}
fcntl(rhizome_server_socket,F_SETFL,
fcntl(rhizome_server_socket, F_GETFL, NULL)&(~O_NONBLOCK));
}
int rhizome_server_close_http_request(int i)
{
fd_teardown(rhizome_live_http_requests[i]->socket);
DEBUG("HTTP connection closed");
rhizome_server_free_http_request(rhizome_live_http_requests[i]);
/* Make it null, so that if we are the list in the list, the following
assignment still yields the correct behaviour */
@ -237,9 +228,7 @@ int rhizome_server_close_http_request(int i)
int rhizome_server_free_http_request(rhizome_http_request *r)
{
if (r->buffer&&r->buffer_size) free(r->buffer);
if (r->blob_table) free(r->blob_table);
if (r->blob_column) free(r->blob_column);
if (r->blob) sqlite3_blob_close(r->blob);
free(r);
return 0;
}
@ -307,16 +296,14 @@ int rhizome_server_sql_query_http_response(int rn,rhizome_http_request *r,
WHYF("SQL query overrun: %s", strbuf_str(b));
r->source_index=0;
r->source_flags=dehexP;
r->blob_column=strdup(column);
r->blob_table=strdup(table);
DEBUGF("buffer_length=%d",r->buffer_length);
/* Populate spare space in buffer with rows of data */
return rhizome_server_sql_query_fill_buffer(rn,r);
return rhizome_server_sql_query_fill_buffer(rn, r, table, column);
}
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r)
int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r, char *table, char *column)
{
unsigned char blob_value[r->source_record_size*2+1];
@ -367,9 +354,9 @@ int rhizome_server_sql_query_fill_buffer(int rn,rhizome_http_request *r)
case SQLITE_TEXT: value=sqlite3_column_text(statement, 0); break;
case SQLITE_BLOB:
WHYF("table='%s',col='%s',rowid=%lld",
r->blob_table,r->blob_column,
table, column,
sqlite3_column_int64(statement,1));
if (sqlite3_blob_open(rhizome_db,"main",r->blob_table,r->blob_column,
if (sqlite3_blob_open(rhizome_db,"main",table,column,
sqlite3_column_int64(statement,1) /* rowid */,
0 /* read only */,&blob)!=SQLITE_OK)
{
@ -421,7 +408,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
char id[1024];
/* Switching to writing, so update the call-back */
fd_watch(r->socket,rhizome_client_poll,POLL_OUT);
fd_watch(r->socket,rhizome_client_poll,POLLOUT);
/* Clear request type flags */
r->request_type=0;
@ -463,31 +450,28 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
int i;
hexFilter(id);
WHYF("get /rhizome/file/ [%s]",id);
WHY("Check for range: header, and return 206 if returning partial content");
// Check for range: header, and return 206 if returning partial content
for(i=0;i<strlen(id);i++) if (!isxdigit(id[i])) dud++;
if (dud) rhizome_server_simple_http_response(r,400,"<html><h1>That doesn't look like hex to me.</h1></html>\r\n");
else {
str_toupper_inplace(id);
long long rowid = -1;
sqlite_exec_int64(&rowid, "select rowid from files where id='%s';", id);
sqlite3_blob *blob;
if (rowid>=0)
if (sqlite3_blob_open(rhizome_db,"main","files","data",rowid,0,&blob) !=SQLITE_OK)
if (sqlite3_blob_open(rhizome_db,"main","files","data",rowid,0,&r->blob) !=SQLITE_OK)
rowid=-1;
if (rowid<0) {
rhizome_server_simple_http_response(r,404,"<html><h1>Sorry, can't find that here.</h1></html>\r\n");
WHY("File not found / blob not opened");
}
else {
r->blob_table=strdup("files");
r->blob_column=strdup("data");
r->blob_rowid=rowid;
r->source_index=0;
r->blob_end=sqlite3_blob_bytes(blob);
r->source_index=0;
r->blob_end=sqlite3_blob_bytes(r->blob);
rhizome_server_http_response_header(r,200,"application/binary",
r->blob_end-r->source_index);
r->blob_end - r->source_index);
r->request_type|=RHIZOME_HTTP_REQUEST_BLOB;
sqlite3_blob_close(blob);
WHY("opened blob and file -- but still need to send file body.");
}
}
@ -533,7 +517,7 @@ int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char
r->buffer_size=strlen(response)+strlen("HTTP/1.0 000 \r\n\r\n")+strlen(httpResultString(A_VALUE_GREATER_THAN_FOUR))+100;
r->buffer=(unsigned char *)malloc(r->buffer_size);
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d %s\r\nContent-type: text/html\r\nContent-length: %d\r\n\r\n%s",result,httpResultString(result),(int)strlen(response),response);
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d %s\r\nContent-type: text/html\r\nContent-length: %lld\r\n\r\n%s",result,httpResultString(result),(int)strlen(response),response);
r->buffer_size=strlen((char *)r->buffer)+1;
r->buffer_length=r->buffer_size-1;
@ -551,120 +535,131 @@ int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char
*/
int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
{
sqlite3_blob *blob;
int bytes;
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)|O_NONBLOCK);
if (debug&DEBUG_RHIZOME) WHYF("Request #%d, type=0x%x\n",rn,r->request_type);
/* Flush anything out of the buffer if present, before doing any further
processing */
if (r->request_type&RHIZOME_HTTP_REQUEST_FROMBUFFER)
{
bytes=r->buffer_length-r->buffer_offset;
bytes=write(r->socket,&r->buffer[r->buffer_offset],bytes);
if (bytes>0) {
WHYF("wrote %d bytes\n",bytes);
dump("bytes written",&r->buffer[r->buffer_offset],bytes);
r->buffer_offset+=bytes;
if (r->buffer_offset>=r->buffer_length) {
/* Our work is done. close socket and go home */
r->request_type&=~RHIZOME_HTTP_REQUEST_FROMBUFFER;
r->buffer_offset=0; r->buffer_length=0;
if (!r->request_type) {
WHY("Finished sending data");
return rhizome_server_close_http_request(rn);
} else {
if (debug&DEBUG_RHIZOME) { WHYF("request type = 0x%x after sending buffer.",
r->request_type);
}
}
} else {
/* Still more stuff in the buffer, so return now */
// keep writing until the write would block or we run out of data
while(r->request_type){
/* Flush anything out of the buffer if present, before doing any further
processing */
if (r->request_type&RHIZOME_HTTP_REQUEST_FROMBUFFER)
{
int bytes=r->buffer_length-r->buffer_offset;
bytes=write(r->socket,&r->buffer[r->buffer_offset],bytes);
if (bytes<=0){
// stop writing when the tcp buffer is full
// TODO errors?
return 1;
}
DEBUGF("wrote %d bytes\n",bytes);
if (0)
dump("bytes written",&r->buffer[r->buffer_offset],bytes);
r->buffer_offset+=bytes;
if (r->buffer_offset>=r->buffer_length) {
/* Buffer's cleared */
r->request_type&=~RHIZOME_HTTP_REQUEST_FROMBUFFER;
r->buffer_offset=0; r->buffer_length=0;
}
// go around the loop again to work out what we should do next
continue;
}
}
switch(r->request_type&(~RHIZOME_HTTP_REQUEST_FROMBUFFER))
{
case RHIZOME_HTTP_REQUEST_FAVICON:
if (r->buffer_size<favicon_len) {
free(r->buffer);
r->buffer_size=0;
r->buffer=malloc(favicon_len);
if (!r->buffer) r->request_type=0;
}
if (r->buffer)
switch(r->request_type&(~RHIZOME_HTTP_REQUEST_FROMBUFFER))
{
int i;
for(i=0;i<favicon_len;i++)
r->buffer[i]=favicon_bytes[i];
r->buffer_length=i;
printf("buffer_length for favicon is %d\n",r->buffer_length);
r->request_type=RHIZOME_HTTP_REQUEST_FROMBUFFER;
}
break;
case RHIZOME_HTTP_REQUEST_BLOB:
/* Get more data from the file and put it in the buffer */
r->buffer_length=r->blob_end-r->source_index;
if (r->buffer_length<=0) {
/* end of blob reached */
r->request_type=0; break;
}
if (r->buffer_size<65536) {
free(r->buffer); r->buffer=malloc(65536);
if (!r->buffer) {
if (debug&DEBUG_RHIZOME) WHY("malloc() failed");
r->request_type=0; break;
case RHIZOME_HTTP_REQUEST_FAVICON:
if (r->buffer_size<favicon_len) {
free(r->buffer);
r->buffer_size=0;
r->buffer=malloc(favicon_len);
if (!r->buffer) r->request_type=0;
}
r->buffer_size=65536;
}
if (r->buffer_length>r->buffer_size) r->buffer_length=r->buffer_size;
if (sqlite3_blob_open(rhizome_db,"main",r->blob_table,r->blob_column,
r->blob_rowid,0,&blob)==SQLITE_OK)
if (r->buffer)
{
if(sqlite3_blob_read(blob,&r->buffer[0],r->buffer_length,r->source_index)
==SQLITE_OK)
{
r->request_type|=RHIZOME_HTTP_REQUEST_FROMBUFFER;
r->source_index+=r->buffer_length;
}
else
r->request_type=0;
sqlite3_blob_close(blob);
int i;
for(i=0;i<favicon_len;i++)
r->buffer[i]=favicon_bytes[i];
r->buffer_length=i;
printf("buffer_length for favicon is %d\n",r->buffer_length);
r->request_type=RHIZOME_HTTP_REQUEST_FROMBUFFER;
}
else
break;
case RHIZOME_HTTP_REQUEST_BLOB:
{
if (debug&DEBUG_RHIZOME) WHY("could not open blob to send more data");
/* Get more data from the file and put it in the buffer */
int read_size = 65536;
if (r->blob_end-r->source_index < read_size)
read_size = r->blob_end-r->source_index;
r->request_type=0;
if (read_size>0){
if (r->buffer_size < read_size) {
if (r->buffer)
free(r->buffer);
r->buffer=malloc(read_size);
if (!r->buffer) {
if (debug&DEBUG_RHIZOME) WHY("malloc() failed");
r->request_type=0; break;
}
r->buffer_size=read_size;
}
if(sqlite3_blob_read(r->blob,&r->buffer[0],read_size,r->source_index)
==SQLITE_OK)
{
r->buffer_length = read_size;
r->source_index+=read_size;
r->request_type|=RHIZOME_HTTP_REQUEST_FROMBUFFER;
DEBUGF("Read bytes from DB blob (%lld, %lld)", r->source_index, r->blob_end);
}
}
if (r->source_index >= r->blob_end){
sqlite3_blob_close(r->blob);
r->blob=0;
DEBUG("Closed blob handle");
}else
r->request_type|=RHIZOME_HTTP_REQUEST_BLOB;
}
break;
case RHIZOME_HTTP_REQUEST_FROMBUFFER:
/* This really shouldn't happen! */
return WHY("Something impossible happened.");
break;
default:
WHY("sending data from this type of HTTP request not implemented");
break;
}
break;
default:
WHY("sending data from this type of HTTP request not implemented");
r->request_type=0;
break;
}
}
if (!r->request_type) return rhizome_server_close_http_request(rn);
fcntl(r->socket,F_SETFL,fcntl(r->socket, F_GETFL, NULL)&(~O_NONBLOCK));
return 1;
}
int rhizome_server_http_response_header(rhizome_http_request *r,int result,
char *mime_type,unsigned long long bytes)
{
if (!r->buffer) {
r->buffer_size=bytes+strlen("HTTP/1.0 000 \r\n\r\n")+strlen(httpResultString(A_VALUE_GREATER_THAN_FOUR))+100;
r->buffer=(unsigned char *)malloc(r->buffer_size);
int min_buff = strlen("HTTP/1.0 000 \r\nContent-type: \r\nContent-length: \r\n\r\n")
+strlen(httpResultString(result))
+strlen(mime_type)+20;
if (min_buff+bytes > 65536){
min_buff = 65536;
}else{
min_buff += bytes;
}
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d \r\nContent-type: %s\r\nContent-length: %lld\r\n\r\n",result,mime_type,bytes);
if (r->buffer_size < min_buff) {
if (r->buffer)
free(r->buffer);
r->buffer=(unsigned char *)malloc(min_buff);
r->buffer_size=min_buff;
}
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d %s\r\nContent-type: %s\r\nContent-length: %lld\r\n\r\n",result,httpResultString(result),mime_type,bytes);
r->buffer_length=strlen((char *)r->buffer);
r->buffer_offset=0;

View File

@ -1092,6 +1092,7 @@ int overlay_broadcast_generate_address(unsigned char *a);
int overlay_abbreviate_unset_current_sender();
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
void rhizome_fetch_poll(int fd);
int rhizome_check_connections();
int rhizome_opendb();
typedef struct dna_identity_status {
@ -1563,3 +1564,7 @@ int fd_next_funcid(const char *funcname);
#define IN() static int _func_id=-1; if (_func_id<0) _func_id=fd_next_funcid(__FUNCTION__); fd_func_enter(_func_id);
#define OUT() fd_func_exit(_func_id);
#define RETURN(X) { OUT() return(X); }
#define SET_NONBLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)|O_NONBLOCK);
#define SET_BLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)&(~O_NONBLOCK));

View File

@ -71,8 +71,6 @@ int recvwithttl(int sock,unsigned char *buffer,int bufferlen,int *ttl,
msg.msg_controllen = sizeof(struct cmsghdr)*16;
msg.msg_flags = 0;
fcntl(sock,F_SETFL, O_NONBLOCK);
int len = recvmsg(sock,&msg,0);
if (0&&debug&DEBUG_PACKETRX) {