From f862ba0af60fefb113dcacb6409049dd98536fed Mon Sep 17 00:00:00 2001 From: gardners Date: Fri, 13 Jan 2012 17:21:06 +1030 Subject: [PATCH] Rhizome file transfer via http getting close (send side needs to send file body) --- overlay.c | 13 +- overlay_interface.c | 3 - rhizome.h | 2 + rhizome_fetch.c | 287 +++++++++++++++++++++++++++++++++++++++++++- rhizome_http.c | 9 +- 5 files changed, 300 insertions(+), 14 deletions(-) diff --git a/overlay.c b/overlay.c index 306919c4..ba9bb624 100644 --- a/overlay.c +++ b/overlay.c @@ -129,7 +129,8 @@ int overlayServerMode() fds[0].fd=sock; fds[0].events=POLLIN; fdcount=1; rhizome_server_get_fds(fds,&fdcount,128); - + rhizome_fetching_get_fds(fds,&fdcount,128); + for(i=0;i0&&(filehash!=NULL)) { if (strlen(filehash)!=SHA512_DIGEST_STRING_LENGTH-1) @@ -136,12 +157,36 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, } return -1; } - file_fetch_queue[rhizome_file_fetch_queue_count].manifest=m; - strncpy(file_fetch_queue[rhizome_file_fetch_queue_count].fileid, + + rhizome_file_fetch_record + *q=&file_fetch_queue[rhizome_file_fetch_queue_count]; + q->manifest=m; + q->socket=sock; + strncpy(q->fileid, filehash,SHA512_DIGEST_STRING_LENGTH); + snprintf(q->request,1024,"GET /rhizome/file/%s HTTP/1.0\r\n\r\n", + q->fileid); + q->request_len=strlen(q->request); + q->request_ofs=0; + q->state=RHIZOME_FETCH_SENDINGHTTPREQUEST; + q->file_len=-1; + q->file_ofs=0; + q->close=0; + q->last_action=time(0); + /* XXX Don't forget to implement resume */ +#define RHIZOME_IDLE_TIMEOUT 10 + /* XXX We should stream file straight into the database */ + char filename[1024]; + snprintf(filename,1024,"%s/import/file.%s",rhizome_datastore_path, + rhizome_manifest_get(q->manifest,"id",NULL,0)); + q->file=fopen(filename,"w"); + if (!q->file) { + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Could not open '%s' to write received file.\n", + filename); + } rhizome_file_fetch_queue_count++; if (debug&DEBUG_RHIZOME) fprintf(stderr,"Queued file for fetching\n"); - WHY("Fetch preparation incomplete (socket state recording is needed)"); return 0; } else @@ -157,3 +202,237 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, return WHY("Not implemented."); } + +int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax) +{ + int i; + if ((*fdcount)>=fdmax) return -1; + + for(i=0;i=fdmax) return -1; + if (debug&DEBUG_IO) { + fprintf(stderr,"rhizome file fetch request #%d is poll() slot #%d (fd %d)\n", + i,*fdcount,file_fetch_queue[i].socket); } + fds[*fdcount].fd=file_fetch_queue[i].socket; + switch(file_fetch_queue[i].state) { + case RHIZOME_FETCH_SENDINGHTTPREQUEST: + fds[*fdcount].events=POLLOUT; break; + case RHIZOME_FETCH_RXHTTPHEADERS: + case RHIZOME_FETCH_RXFILE: + default: + fds[*fdcount].events=POLLIN; break; + } + (*fdcount)++; + } + return 0; +} + + +int rhizome_fetch_poll() +{ + int rn; + if (debug&DEBUG_RHIZOME) printf("Checking %d active fetch requests\n", + rhizome_file_fetch_queue_count); + for(rn=0;rnsocket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)|O_NONBLOCK); + + switch(q->state) + { + case RHIZOME_FETCH_SENDINGHTTPREQUEST: + bytes=write(q->socket,&q->request[q->request_ofs], + q->request_len-q->request_ofs); + if (bytes>0) { + action=1; + q->request_ofs+=bytes; + if (q->request_ofs>=q->request_len) { + /* Sent all of request. Switch to listening for HTTP response headers. + */ + if (debug&DEBUG_RHIZOME) fprintf(stderr,"Sent http request to fetch file. (%d of %d bytes)\n",q->request_ofs,q->request_len); + fprintf(stderr,"sent [%s]\n",q->request); + q->request_len=0; q->request_ofs=0; + q->state=RHIZOME_FETCH_RXHTTPHEADERS; + } + } + break; + case RHIZOME_FETCH_RXFILE: + /* Keep reading until we have two CR/LFs in a row */ + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"receiving rhizome fetch file body (current offset=%d)\n", + q->file_ofs); + + sigPipeFlag=0; + + errno=0; + char buffer[8192]; + + int bytes=read(q->socket,buffer,8192); + + /* If we got some data, see if we have found the end of the HTTP request */ + if (bytes>0) { + action=1; + + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Read %d bytes; we now have %d of %d bytes.\n", + bytes,q->file_ofs+bytes,q->file_len); + + if (bytes>(q->file_len-q->file_ofs)) + bytes=q->file_len-q->file_ofs; + if (fwrite(buffer,bytes,1,q->file)!=1) + { + if (debug&DEBUG_RHIZOME) fprintf(stderr,"Failed writing %d bytes to file. @ offset %d\n",bytes,q->file_ofs); + q->close=1; + continue; + } + q->file_ofs+=bytes; + if (q->file_ofs>=q->file_len) + { + /* got all of file */ + q->close=1; + if (debug&DEBUG_RHIZOME) fprintf(stderr,"Received all of file via rhizome -- now to import it\n"); + } + + } + break; + case RHIZOME_FETCH_RXHTTPHEADERS: + /* Keep reading until we have two CR/LFs in a row */ + if (debug&DEBUG_RHIZOME) WHY("receiving rhizome fetch http headers"); + + sigPipeFlag=0; + + errno=0; + bytes=read(q->socket,&q->request[q->request_len], + 1024-q->request_len-1); + + /* If we got some data, see if we have found the end of the HTTP request */ + if (bytes>0) { + action=1; + int lfcount=0; + int i=q->request_len-160; + if (i<0) i=0; + q->request_len+=bytes; + if (q->request_len<1024) + q->request[q->request_len]=0; + dump("http reply headers",(unsigned char *)q->request,q->request_len); + for(;i<(q->request_len+bytes);i++) + { + switch(q->request[i]) { + case '\n': lfcount++; break; + case '\r': /* ignore CR */ break; + case 0: /* ignore NUL (telnet inserts them) */ break; + default: lfcount=0; break; + } + if (lfcount==2) break; + } + if (lfcount==2) { + /* We have the response headers, so parse. + (we may also have some extra bytes, so we need to be a little + careful) */ + + /* Terminate string at end of headers */ + q->request[i]=0; + + /* Get HTTP result code */ + char *s=strstr(q->request,"HTTP/1.0 "); + if (!s) { + if (debug&DEBUG_RHIZOME) fprintf(stderr,"HTTP response lacked HTTP/1.0 response code.\n"); + q->close=1; continue; } + int http_response_code=strtoll(&s[9],NULL,10); + if (http_response_code!=200) { + if (debug&DEBUG_RHIZOME) fprintf(stderr,"Rhizome web server returned %d != 200 OK\n",http_response_code); + q->close=1; continue; + } + /* Get content length */ + s=strstr(q->request,"Content-length: "); + if (!s) { + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Missing Content-Length: header.\n"); + q->close=1; continue; } + q->file_len=strtoll(&s[16],NULL,10); + if (q->file_len<0) { + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Illegal file size (%d).\n",q->file_len); + q->close=1; continue; } + + /* Okay, we have both, and are all set. + File is already open, so just write out any initial bytes of the + file we read, and update state flag. + */ + int fileRxBytes=q->request_len-(i+1); + if (fileRxBytes>0) + if (fwrite(&q->request[i+1],fileRxBytes,1,q->file)!=1) + { + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Failed writing initial %d bytes to file.\n", + fileRxBytes); + q->close=1; + continue; + } + q->file_ofs=fileRxBytes; + if (debug&DEBUG_RHIZOME) + fprintf(stderr,"Read %d initial bytes of %d total\n", + q->file_ofs,q->file_len); + q->state=RHIZOME_FETCH_RXFILE; + } + + q->request_len+=bytes; + } + + /* Give up fairly quickly if there is no action, because the peer may + have moved out of range. */ + if (!action) { + if (time(0)-q->last_action>RHIZOME_IDLE_TIMEOUT) { + q->close=1; + continue; + } + } else q->last_action=time(0); + + if (sigPipeFlag||((bytes==0)&&(errno==0))) { + /* broken pipe, so close connection */ + if (debug&DEBUG_RHIZOME) + WHY("Closing rhizome fetch connection due to sigpipe"); + q->close=1; + continue; + } + break; + default: + if (debug&DEBUG_RHIZOME) + WHY("Closing rhizome fetch connection due to illegal/unimplemented state."); + q->close=1; + break; + } + + /* Make socket blocking again for poll()/select() */ + fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)&(~O_NONBLOCK)); + } + + int i; + for(i=rhizome_file_fetch_queue_count-1;i>=0;i--) + { + if (file_fetch_queue[i].close) { + /* Free ephemeral data */ + if (file_fetch_queue[i].file) fclose(file_fetch_queue[i].file); + file_fetch_queue[i].file=NULL; + if (file_fetch_queue[i].manifest) + rhizome_manifest_free(file_fetch_queue[i].manifest); + file_fetch_queue[i].manifest=NULL; + + /* reshuffle higher numbered slot down if required */ + if (i<(rhizome_file_fetch_queue_count-1)) + bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1], + &file_fetch_queue[i],sizeof(rhizome_file_fetch_record)); + + /* Reduce count of open connections */ + rhizome_file_fetch_queue_count--; + } + } + + + return 0; +} diff --git a/rhizome_http.c b/rhizome_http.c index 85ac46a0..797ec435 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -155,7 +155,8 @@ int rhizome_server_poll() { case RHIZOME_HTTP_REQUEST_RECEIVING: /* Keep reading until we have two CR/LFs in a row */ - WHY("receiving http request data"); + r->request[r->request_length]=0; + fprintf(stderr,"http request so far: [%s]\n",r->request); sigPipeFlag=0; @@ -544,7 +545,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r) rhizome_server_http_response_header(r,200,"application/binary", r->blob_end-r->source_index); sqlite3_blob_close(blob); - WHY("opened blob and file"); + WHY("opened blob and file -- but still need to send file body."); } } } @@ -619,7 +620,7 @@ int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r) bytes=r->buffer_length-r->buffer_offset; bytes=write(r->socket,&r->buffer[r->buffer_offset],bytes); if (bytes>0) { - printf("wrote %d bytes\n",bytes); + fprintf(stderr,"wrote %d bytes\n",bytes); dump("bytes written",&r->buffer[r->buffer_offset],bytes); r->buffer_offset+=bytes; if (r->buffer_offset>=r->buffer_length) { @@ -682,7 +683,7 @@ int rhizome_server_http_response_header(rhizome_http_request *r,int result, r->buffer_size=bytes+strlen("HTTP/1.0 000 \r\n\r\n")+strlen(httpResultString(A_VALUE_GREATER_THAN_FOUR))+100; r->buffer=(unsigned char *)malloc(r->buffer_size); } - snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d \r\nContent-type: text/html\r\nContent-length: %lld\r\n\r\n",result,bytes); + snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d \r\nContent-type: %s\r\nContent-length: %lld\r\n\r\n",result,mime_type,bytes); r->buffer_length=strlen((char *)r->buffer); r->buffer_offset=0;