Rhizome file transfer via http getting close

(send side needs to send file body)
This commit is contained in:
gardners 2012-01-13 17:21:06 +10:30
parent 7bcc852279
commit f862ba0af6
5 changed files with 300 additions and 14 deletions

@ -129,7 +129,8 @@ int overlayServerMode()
fds[0].fd=sock; fds[0].events=POLLIN;
fdcount=1;
rhizome_server_get_fds(fds,&fdcount,128);
rhizome_fetching_get_fds(fds,&fdcount,128);
for(i=0;i<overlay_interface_count;i++)
{
/* Make socket non-blocking so that poll() behaves correctly.
@ -190,13 +191,19 @@ int overlayServerMode()
}
}
overlay_rx_messages();
if (rhizome_datastore_path) rhizome_server_poll();
if (rhizome_datastore_path) {
rhizome_server_poll();
rhizome_fetch_poll();
}
} else {
/* No data before tick occurred, so do nothing.
Well, for now let's just check anyway. */
if (debug&DEBUG_IO) fprintf(stderr,"poll() timeout.\n");
overlay_rx_messages();
if (rhizome_datastore_path) rhizome_server_poll();
if (rhizome_datastore_path) {
rhizome_server_poll();
rhizome_fetch_poll();
}
}
/* Check if we need to trigger any ticks on any interfaces */
overlay_check_ticks();

@ -332,9 +332,6 @@ int overlay_rx_messages()
plen=recvfrom(overlay_interfaces[i].fd,packet,sizeof(packet),
MSG_DONTWAIT,
&src_addr,&addrlen);
fprintf(stderr,"Interface #%d (%s): plen=%d\n",
i,overlay_interfaces[i].name,plen);
perror("recvfrom");
if (plen<0) {
c[i]=0; count--;
} else {

@ -205,3 +205,5 @@ int rhizome_queue_manifest_import(rhizome_manifest *m,struct sockaddr_in *peerip
#define RHIZOME_DONTVERIFY 0
#define RHIZOME_VERIFY 1
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax);

@ -20,11 +20,30 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "mphlr.h"
#include "rhizome.h"
extern int sigPipeFlag;
extern int sigIoFlag;
typedef struct rhizome_file_fetch_record {
int sock; /* if non-zero this is the socket to read from */
int socket; /* if non-zero this is the socket to read from */
rhizome_manifest *manifest;
char fileid[SHA512_DIGEST_STRING_LENGTH];
FILE *file;
int close;
char request[1024];
int request_len;
int request_ofs;
int file_len;
int file_ofs;
int state;
int last_action;
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 1
#define RHIZOME_FETCH_RXHTTPHEADERS 2
#define RHIZOME_FETCH_RXFILE 3
} rhizome_file_fetch_record;
/* List of queued transfers */
@ -79,6 +98,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m,
char *filehash=rhizome_manifest_get(m,"filehash",NULL,0);
long long filesize=rhizome_manifest_get_ll(m,"filesize");
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Getting ready to fetch file %s\n",filehash);
if (filesize>0&&(filehash!=NULL))
{
if (strlen(filehash)!=SHA512_DIGEST_STRING_LENGTH-1)
@ -136,12 +157,36 @@ int rhizome_queue_manifest_import(rhizome_manifest *m,
}
return -1;
}
file_fetch_queue[rhizome_file_fetch_queue_count].manifest=m;
strncpy(file_fetch_queue[rhizome_file_fetch_queue_count].fileid,
rhizome_file_fetch_record
*q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest=m;
q->socket=sock;
strncpy(q->fileid,
filehash,SHA512_DIGEST_STRING_LENGTH);
snprintf(q->request,1024,"GET /rhizome/file/%s HTTP/1.0\r\n\r\n",
q->fileid);
q->request_len=strlen(q->request);
q->request_ofs=0;
q->state=RHIZOME_FETCH_SENDINGHTTPREQUEST;
q->file_len=-1;
q->file_ofs=0;
q->close=0;
q->last_action=time(0);
/* XXX Don't forget to implement resume */
#define RHIZOME_IDLE_TIMEOUT 10
/* XXX We should stream file straight into the database */
char filename[1024];
snprintf(filename,1024,"%s/import/file.%s",rhizome_datastore_path,
rhizome_manifest_get(q->manifest,"id",NULL,0));
q->file=fopen(filename,"w");
if (!q->file) {
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Could not open '%s' to write received file.\n",
filename);
}
rhizome_file_fetch_queue_count++;
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Queued file for fetching\n");
WHY("Fetch preparation incomplete (socket state recording is needed)");
return 0;
}
else
@ -157,3 +202,237 @@ int rhizome_queue_manifest_import(rhizome_manifest *m,
return WHY("Not implemented.");
}
int rhizome_fetching_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
{
int i;
if ((*fdcount)>=fdmax) return -1;
for(i=0;i<rhizome_file_fetch_queue_count;i++)
{
if ((*fdcount)>=fdmax) return -1;
if (debug&DEBUG_IO) {
fprintf(stderr,"rhizome file fetch request #%d is poll() slot #%d (fd %d)\n",
i,*fdcount,file_fetch_queue[i].socket); }
fds[*fdcount].fd=file_fetch_queue[i].socket;
switch(file_fetch_queue[i].state) {
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
fds[*fdcount].events=POLLOUT; break;
case RHIZOME_FETCH_RXHTTPHEADERS:
case RHIZOME_FETCH_RXFILE:
default:
fds[*fdcount].events=POLLIN; break;
}
(*fdcount)++;
}
return 0;
}
int rhizome_fetch_poll()
{
int rn;
if (debug&DEBUG_RHIZOME) printf("Checking %d active fetch requests\n",
rhizome_file_fetch_queue_count);
for(rn=0;rn<rhizome_file_fetch_queue_count;rn++)
{
rhizome_file_fetch_record *q=&file_fetch_queue[rn];
int action=0;
int bytes;
/* Make socket non-blocking */
fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)|O_NONBLOCK);
switch(q->state)
{
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
bytes=write(q->socket,&q->request[q->request_ofs],
q->request_len-q->request_ofs);
if (bytes>0) {
action=1;
q->request_ofs+=bytes;
if (q->request_ofs>=q->request_len) {
/* Sent all of request. Switch to listening for HTTP response headers.
*/
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Sent http request to fetch file. (%d of %d bytes)\n",q->request_ofs,q->request_len);
fprintf(stderr,"sent [%s]\n",q->request);
q->request_len=0; q->request_ofs=0;
q->state=RHIZOME_FETCH_RXHTTPHEADERS;
}
}
break;
case RHIZOME_FETCH_RXFILE:
/* Keep reading until we have two CR/LFs in a row */
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"receiving rhizome fetch file body (current offset=%d)\n",
q->file_ofs);
sigPipeFlag=0;
errno=0;
char buffer[8192];
int bytes=read(q->socket,buffer,8192);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
action=1;
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Read %d bytes; we now have %d of %d bytes.\n",
bytes,q->file_ofs+bytes,q->file_len);
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Failed writing %d bytes to file. @ offset %d\n",bytes,q->file_ofs);
q->close=1;
continue;
}
q->file_ofs+=bytes;
if (q->file_ofs>=q->file_len)
{
/* got all of file */
q->close=1;
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Received all of file via rhizome -- now to import it\n");
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS:
/* Keep reading until we have two CR/LFs in a row */
if (debug&DEBUG_RHIZOME) WHY("receiving rhizome fetch http headers");
sigPipeFlag=0;
errno=0;
bytes=read(q->socket,&q->request[q->request_len],
1024-q->request_len-1);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) {
action=1;
int lfcount=0;
int i=q->request_len-160;
if (i<0) i=0;
q->request_len+=bytes;
if (q->request_len<1024)
q->request[q->request_len]=0;
dump("http reply headers",(unsigned char *)q->request,q->request_len);
for(;i<(q->request_len+bytes);i++)
{
switch(q->request[i]) {
case '\n': lfcount++; break;
case '\r': /* ignore CR */ break;
case 0: /* ignore NUL (telnet inserts them) */ break;
default: lfcount=0; break;
}
if (lfcount==2) break;
}
if (lfcount==2) {
/* We have the response headers, so parse.
(we may also have some extra bytes, so we need to be a little
careful) */
/* Terminate string at end of headers */
q->request[i]=0;
/* Get HTTP result code */
char *s=strstr(q->request,"HTTP/1.0 ");
if (!s) {
if (debug&DEBUG_RHIZOME) fprintf(stderr,"HTTP response lacked HTTP/1.0 response code.\n");
q->close=1; continue; }
int http_response_code=strtoll(&s[9],NULL,10);
if (http_response_code!=200) {
if (debug&DEBUG_RHIZOME) fprintf(stderr,"Rhizome web server returned %d != 200 OK\n",http_response_code);
q->close=1; continue;
}
/* Get content length */
s=strstr(q->request,"Content-length: ");
if (!s) {
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Missing Content-Length: header.\n");
q->close=1; continue; }
q->file_len=strtoll(&s[16],NULL,10);
if (q->file_len<0) {
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Illegal file size (%d).\n",q->file_len);
q->close=1; continue; }
/* Okay, we have both, and are all set.
File is already open, so just write out any initial bytes of the
file we read, and update state flag.
*/
int fileRxBytes=q->request_len-(i+1);
if (fileRxBytes>0)
if (fwrite(&q->request[i+1],fileRxBytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Failed writing initial %d bytes to file.\n",
fileRxBytes);
q->close=1;
continue;
}
q->file_ofs=fileRxBytes;
if (debug&DEBUG_RHIZOME)
fprintf(stderr,"Read %d initial bytes of %d total\n",
q->file_ofs,q->file_len);
q->state=RHIZOME_FETCH_RXFILE;
}
q->request_len+=bytes;
}
/* Give up fairly quickly if there is no action, because the peer may
have moved out of range. */
if (!action) {
if (time(0)-q->last_action>RHIZOME_IDLE_TIMEOUT) {
q->close=1;
continue;
}
} else q->last_action=time(0);
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
WHY("Closing rhizome fetch connection due to sigpipe");
q->close=1;
continue;
}
break;
default:
if (debug&DEBUG_RHIZOME)
WHY("Closing rhizome fetch connection due to illegal/unimplemented state.");
q->close=1;
break;
}
/* Make socket blocking again for poll()/select() */
fcntl(q->socket,F_SETFL,fcntl(q->socket, F_GETFL, NULL)&(~O_NONBLOCK));
}
int i;
for(i=rhizome_file_fetch_queue_count-1;i>=0;i--)
{
if (file_fetch_queue[i].close) {
/* Free ephemeral data */
if (file_fetch_queue[i].file) fclose(file_fetch_queue[i].file);
file_fetch_queue[i].file=NULL;
if (file_fetch_queue[i].manifest)
rhizome_manifest_free(file_fetch_queue[i].manifest);
file_fetch_queue[i].manifest=NULL;
/* reshuffle higher numbered slot down if required */
if (i<(rhizome_file_fetch_queue_count-1))
bcopy(&file_fetch_queue[rhizome_file_fetch_queue_count-1],
&file_fetch_queue[i],sizeof(rhizome_file_fetch_record));
/* Reduce count of open connections */
rhizome_file_fetch_queue_count--;
}
}
return 0;
}

@ -155,7 +155,8 @@ int rhizome_server_poll()
{
case RHIZOME_HTTP_REQUEST_RECEIVING:
/* Keep reading until we have two CR/LFs in a row */
WHY("receiving http request data");
r->request[r->request_length]=0;
fprintf(stderr,"http request so far: [%s]\n",r->request);
sigPipeFlag=0;
@ -544,7 +545,7 @@ int rhizome_server_parse_http_request(int rn,rhizome_http_request *r)
rhizome_server_http_response_header(r,200,"application/binary",
r->blob_end-r->source_index);
sqlite3_blob_close(blob);
WHY("opened blob and file");
WHY("opened blob and file -- but still need to send file body.");
}
}
}
@ -619,7 +620,7 @@ int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r)
bytes=r->buffer_length-r->buffer_offset;
bytes=write(r->socket,&r->buffer[r->buffer_offset],bytes);
if (bytes>0) {
printf("wrote %d bytes\n",bytes);
fprintf(stderr,"wrote %d bytes\n",bytes);
dump("bytes written",&r->buffer[r->buffer_offset],bytes);
r->buffer_offset+=bytes;
if (r->buffer_offset>=r->buffer_length) {
@ -682,7 +683,7 @@ int rhizome_server_http_response_header(rhizome_http_request *r,int result,
r->buffer_size=bytes+strlen("HTTP/1.0 000 \r\n\r\n")+strlen(httpResultString(A_VALUE_GREATER_THAN_FOUR))+100;
r->buffer=(unsigned char *)malloc(r->buffer_size);
}
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d \r\nContent-type: text/html\r\nContent-length: %lld\r\n\r\n",result,bytes);
snprintf((char *)r->buffer,r->buffer_size,"HTTP/1.0 %03d \r\nContent-type: %s\r\nContent-length: %lld\r\n\r\n",result,mime_type,bytes);
r->buffer_length=strlen((char *)r->buffer);
r->buffer_offset=0;