Write out of order packets with MDP transport

- add random order writing to rhizome store API
- remove rhizome flush, force all callers to provide buffer
This commit is contained in:
Jeremy Lakeman 2013-07-17 15:47:35 +09:30
parent 49e0286b43
commit a4bf4edbfc
5 changed files with 302 additions and 229 deletions

View File

@ -173,9 +173,6 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]);
int count=mdp->out.payload_length-(1+16+8+8); int count=mdp->out.payload_length-(1+16+8+8);
unsigned char *bytes=&mdp->out.payload[1+16+8+8]; unsigned char *bytes=&mdp->out.payload[1+16+8+8];
if (config.debug.rhizome_rx)
DEBUGF("Received %d bytes @ 0x%"PRIx64" for %s* version 0x%"PRIx64,
count,offset,alloca_tohex(bidprefix,16),version);
/* Now see if there is a slot that matches. If so, then /* Now see if there is a slot that matches. If so, then
see if the bytes are in the window, and write them. see if the bytes are in the window, and write them.

View File

@ -383,16 +383,19 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len);
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length); rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length);
/* Rhizome file storage api */ /* Rhizome file storage api */
struct rhizome_write_buffer{
struct rhizome_write_buffer *_next;
int64_t offset;
int buffer_size;
int data_size;
unsigned char data[0];
};
struct rhizome_write{ struct rhizome_write{
char id[SHA512_DIGEST_STRING_LENGTH+1]; char id[SHA512_DIGEST_STRING_LENGTH+1];
char id_known; char id_known;
unsigned char *buffer;
int buffer_size;
int data_size;
int64_t file_offset; int64_t file_offset;
int64_t file_length; int64_t file_length;
@ -403,6 +406,9 @@ struct rhizome_write{
SHA512_CTX sha512_context; SHA512_CTX sha512_context;
int64_t blob_rowid; int64_t blob_rowid;
int blob_fd; int blob_fd;
struct rhizome_write_buffer *out_of_order;
int total_data_size;
}; };
struct rhizome_read{ struct rhizome_read{
@ -674,7 +680,8 @@ int unpack_http_response(char *response, struct http_response_parts *parts);
int rhizome_exists(const char *fileHash); int rhizome_exists(const char *fileHash);
int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority); int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority);
int rhizome_flush(struct rhizome_write *write); int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, int data_size);
int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size);
int rhizome_write_file(struct rhizome_write *write, const char *filename); int rhizome_write_file(struct rhizome_write *write, const char *filename);
int rhizome_fail_write(struct rhizome_write *write); int rhizome_fail_write(struct rhizome_write *write);
int rhizome_finish_write(struct rhizome_write *write); int rhizome_finish_write(struct rhizome_write *write);

View File

@ -77,13 +77,12 @@ struct rhizome_fetch_slot {
/* MDP transport specific elements */ /* MDP transport specific elements */
unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; unsigned char bid[RHIZOME_MANIFEST_ID_BYTES];
int64_t bidVersion; int64_t bidVersion;
int bidP;
unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES];
int prefix_length; int prefix_length;
int mdpIdleTimeout; int mdpIdleTimeout;
time_ms_t mdp_last_request_time;
uint64_t mdp_last_request_offset;
int mdpResponsesOutstanding; int mdpResponsesOutstanding;
int mdpRXBlockLength; int mdpRXBlockLength;
uint32_t mdpRXBitmap;
unsigned char mdpRXWindow[32*200]; unsigned char mdpRXWindow[32*200];
}; };
@ -141,17 +140,16 @@ int rhizome_active_fetch_count()
int rhizome_active_fetch_bytes_received(int q) int rhizome_active_fetch_bytes_received(int q)
{ {
if (q<0) return -1; if (q<0 || q>=NQUEUES) return -1;
if (q>=NQUEUES) return -1;
if (rhizome_fetch_queues[q].active.state==RHIZOME_FETCH_FREE) return -1; if (rhizome_fetch_queues[q].active.state==RHIZOME_FETCH_FREE) return -1;
return (int)rhizome_fetch_queues[q].active.write_state.file_offset + rhizome_fetch_queues[q].active.write_state.data_size; return (int)rhizome_fetch_queues[q].active.write_state.file_offset;
} }
int rhizome_fetch_queue_bytes(){ int rhizome_fetch_queue_bytes(){
int i,j,bytes=0; int i,j,bytes=0;
for(i=0;i<NQUEUES;i++){ for(i=0;i<NQUEUES;i++){
if (rhizome_fetch_queues[i].active.state!=RHIZOME_FETCH_FREE){ if (rhizome_fetch_queues[i].active.state!=RHIZOME_FETCH_FREE){
int received=rhizome_fetch_queues[i].active.write_state.file_offset + rhizome_fetch_queues[i].active.write_state.data_size; int received=rhizome_fetch_queues[i].active.write_state.file_offset;
bytes+=rhizome_fetch_queues[i].active.manifest->fileLength - received; bytes+=rhizome_fetch_queues[i].active.manifest->fileLength - received;
} }
for (j=0;j<rhizome_fetch_queues[i].candidate_queue_size;j++){ for (j=0;j<rhizome_fetch_queues[i].candidate_queue_size;j++){
@ -169,9 +167,8 @@ int rhizome_fetch_status_html(struct strbuf *b)
struct rhizome_fetch_queue *q=&rhizome_fetch_queues[i]; struct rhizome_fetch_queue *q=&rhizome_fetch_queues[i];
strbuf_sprintf(b, "<p>Slot %d, ", i); strbuf_sprintf(b, "<p>Slot %d, ", i);
if (q->active.state!=RHIZOME_FETCH_FREE){ if (q->active.state!=RHIZOME_FETCH_FREE){
strbuf_sprintf(b, "%lld[+%d] of %lld", strbuf_sprintf(b, "%lld of %lld",
q->active.write_state.file_offset, q->active.write_state.file_offset,
q->active.write_state.data_size,
q->active.manifest->fileLength); q->active.manifest->fileLength);
}else{ }else{
strbuf_puts(b, "inactive"); strbuf_puts(b, "inactive");
@ -413,14 +410,36 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->write_state.blob_rowid=-1; slot->write_state.blob_rowid=-1;
if (slot->manifest) { if (slot->manifest) {
bcopy(slot->manifest->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES);
slot->prefix_length=RHIZOME_MANIFEST_ID_BYTES;
slot->bidVersion=slot->manifest->version;
/* Don't provide a filename, because we will stream the file straight into
the database. */
slot->manifest->dataFileName = NULL;
slot->manifest->dataFileUnlinkOnFree = 0;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", slot->manifest->fileHexHash);
if (strbuf_overrun(r))
RETURN(WHY("request overrun"));
slot->request_len = strbuf_len(r);
if (rhizome_open_write(&slot->write_state, slot->manifest->fileHexHash, slot->manifest->fileLength, RHIZOME_PRIORITY_DEFAULT)) if (rhizome_open_write(&slot->write_state, slot->manifest->fileHexHash, slot->manifest->fileLength, RHIZOME_PRIORITY_DEFAULT))
RETURN(-1); RETURN(-1);
} else { } else {
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(slot->bid, slot->prefix_length));
if (strbuf_overrun(r))
RETURN(WHY("request overrun"));
slot->request_len = strbuf_len(r);
slot->manifest_bytes=0;
slot->write_state.file_offset=0; slot->write_state.file_offset=0;
slot->write_state.file_length=-1; slot->write_state.file_length=-1;
} }
slot->request_ofs = 0; slot->request_ofs = 0;
slot->state = RHIZOME_FETCH_CONNECTING; slot->state = RHIZOME_FETCH_CONNECTING;
slot->alarm.function = rhizome_fetch_poll; slot->alarm.function = rhizome_fetch_poll;
fetch_stats.name = "rhizome_fetch_poll"; fetch_stats.name = "rhizome_fetch_poll";
@ -473,7 +492,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
bail_http: bail_http:
/* Fetch via overlay, either because no IP address was provided, or because /* Fetch via overlay, either because no IP address was provided, or because
the connection/attempt to fetch via HTTP failed. */ the connection/attempt to fetch via HTTP failed. */
slot->state=RHIZOME_FETCH_RXFILEMDP;
rhizome_fetch_switch_to_mdp(slot); rhizome_fetch_switch_to_mdp(slot);
RETURN(0); RETURN(0);
OUT(); OUT();
@ -608,36 +626,13 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
RETURN(IMPORTED); RETURN(IMPORTED);
} }
// Start the fetch. /* Prepare for fetching */
//dump("peerip", peerip, sizeof *peerip);
/* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip; slot->peer_ipandport = *peerip;
slot->alarm.poll.fd=-1;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash);
if (strbuf_overrun(r))
RETURN(WHY("request overrun"));
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE); bcopy(peersid,slot->peer_sid,SID_SIZE);
bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES);
slot->bidVersion=m->version;
slot->bidP=1;
/* Don't provide a filename, because we will stream the file straight into
the database. */
m->dataFileName = NULL;
m->dataFileUnlinkOnFree = 0;
slot->manifest = m; slot->manifest = m;
if (schedule_fetch(slot) == -1) if (schedule_fetch(slot) == -1)
RETURN(-1); RETURN(-1);
if (config.debug.rhizome_rx)
DEBUGF(" started fetch bid %s version 0x%"PRIx64" into %s, slot=%d filehash=%s",
alloca_tohex_bid(slot->bid), slot->bidVersion,
alloca_str_toprint(slot->manifest->dataFileName), slotno(slot), m->fileHexHash);
RETURN(STARTED); RETURN(STARTED);
} }
@ -658,24 +653,14 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
/* Prepare for fetching via HTTP */ /* Prepare for fetching via HTTP */
slot->peer_ipandport = *peerip; slot->peer_ipandport = *peerip;
slot->manifest = NULL; slot->manifest = NULL;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length));
if (strbuf_overrun(r))
return WHY("request overrun");
slot->request_len = strbuf_len(r);
/* Prepare for fetching via MDP */
bcopy(peersid,slot->peer_sid,SID_SIZE); bcopy(peersid,slot->peer_sid,SID_SIZE);
bcopy(prefix,slot->prefix,prefix_length); bcopy(prefix,slot->bid,prefix_length);
slot->prefix_length=prefix_length; slot->prefix_length=prefix_length;
slot->bidP=0;
/* Don't stream into a file blob in the database, because it is a manifest. /* Don't stream into a file blob in the database, because it is a manifest.
We do need to cache it in the slot structure, though, and then offer it We do need to cache it in the slot structure, though, and then offer it
for inserting into the database, but we can avoid the temporary file in for inserting into the database, but we can avoid the temporary file in
the process. */ the process. */
slot->write_state.blob_rowid=-1;
slot->manifest_bytes=0;
if (schedule_fetch(slot) == -1) { if (schedule_fetch(slot) == -1) {
return -1; return -1;
@ -950,7 +935,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
rhizome_manifest_free(slot->manifest); rhizome_manifest_free(slot->manifest);
slot->manifest = NULL; slot->manifest = NULL;
if (slot->write_state.buffer) if (slot->write_state.blob_fd>=0 ||
slot->write_state.blob_rowid>=0)
rhizome_fail_write(&slot->write_state); rhizome_fail_write(&slot->write_state);
// Release the fetch slot. // Release the fetch slot.
@ -972,14 +958,14 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
if (now-slot->last_write_time>slot->mdpIdleTimeout) { if (now-slot->last_write_time>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago (read %"PRId64" of %"PRId64" bytes)", DEBUGF("MDP connection timed out: last RX %lldms ago (read %"PRId64" of %"PRId64" bytes)",
now-slot->last_write_time, now-slot->last_write_time,
slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); slot->write_state.file_offset, slot->write_state.file_length);
rhizome_fetch_close(slot); rhizome_fetch_close(slot);
OUT(); OUT();
return; return;
} }
if (config.debug.rhizome_rx) if (config.debug.rhizome_rx)
DEBUGF("Timeout: Resending request for slot=0x%p (%"PRId64" of %"PRId64" received)", DEBUGF("Timeout: Resending request for slot=0x%p (%"PRId64" of %"PRId64" received)",
slot,slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); slot,slot->write_state.file_offset, slot->write_state.file_length);
rhizome_fetch_mdp_requestblocks(slot); rhizome_fetch_mdp_requestblocks(slot);
OUT(); OUT();
} }
@ -1023,22 +1009,41 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
mdp.out.payload_length=RHIZOME_MANIFEST_ID_BYTES+8+8+4+2; mdp.out.payload_length=RHIZOME_MANIFEST_ID_BYTES+8+8+4+2;
bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES);
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES],slot->bidVersion); uint32_t bitmap=0;
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8],slot->write_state.file_offset + slot->write_state.data_size); int requests=32;
write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8],slot->mdpRXBitmap); int i;
write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4],slot->mdpRXBlockLength); struct rhizome_write_buffer *p = slot->write_state.out_of_order;
uint64_t offset = slot->write_state.file_offset;
for (i=0;i<32;i++){
while(p && p->offset + p->data_size < offset)
p=p->_next;
if (!p)
break;
if (p->offset <= offset && p->offset+p->data_size >= offset+slot->mdpRXBlockLength){
bitmap |= 1<<(31-i);
requests --;
}
offset+=slot->mdpRXBlockLength;
}
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES], slot->bidVersion);
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8], slot->write_state.file_offset);
write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8], bitmap);
write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4], slot->mdpRXBlockLength);
if (config.debug.rhizome_tx) if (config.debug.rhizome_tx)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%"PRIx64, DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%"PRIx64,
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->write_state.file_offset + slot->write_state.data_size); slot->write_state.file_offset);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
// remember when we sent the request so that we can adjust the inter-request // remember when we sent the request so that we can adjust the inter-request
// interval based on how fast the packets arrive. // interval based on how fast the packets arrive.
slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap slot->mdpResponsesOutstanding=requests;
slot->mdp_last_request_offset = slot->write_state.file_offset;
slot->mdp_last_request_time = gettime_ms();
rhizome_fetch_mdp_touch_timeout(slot); rhizome_fetch_mdp_touch_timeout(slot);
RETURN(0); RETURN(0);
@ -1056,7 +1061,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
or with a temporary generated SID, so that we don't end up with two or with a temporary generated SID, so that we don't end up with two
instances with the same SID. instances with the same SID.
*/ */
IN() IN();
if (!my_subscriber) { if (!my_subscriber) {
DEBUGF("I don't have an identity, so we cannot fall back to MDP"); DEBUGF("I don't have an identity, so we cannot fall back to MDP");
RETURN(rhizome_fetch_close(slot)); RETURN(rhizome_fetch_close(slot));
@ -1096,9 +1101,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
transport. transport.
*/ */
slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size
rhizome_fetch_mdp_requestblocks(slot); rhizome_fetch_mdp_requestblocks(slot);
RETURN(0); RETURN(0);
OUT(); OUT();
@ -1136,7 +1140,76 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
return; return;
} }
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) int rhizome_write_complete(struct rhizome_fetch_slot *slot)
{
IN();
if (slot->manifest) {
if (slot->write_state.file_offset < slot->write_state.file_length)
RETURN(0);
// Were fetching payload, now we have it.
if (config.debug.rhizome_rx)
DEBUGF("Received all of file via rhizome -- now to import it");
if (rhizome_finish_write(&slot->write_state)){
rhizome_fetch_close(slot);
RETURN(-1);
}
if (rhizome_import_received_bundle(slot->manifest)){
rhizome_fetch_close(slot);
RETURN(-1);
}
if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer_ipandport.sin_port),
slot->manifest->fileHexHash);
} else {
INFOF("Completed MDP request from %s for file %s",
alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash);
}
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
if (config.debug.rhizome_rx)
DEBUGF("Received a manifest in response to supplying a manifest prefix.");
/* Read the manifest and add it to suggestion queue, then immediately
call schedule queued items. */
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
if (rhizome_read_manifest_file(m, slot->manifest_buffer,
slot->manifest_bytes) == -1) {
DEBUGF("Couldn't read manifest");
rhizome_manifest_free(m);
} else {
if (config.debug.rhizome_rx){
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport));
dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid));
}
rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport,
slot->peer_sid);
}
}
}
if (config.debug.rhizome_rx)
DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec).",
slot,(long long)slot->write_state.file_offset,
(long long)gettime_ms()-slot->start_time,
(long long)(slot->write_state.file_offset)/(gettime_ms()-slot->start_time));
rhizome_fetch_close(slot);
RETURN(-1);
}
int rhizome_write_content(struct rhizome_fetch_slot *slot, unsigned char *buffer, int bytes)
{ {
IN(); IN();
@ -1145,8 +1218,8 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
// Truncate to known length of file (handy for reading from journal bundles that // Truncate to known length of file (handy for reading from journal bundles that
// might grow while we are reading from them). // might grow while we are reading from them).
if (bytes>(slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size))){ if (bytes>(slot->write_state.file_length-(slot->write_state.file_offset))){
bytes=slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size); bytes=slot->write_state.file_length-(slot->write_state.file_offset);
} }
if (!slot->manifest){ if (!slot->manifest){
@ -1159,88 +1232,15 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
} else { } else {
/* We are reading a file. Stream it into the database. */ /* We are reading a file. Stream it into the database. */
int ofs=0; if (rhizome_write_buffer(&slot->write_state, buffer, bytes)){
while (ofs<bytes){ rhizome_fetch_close(slot);
int block_size = bytes - ofs; RETURN(-1);
if (block_size > slot->write_state.buffer_size - slot->write_state.data_size)
block_size = slot->write_state.buffer_size - slot->write_state.data_size;
if (block_size>0){
bcopy(buffer+ofs, slot->write_state.buffer + slot->write_state.data_size, block_size);
slot->write_state.data_size+=block_size;
ofs+=block_size;
}
if (slot->write_state.data_size>=slot->write_state.buffer_size){
int ret = rhizome_flush(&slot->write_state);
if (ret!=0){
rhizome_fetch_close(slot);
RETURN(-1);
}
}
} }
} }
slot->last_write_time=gettime_ms(); slot->last_write_time=gettime_ms();
if (slot->write_state.file_offset + slot->write_state.data_size>=slot->write_state.file_length) { RETURN(rhizome_write_complete(slot));
/* got all of file */
if (config.debug.rhizome_rx)
DEBUGF("Received all of file via rhizome -- now to import it");
if (slot->manifest) {
// Were fetching payload, now we have it.
if (rhizome_finish_write(&slot->write_state)){
rhizome_fetch_close(slot);
RETURN(-1);
}
if (!rhizome_import_received_bundle(slot->manifest)){
if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
INFOF("Completed http request from %s:%u for file %s",
buf, ntohs(slot->peer_ipandport.sin_port),
slot->manifest->fileHexHash);
} else {
INFOF("Completed MDP request from %s for file %s",
alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash);
}
}
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
if (config.debug.rhizome_rx)
DEBUGF("Received a manifest in response to supplying a manifest prefix.");
/* Read the manifest and add it to suggestion queue, then immediately
call schedule queued items. */
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
if (rhizome_read_manifest_file(m, slot->manifest_buffer,
slot->manifest_bytes) == -1) {
DEBUGF("Couldn't read manifest");
rhizome_manifest_free(m);
} else {
if (config.debug.rhizome_rx){
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport));
dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid));
}
rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport,
slot->peer_sid);
}
}
}
if (config.debug.rhizome_rx)
DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec). Buffer size = %d",
slot,(long long)slot->write_state.file_offset+slot->write_state.data_size,
(long long)gettime_ms()-slot->start_time,
(long long)(slot->write_state.file_offset+slot->write_state.data_size)/(gettime_ms()-slot->start_time),
slot->write_state.buffer_size);
rhizome_fetch_close(slot);
RETURN(-1);
}
// slot is still open // slot is still open
RETURN(0); RETURN(0);
@ -1255,32 +1255,33 @@ int rhizome_received_content(unsigned char *bidprefix,
int i; int i;
for(i=0;i<NQUEUES;i++) { for(i=0;i<NQUEUES;i++) {
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active; struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) {
if (!memcmp(slot->bid,bidprefix,16))
{
if (slot->write_state.file_offset + slot->write_state.data_size==offset) {
if (!rhizome_write_content(slot,(char *)bytes,count))
{
rhizome_fetch_mdp_touch_timeout(slot);
slot->mdpResponsesOutstanding--;
if (slot->mdpResponsesOutstanding==0) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
}
// TODO: Try flushing out stuck packets that we have kept due to
// packet loss / out-of-order delivery.
}
RETURN(0); if (slot->state!=RHIZOME_FETCH_RXFILEMDP
} else { || version != slot->bidVersion
// TODO: Implement out-of-order buffering so that lost packets || memcmp(slot->bid,bidprefix,16)!=0)
// don't cause wastage continue;
}
RETURN(0); if (rhizome_random_write(&slot->write_state, offset, bytes, count)){
} if (config.debug.rhizome)
DEBUGF("Write failed!");
RETURN (-1);
} }
}
if (rhizome_write_complete(slot)){
if (config.debug.rhizome)
DEBUGF("Complete failed!");
RETURN(-1);
}
slot->last_write_time=gettime_ms();
rhizome_fetch_mdp_touch_timeout(slot);
slot->mdpResponsesOutstanding--;
if (slot->mdpResponsesOutstanding==0) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
}
RETURN(0);
}
RETURN(-1); RETURN(-1);
OUT(); OUT();
@ -1302,7 +1303,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
switch (slot->state) { switch (slot->state) {
case RHIZOME_FETCH_RXFILE: { case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */ /* Keep reading until we have the promised amount of data */
char buffer[8192]; unsigned char buffer[8192];
sigPipeFlag = 0; sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */ /* If we got some data, see if we have found the end of the HTTP request */
@ -1317,7 +1318,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
} else { } else {
if (config.debug.rhizome_rx) if (config.debug.rhizome_rx)
DEBUGF("Empty read, closing connection: received %"PRId64" of %"PRId64" bytes", DEBUGF("Empty read, closing connection: received %"PRId64" of %"PRId64" bytes",
slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); slot->write_state.file_offset,slot->write_state.file_length);
rhizome_fetch_switch_to_mdp(slot); rhizome_fetch_switch_to_mdp(slot);
return; return;
} }
@ -1375,7 +1376,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
slot->state = RHIZOME_FETCH_RXFILE; slot->state = RHIZOME_FETCH_RXFILE;
int content_bytes = slot->request + slot->request_len - parts.content_start; int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){ if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes); rhizome_write_content(slot, (unsigned char*)parts.content_start, content_bytes);
// reset inactivity timeout // reset inactivity timeout
unschedule(&slot->alarm); unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout; slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout;

View File

@ -123,15 +123,6 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
SHA512_Init(&write->sha512_context); SHA512_Init(&write->sha512_context);
write->buffer_size=write->file_length;
if (write->buffer_size>RHIZOME_BUFFER_MAXIMUM_SIZE)
write->buffer_size=RHIZOME_BUFFER_MAXIMUM_SIZE;
write->buffer=malloc(write->buffer_size);
if (!write->buffer)
return WHY("Unable to allocate write buffer");
return 0; return 0;
} }
@ -213,18 +204,88 @@ int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffe
write_state->file_offset+=data_size; write_state->file_offset+=data_size;
if (config.debug.rhizome) if (config.debug.rhizome)
DEBUGF("Written %"PRId64" of %"PRId64, write_state->file_offset, write_state->file_length); DEBUGF("Written %"PRId64" of %"PRId64, write_state->file_offset, write_state->file_length);
RETURN(data_size); RETURN(0);
OUT(); OUT();
} }
/* Write write_state->buffer into the store int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size){
Note that we don't support random writes as the contents must be hashed in order // search the out of order buffer list for the insert position
But we don't enforce linear writes yet. */ struct rhizome_write_buffer **ptr = &write_state->out_of_order;
int rhizome_flush(struct rhizome_write *write_state){ if (offset + data_size > write_state->file_length)
int wrote = rhizome_write_buffer(write_state, write_state->buffer, write_state->data_size); data_size = write_state->file_length - offset;
if (wrote == write_state->data_size) if (data_size<=0)
write_state->data_size=0; return 0;
return wrote>=0?0:-1; int64_t last_offset = write_state->file_offset;
while(1){
// if existing data can be written, write it now
if (*ptr && (*ptr)->offset == write_state->file_offset){
struct rhizome_write_buffer *n=*ptr;
if (config.debug.rhizome)
DEBUGF("Writing caching block %"PRId64", %d", n->offset, n->data_size);
if (rhizome_write_buffer(write_state, n->data, n->data_size))
return -1;
last_offset = write_state->file_offset;
*ptr=n->_next;
free(n);
continue;
}
if (offset < last_offset){
int64_t delta = last_offset - offset;
if (delta >= data_size)
return 0;
data_size -= delta;
offset+=delta;
buffer+=delta;
}
if (data_size<=0)
return 0;
if (!*ptr || offset < (*ptr)->offset){
// found the insert position in the list
int64_t size = data_size;
// allow for buffers to overlap, we may need to split the incoming buffer into multiple pieces.
if (*ptr && offset+size > (*ptr)->offset)
size = (*ptr)->offset - offset;
if (offset == write_state->file_offset){
if (rhizome_write_buffer(write_state, buffer, size))
return -1;
// we need to go around the loop again to re-test if this buffer can now be written
}else{
// impose a limit on the total amount of cached data
if (write_state->total_data_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE)
size = RHIZOME_BUFFER_MAXIMUM_SIZE - write_state->total_data_size;
if (size<=0)
return 0;
if (config.debug.rhizome)
DEBUGF("Caching block @%"PRId64", %"PRId64" received out of order", offset, size);
struct rhizome_write_buffer *i = emalloc(size + sizeof(struct rhizome_write_buffer));
if (!i)
return -1;
i->offset = offset;
i->buffer_size = i->data_size = size;
bcopy(buffer, i->data, size);
i->_next = *ptr;
write_state->total_data_size += size;
*ptr = i;
// if there's any overlap of this buffer and the current one, we may need to add another buffer.
ptr = &((*ptr)->_next);
}
data_size -= size;
offset+=size;
buffer+=size;
continue;
}
last_offset = (*ptr)->offset + (*ptr)->data_size;
ptr = &((*ptr)->_next);
}
return 0;
} }
/* Expects file to be at least file_length in size, ignoring anything longer than that */ /* Expects file to be at least file_length in size, ignoring anything longer than that */
@ -232,29 +293,29 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){
FILE *f = fopen(filename, "r"); FILE *f = fopen(filename, "r");
if (!f) if (!f)
return WHY_perror("fopen"); return WHY_perror("fopen");
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
int ret=0;
while(write->file_offset < write->file_length){ while(write->file_offset < write->file_length){
int size=write->buffer_size - write->data_size; int size=RHIZOME_CRYPT_PAGE_SIZE;
if (write->file_offset + size > write->file_length) if (write->file_offset + size > write->file_length)
size=write->file_length - write->file_offset; size=write->file_length - write->file_offset;
int r = fread(write->buffer + write->data_size, 1, size, f); int r = fread(buffer, 1, size, f);
if (r==-1){ if (r==-1){
WHY_perror("fread"); ret = WHY_perror("fread");
fclose(f); goto end;
return -1;
} }
write->data_size+=r;
DEBUGF("Read %d from file", r); DEBUGF("Read %d from file", r);
if (rhizome_flush(write)){ if (rhizome_write_buffer(write, buffer, r)){
fclose(f); ret=-1;
return -1; goto end;
} }
} }
end:
fclose(f); fclose(f);
return 0; return ret;
} }
int rhizome_store_delete(const char *id){ int rhizome_store_delete(const char *id){
@ -270,10 +331,6 @@ int rhizome_store_delete(const char *id){
} }
int rhizome_fail_write(struct rhizome_write *write){ int rhizome_fail_write(struct rhizome_write *write){
if (write->buffer)
free(write->buffer);
write->buffer=NULL;
if (write->blob_fd>=0){ if (write->blob_fd>=0){
if (config.debug.externalblobs) if (config.debug.externalblobs)
DEBUGF("Closing and removing fd %d", write->blob_fd); DEBUGF("Closing and removing fd %d", write->blob_fd);
@ -281,23 +338,21 @@ int rhizome_fail_write(struct rhizome_write *write){
write->blob_fd=-1; write->blob_fd=-1;
rhizome_store_delete(write->id); rhizome_store_delete(write->id);
} }
// don't worry too much about sql failures. // don't worry too much about sql failures.
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (!config.rhizome.external_blobs) if (write->blob_rowid>=0){
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,
"DELETE FROM FILEBLOBS WHERE rowid=%lld",write->blob_rowid); "DELETE FROM FILEBLOBS WHERE rowid=%lld",write->blob_rowid);
write->blob_rowid=-1;
}
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,
"DELETE FROM FILES WHERE id='%s'", "DELETE FROM FILES WHERE id='%s'",
write->id); write->id);
return 0; return 0;
} }
int rhizome_finish_write(struct rhizome_write *write){ int rhizome_finish_write(struct rhizome_write *write){
if (write->data_size>0){
if (rhizome_flush(write))
return -1;
}
int fd = write->blob_fd; int fd = write->blob_fd;
if (fd>=0){ if (fd>=0){
if (config.debug.externalblobs) if (config.debug.externalblobs)
@ -305,9 +360,6 @@ int rhizome_finish_write(struct rhizome_write *write){
close(fd); close(fd);
write->blob_fd=-1; write->blob_fd=-1;
} }
if (write->buffer)
free(write->buffer);
write->buffer=NULL;
char hash_out[SHA512_DIGEST_STRING_LENGTH+1]; char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&write->sha512_context, hash_out); SHA512_End(&write->sha512_context, hash_out);
@ -371,6 +423,7 @@ int rhizome_finish_write(struct rhizome_write *write){
} }
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1) if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
goto failure; goto failure;
write->blob_rowid=-1;
return 0; return 0;
failure: failure:
@ -645,7 +698,7 @@ int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhi
// the contents as we go // the contents as we go
if (rhizome_derive_key(m, bsk)) { if (rhizome_derive_key(m, bsk)) {
rhizome_read_close(read_state); rhizome_read_close(read_state);
return -1; return WHY("Unable to decrypt bundle, valid key not found");
} }
if (config.debug.rhizome) if (config.debug.rhizome)
DEBUGF("Decrypting file contents"); DEBUGF("Decrypting file contents");
@ -698,19 +751,19 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write,
if (length > write->file_length - write->file_offset) if (length > write->file_length - write->file_offset)
return WHY("Unable to pipe that much data"); return WHY("Unable to pipe that much data");
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
while(length>0){ while(length>0){
int size=write->buffer_size - write->data_size; int size=RHIZOME_CRYPT_PAGE_SIZE;
if (size > length) if (size > length)
size=length; size=length;
int r = rhizome_read(read, write->buffer + write->data_size, size); int r = rhizome_read(read, buffer, size);
if (r<0) if (r<0)
return r; return r;
write->data_size+=r;
length -= r; length -= r;
DEBUGF("Piping %d bytes", r); DEBUGF("Piping %d bytes", r);
if (rhizome_flush(write)) if (rhizome_write_buffer(write, buffer, r))
return -1; return -1;
} }

View File

@ -176,10 +176,10 @@ test_HTTPTransport() {
doc_MDPTransport="Rhizome over MDP transport" doc_MDPTransport="Rhizome over MDP transport"
setup_MDPTransport() { setup_MDPTransport() {
setup_common setup_common
set_instance +B foreach_instance +A +B \
executeOk_servald config set rhizome.http.enable 0 executeOk_servald config \
set rhizome.http.enable 0
set_instance +A set_instance +A
executeOk_servald config set rhizome.http.enable 0
rhizome_add_file file1 rhizome_add_file file1
start_servald_instances +A +B start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B foreach_instance +A assert_peers_are_instances +B
@ -230,6 +230,21 @@ test_FileTransferBigMDP() {
bigfile_common_test bigfile_common_test
} }
doc_FileTransferUnreliableBigMDP="Big new bundle over unreliable MDP transport"
setup_FileTransferUnreliableBigMDP() {
setup_common
foreach_instance +A +B \
executeOk_servald config \
set rhizome.http.enable 0 \
set interfaces.1.file dummy \
set interfaces.1.drop_broadcasts 20
setup_bigfile_common
}
test_FileTransferUnreliableBigMDP() {
bigfile_common_test
}
doc_FileTransferBig="Big new bundle transfers to one node via HTTP" doc_FileTransferBig="Big new bundle transfers to one node via HTTP"
setup_FileTransferBig() { setup_FileTransferBig() {
setup_common setup_common