Transfer should work for small files that arrive with the headers

This commit is contained in:
Jeremy Lakeman 2012-07-13 12:10:55 +09:30
parent 5e915bcc09
commit 0255d7938e

View File

@ -752,6 +752,66 @@ void rhizome_fetch_write(rhizome_file_fetch_record *q){
} }
} }
void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes){
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, q->file_ofs);
rhizome_fetch_close(q);
return;
}
q->file_ofs+=bytes;
if (q->file_ofs>=q->file_len)
{
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
{
fclose(q->file);
q->file = NULL;
// TODO: We already have the manifest struct in memory, should import the bundle
// directly from that, not by writing it to a file and re-reading it!
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL)
{ WHY("Manifest missing ID"); return; }
if (create_rhizome_import_dir() == -1)
return;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id))
return;
/* Do really write the manifest unchanged */
if (debug & DEBUG_RHIZOME_RX) {
DEBUGF("manifest has %d signatories",q->manifest->sig_count);
DEBUGF("manifest bid=%s len=%d",
rhizome_manifest_get(q->manifest, "id", NULL, 0),
q->manifest->manifest_bytes);
dump("manifest",&q->manifest->manifestdata[0],
q->manifest->manifest_all_bytes);
}
q->manifest->finalised=1;
q->manifest->manifest_bytes=q->manifest->manifest_all_bytes;
if (rhizome_write_manifest_file(q->manifest,filename) != -1) {
rhizome_bundle_import(q->manifest, NULL, id,
q->manifest->ttl - 1 /* TTL */);
}
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
}
rhizome_fetch_close(q);
return;
}
// reset timeout due to activity
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
}
void rhizome_fetch_poll(struct sched_ent *alarm) void rhizome_fetch_poll(struct sched_ent *alarm)
{ {
rhizome_file_fetch_record *q=(rhizome_file_fetch_record *)alarm; rhizome_file_fetch_record *q=(rhizome_file_fetch_record *)alarm;
@ -779,70 +839,9 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
int bytes=read(q->alarm.poll.fd,buffer,8192); int bytes=read(q->alarm.poll.fd,buffer,8192);
/* If we got some data, see if we have found the end of the HTTP request */ /* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) { if (bytes>0)
rhizome_write_content(q, buffer, bytes);
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm=overlay_gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, q->file_ofs);
rhizome_fetch_close(q);
return;
}
q->file_ofs+=bytes;
} else if (bytes==0) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Got zero bytes, assume socket dead.");
rhizome_fetch_close(q);
return;
}
if (q->file_ofs>=q->file_len)
{
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
{
fclose(q->file);
q->file = NULL;
// TODO: We already have the manifest struct in memory, should import the bundle
// directly from that, not by writing it to a file and re-reading it!
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL)
{ WHY("Manifest missing ID"); return; }
if (create_rhizome_import_dir() == -1)
return;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename,"manifest.%s", id))
return;
/* Do really write the manifest unchanged */
if (debug & DEBUG_RHIZOME_RX) {
DEBUGF("manifest has %d signatories",q->manifest->sig_count);
DEBUGF("manifest bid=%s len=%d",
rhizome_manifest_get(q->manifest, "id", NULL, 0),
q->manifest->manifest_bytes);
dump("manifest",&q->manifest->manifestdata[0],
q->manifest->manifest_all_bytes);
}
q->manifest->finalised=1;
q->manifest->manifest_bytes=q->manifest->manifest_all_bytes;
if (rhizome_write_manifest_file(q->manifest,filename) != -1) {
rhizome_bundle_import(q->manifest, NULL, id,
q->manifest->ttl - 1 /* TTL */);
}
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
}
rhizome_fetch_close(q);
return;
}
break; break;
case RHIZOME_FETCH_RXHTTPHEADERS: case RHIZOME_FETCH_RXHTTPHEADERS:
/* Keep reading until we have two CR/LFs in a row */ /* Keep reading until we have two CR/LFs in a row */
@ -852,6 +851,14 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
bytes=read(q->alarm.poll.fd,&q->request[q->request_len], bytes=read(q->alarm.poll.fd,&q->request[q->request_len],
1024-q->request_len-1); 1024-q->request_len-1);
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to sigpipe");
rhizome_fetch_close(q);
return;
}
/* If we got some data, see if we have found the end of the HTTP request */ /* If we got some data, see if we have found the end of the HTTP request */
if (bytes>0) { if (bytes>0) {
int lfcount=0; int lfcount=0;
@ -924,31 +931,16 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
File is already open, so just write out any initial bytes of the File is already open, so just write out any initial bytes of the
file we read, and update state flag. file we read, and update state flag.
*/ */
int fileRxBytes=q->request_len-(i+1);
if (fileRxBytes>0)
if (fwrite(&q->request[i+1],fileRxBytes,1,q->file)!=1)
{
if (debug&DEBUG_RHIZOME)
DEBUGF("Failed writing initial %d bytes to file.",
fileRxBytes);
rhizome_fetch_close(q);
return;
}
q->file_ofs=fileRxBytes;
DEBUGF("Transferred (%lld of %lld)",
q->file_ofs,q->file_len);
q->state=RHIZOME_FETCH_RXFILE; q->state=RHIZOME_FETCH_RXFILE;
int fileRxBytes=q->request_len-(i+1);
if (fileRxBytes>0)
rhizome_write_content(q, &q->request[i+1], fileRxBytes);
} }
} }
if (sigPipeFlag||((bytes==0)&&(errno==0))) {
/* broken pipe, so close connection */
if (debug&DEBUG_RHIZOME)
DEBUG("Closing rhizome fetch connection due to sigpipe");
rhizome_fetch_close(q);
return;
}
break; break;
default: default:
if (debug&DEBUG_RHIZOME) if (debug&DEBUG_RHIZOME)