diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index e4ebd1e7..e7386d1a 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -173,9 +173,6 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); int count=mdp->out.payload_length-(1+16+8+8); unsigned char *bytes=&mdp->out.payload[1+16+8+8]; - if (config.debug.rhizome_rx) - DEBUGF("Received %d bytes @ 0x%"PRIx64" for %s* version 0x%"PRIx64, - count,offset,alloca_tohex(bidprefix,16),version); /* Now see if there is a slot that matches. If so, then see if the bytes are in the window, and write them. diff --git a/rhizome.h b/rhizome.h index 552307c6..72cb6d1f 100644 --- a/rhizome.h +++ b/rhizome.h @@ -383,16 +383,19 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len); int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length); - /* Rhizome file storage api */ +struct rhizome_write_buffer{ + struct rhizome_write_buffer *_next; + int64_t offset; + int buffer_size; + int data_size; + unsigned char data[0]; +}; + struct rhizome_write{ char id[SHA512_DIGEST_STRING_LENGTH+1]; char id_known; - unsigned char *buffer; - int buffer_size; - int data_size; - int64_t file_offset; int64_t file_length; @@ -403,6 +406,9 @@ struct rhizome_write{ SHA512_CTX sha512_context; int64_t blob_rowid; int blob_fd; + + struct rhizome_write_buffer *out_of_order; + int total_data_size; }; struct rhizome_read{ @@ -674,7 +680,8 @@ int unpack_http_response(char *response, struct http_response_parts *parts); int rhizome_exists(const char *fileHash); int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority); -int rhizome_flush(struct rhizome_write *write); +int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, int data_size); +int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size); int rhizome_write_file(struct rhizome_write *write, const char *filename); int rhizome_fail_write(struct rhizome_write *write); int rhizome_finish_write(struct rhizome_write *write); diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 889f963c..45e118d1 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -77,13 +77,12 @@ struct rhizome_fetch_slot { /* MDP transport specific elements */ unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; int64_t bidVersion; - int bidP; - unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; int mdpIdleTimeout; + time_ms_t mdp_last_request_time; + uint64_t mdp_last_request_offset; int mdpResponsesOutstanding; int mdpRXBlockLength; - uint32_t mdpRXBitmap; unsigned char mdpRXWindow[32*200]; }; @@ -141,17 +140,16 @@ int rhizome_active_fetch_count() int rhizome_active_fetch_bytes_received(int q) { - if (q<0) return -1; - if (q>=NQUEUES) return -1; + if (q<0 || q>=NQUEUES) return -1; if (rhizome_fetch_queues[q].active.state==RHIZOME_FETCH_FREE) return -1; - return (int)rhizome_fetch_queues[q].active.write_state.file_offset + rhizome_fetch_queues[q].active.write_state.data_size; + return (int)rhizome_fetch_queues[q].active.write_state.file_offset; } int rhizome_fetch_queue_bytes(){ int i,j,bytes=0; for(i=0;ifileLength - received; } for (j=0;jSlot %d, ", i); if (q->active.state!=RHIZOME_FETCH_FREE){ - strbuf_sprintf(b, "%lld[+%d] of %lld", + strbuf_sprintf(b, "%lld of %lld", q->active.write_state.file_offset, - q->active.write_state.data_size, q->active.manifest->fileLength); }else{ strbuf_puts(b, "inactive"); @@ -413,14 +410,36 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) slot->write_state.blob_rowid=-1; if (slot->manifest) { + bcopy(slot->manifest->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES); + slot->prefix_length=RHIZOME_MANIFEST_ID_BYTES; + slot->bidVersion=slot->manifest->version; + /* Don't provide a filename, because we will stream the file straight into + the database. */ + slot->manifest->dataFileName = NULL; + slot->manifest->dataFileUnlinkOnFree = 0; + + strbuf r = strbuf_local(slot->request, sizeof slot->request); + strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", slot->manifest->fileHexHash); + if (strbuf_overrun(r)) + RETURN(WHY("request overrun")); + slot->request_len = strbuf_len(r); + if (rhizome_open_write(&slot->write_state, slot->manifest->fileHexHash, slot->manifest->fileLength, RHIZOME_PRIORITY_DEFAULT)) RETURN(-1); } else { + strbuf r = strbuf_local(slot->request, sizeof slot->request); + strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(slot->bid, slot->prefix_length)); + if (strbuf_overrun(r)) + RETURN(WHY("request overrun")); + slot->request_len = strbuf_len(r); + + slot->manifest_bytes=0; slot->write_state.file_offset=0; slot->write_state.file_length=-1; } slot->request_ofs = 0; + slot->state = RHIZOME_FETCH_CONNECTING; slot->alarm.function = rhizome_fetch_poll; fetch_stats.name = "rhizome_fetch_poll"; @@ -473,7 +492,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) bail_http: /* Fetch via overlay, either because no IP address was provided, or because the connection/attempt to fetch via HTTP failed. */ - slot->state=RHIZOME_FETCH_RXFILEMDP; rhizome_fetch_switch_to_mdp(slot); RETURN(0); OUT(); @@ -608,36 +626,13 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct RETURN(IMPORTED); } - // Start the fetch. - //dump("peerip", peerip, sizeof *peerip); - - /* Prepare for fetching via HTTP */ + /* Prepare for fetching */ slot->peer_ipandport = *peerip; - slot->alarm.poll.fd=-1; - - strbuf r = strbuf_local(slot->request, sizeof slot->request); - strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); - if (strbuf_overrun(r)) - RETURN(WHY("request overrun")); - slot->request_len = strbuf_len(r); - - /* Prepare for fetching via MDP */ bcopy(peersid,slot->peer_sid,SID_SIZE); - bcopy(m->cryptoSignPublic,slot->bid,RHIZOME_MANIFEST_ID_BYTES); - slot->bidVersion=m->version; - slot->bidP=1; - - /* Don't provide a filename, because we will stream the file straight into - the database. */ - m->dataFileName = NULL; - m->dataFileUnlinkOnFree = 0; slot->manifest = m; + if (schedule_fetch(slot) == -1) RETURN(-1); - if (config.debug.rhizome_rx) - DEBUGF(" started fetch bid %s version 0x%"PRIx64" into %s, slot=%d filehash=%s", - alloca_tohex_bid(slot->bid), slot->bidVersion, - alloca_str_toprint(slot->manifest->dataFileName), slotno(slot), m->fileHexHash); RETURN(STARTED); } @@ -658,24 +653,14 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, /* Prepare for fetching via HTTP */ slot->peer_ipandport = *peerip; slot->manifest = NULL; - strbuf r = strbuf_local(slot->request, sizeof slot->request); - strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); - if (strbuf_overrun(r)) - return WHY("request overrun"); - slot->request_len = strbuf_len(r); - - /* Prepare for fetching via MDP */ bcopy(peersid,slot->peer_sid,SID_SIZE); - bcopy(prefix,slot->prefix,prefix_length); + bcopy(prefix,slot->bid,prefix_length); slot->prefix_length=prefix_length; - slot->bidP=0; /* Don't stream into a file blob in the database, because it is a manifest. We do need to cache it in the slot structure, though, and then offer it for inserting into the database, but we can avoid the temporary file in the process. */ - slot->write_state.blob_rowid=-1; - slot->manifest_bytes=0; if (schedule_fetch(slot) == -1) { return -1; @@ -950,7 +935,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) rhizome_manifest_free(slot->manifest); slot->manifest = NULL; - if (slot->write_state.buffer) + if (slot->write_state.blob_fd>=0 || + slot->write_state.blob_rowid>=0) rhizome_fail_write(&slot->write_state); // Release the fetch slot. @@ -972,14 +958,14 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) if (now-slot->last_write_time>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago (read %"PRId64" of %"PRId64" bytes)", now-slot->last_write_time, - slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); + slot->write_state.file_offset, slot->write_state.file_length); rhizome_fetch_close(slot); OUT(); return; } if (config.debug.rhizome_rx) DEBUGF("Timeout: Resending request for slot=0x%p (%"PRId64" of %"PRId64" received)", - slot,slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); + slot,slot->write_state.file_offset, slot->write_state.file_length); rhizome_fetch_mdp_requestblocks(slot); OUT(); } @@ -1023,22 +1009,41 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) mdp.out.payload_length=RHIZOME_MANIFEST_ID_BYTES+8+8+4+2; bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); - write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES],slot->bidVersion); - write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8],slot->write_state.file_offset + slot->write_state.data_size); - write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8],slot->mdpRXBitmap); - write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4],slot->mdpRXBlockLength); + uint32_t bitmap=0; + int requests=32; + int i; + struct rhizome_write_buffer *p = slot->write_state.out_of_order; + uint64_t offset = slot->write_state.file_offset; + for (i=0;i<32;i++){ + while(p && p->offset + p->data_size < offset) + p=p->_next; + if (!p) + break; + if (p->offset <= offset && p->offset+p->data_size >= offset+slot->mdpRXBlockLength){ + bitmap |= 1<<(31-i); + requests --; + } + offset+=slot->mdpRXBlockLength; + } + + write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES], slot->bidVersion); + write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8], slot->write_state.file_offset); + write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8], bitmap); + write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4], slot->mdpRXBlockLength); if (config.debug.rhizome_tx) DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%"PRIx64, - alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), - slot->write_state.file_offset + slot->write_state.data_size); + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + slot->write_state.file_offset); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); // remember when we sent the request so that we can adjust the inter-request // interval based on how fast the packets arrive. - slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap - + slot->mdpResponsesOutstanding=requests; + slot->mdp_last_request_offset = slot->write_state.file_offset; + slot->mdp_last_request_time = gettime_ms(); + rhizome_fetch_mdp_touch_timeout(slot); RETURN(0); @@ -1056,7 +1061,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) or with a temporary generated SID, so that we don't end up with two instances with the same SID. */ - IN() + IN(); if (!my_subscriber) { DEBUGF("I don't have an identity, so we cannot fall back to MDP"); RETURN(rhizome_fetch_close(slot)); @@ -1096,9 +1101,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) transport. */ slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds - slot->mdpRXBitmap=0x00000000; // no blocks received yet slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size - rhizome_fetch_mdp_requestblocks(slot); + rhizome_fetch_mdp_requestblocks(slot); RETURN(0); OUT(); @@ -1136,7 +1140,76 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) return; } -int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) +int rhizome_write_complete(struct rhizome_fetch_slot *slot) +{ + IN(); + + if (slot->manifest) { + if (slot->write_state.file_offset < slot->write_state.file_length) + RETURN(0); + + // Were fetching payload, now we have it. + if (config.debug.rhizome_rx) + DEBUGF("Received all of file via rhizome -- now to import it"); + + if (rhizome_finish_write(&slot->write_state)){ + rhizome_fetch_close(slot); + RETURN(-1); + } + + if (rhizome_import_received_bundle(slot->manifest)){ + rhizome_fetch_close(slot); + RETURN(-1); + } + + if (slot->state==RHIZOME_FETCH_RXFILE) { + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { + buf[0] = '*'; + buf[1] = '\0'; + } + INFOF("Completed http request from %s:%u for file %s", + buf, ntohs(slot->peer_ipandport.sin_port), + slot->manifest->fileHexHash); + } else { + INFOF("Completed MDP request from %s for file %s", + alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash); + } + } else { + /* This was to fetch the manifest, so now fetch the file if needed */ + if (config.debug.rhizome_rx) + DEBUGF("Received a manifest in response to supplying a manifest prefix."); + /* Read the manifest and add it to suggestion queue, then immediately + call schedule queued items. */ + rhizome_manifest *m = rhizome_new_manifest(); + if (m) { + if (rhizome_read_manifest_file(m, slot->manifest_buffer, + slot->manifest_bytes) == -1) { + DEBUGF("Couldn't read manifest"); + rhizome_manifest_free(m); + } else { + if (config.debug.rhizome_rx){ + DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); + dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport)); + dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid)); + } + rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport, + slot->peer_sid); + } + } + } + + if (config.debug.rhizome_rx) + DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec).", + slot,(long long)slot->write_state.file_offset, + (long long)gettime_ms()-slot->start_time, + (long long)(slot->write_state.file_offset)/(gettime_ms()-slot->start_time)); + + rhizome_fetch_close(slot); + RETURN(-1); +} + +int rhizome_write_content(struct rhizome_fetch_slot *slot, unsigned char *buffer, int bytes) { IN(); @@ -1145,8 +1218,8 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt // Truncate to known length of file (handy for reading from journal bundles that // might grow while we are reading from them). - if (bytes>(slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size))){ - bytes=slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size); + if (bytes>(slot->write_state.file_length-(slot->write_state.file_offset))){ + bytes=slot->write_state.file_length-(slot->write_state.file_offset); } if (!slot->manifest){ @@ -1159,88 +1232,15 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt } else { /* We are reading a file. Stream it into the database. */ - int ofs=0; - while (ofs slot->write_state.buffer_size - slot->write_state.data_size) - block_size = slot->write_state.buffer_size - slot->write_state.data_size; - - if (block_size>0){ - bcopy(buffer+ofs, slot->write_state.buffer + slot->write_state.data_size, block_size); - slot->write_state.data_size+=block_size; - ofs+=block_size; - } - - if (slot->write_state.data_size>=slot->write_state.buffer_size){ - int ret = rhizome_flush(&slot->write_state); - if (ret!=0){ - rhizome_fetch_close(slot); - RETURN(-1); - } - } + if (rhizome_write_buffer(&slot->write_state, buffer, bytes)){ + rhizome_fetch_close(slot); + RETURN(-1); } + } slot->last_write_time=gettime_ms(); - if (slot->write_state.file_offset + slot->write_state.data_size>=slot->write_state.file_length) { - /* got all of file */ - if (config.debug.rhizome_rx) - DEBUGF("Received all of file via rhizome -- now to import it"); - if (slot->manifest) { - - // Were fetching payload, now we have it. - if (rhizome_finish_write(&slot->write_state)){ - rhizome_fetch_close(slot); - RETURN(-1); - } - - if (!rhizome_import_received_bundle(slot->manifest)){ - if (slot->state==RHIZOME_FETCH_RXFILE) { - char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &slot->peer_ipandport.sin_addr, buf, sizeof buf) == NULL) { - buf[0] = '*'; - buf[1] = '\0'; - } - INFOF("Completed http request from %s:%u for file %s", - buf, ntohs(slot->peer_ipandport.sin_port), - slot->manifest->fileHexHash); - } else { - INFOF("Completed MDP request from %s for file %s", - alloca_tohex_sid(slot->peer_sid), slot->manifest->fileHexHash); - } - } - } else { - /* This was to fetch the manifest, so now fetch the file if needed */ - if (config.debug.rhizome_rx) - DEBUGF("Received a manifest in response to supplying a manifest prefix."); - /* Read the manifest and add it to suggestion queue, then immediately - call schedule queued items. */ - rhizome_manifest *m = rhizome_new_manifest(); - if (m) { - if (rhizome_read_manifest_file(m, slot->manifest_buffer, - slot->manifest_bytes) == -1) { - DEBUGF("Couldn't read manifest"); - rhizome_manifest_free(m); - } else { - if (config.debug.rhizome_rx){ - DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); - dump("slot->peerip",&slot->peer_ipandport,sizeof(slot->peer_ipandport)); - dump("slot->peersid",&slot->peer_sid,sizeof(slot->peer_sid)); - } - rhizome_suggest_queue_manifest_import(m, &slot->peer_ipandport, - slot->peer_sid); - } - } - } - if (config.debug.rhizome_rx) - DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec). Buffer size = %d", - slot,(long long)slot->write_state.file_offset+slot->write_state.data_size, - (long long)gettime_ms()-slot->start_time, - (long long)(slot->write_state.file_offset+slot->write_state.data_size)/(gettime_ms()-slot->start_time), - slot->write_state.buffer_size); - rhizome_fetch_close(slot); - RETURN(-1); - } + RETURN(rhizome_write_complete(slot)); // slot is still open RETURN(0); @@ -1255,32 +1255,33 @@ int rhizome_received_content(unsigned char *bidprefix, int i; for(i=0;istate==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { - if (!memcmp(slot->bid,bidprefix,16)) - { - if (slot->write_state.file_offset + slot->write_state.data_size==offset) { - if (!rhizome_write_content(slot,(char *)bytes,count)) - { - rhizome_fetch_mdp_touch_timeout(slot); - slot->mdpResponsesOutstanding--; - if (slot->mdpResponsesOutstanding==0) { - // We have received all responses, so immediately ask for more - rhizome_fetch_mdp_requestblocks(slot); - } - - // TODO: Try flushing out stuck packets that we have kept due to - // packet loss / out-of-order delivery. - } - RETURN(0); - } else { - // TODO: Implement out-of-order buffering so that lost packets - // don't cause wastage - } - RETURN(0); - } + if (slot->state!=RHIZOME_FETCH_RXFILEMDP + || version != slot->bidVersion + || memcmp(slot->bid,bidprefix,16)!=0) + continue; + + if (rhizome_random_write(&slot->write_state, offset, bytes, count)){ + if (config.debug.rhizome) + DEBUGF("Write failed!"); + RETURN (-1); } - } + + if (rhizome_write_complete(slot)){ + if (config.debug.rhizome) + DEBUGF("Complete failed!"); + RETURN(-1); + } + slot->last_write_time=gettime_ms(); + rhizome_fetch_mdp_touch_timeout(slot); + + slot->mdpResponsesOutstanding--; + if (slot->mdpResponsesOutstanding==0) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + RETURN(0); + } RETURN(-1); OUT(); @@ -1302,7 +1303,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) switch (slot->state) { case RHIZOME_FETCH_RXFILE: { /* Keep reading until we have the promised amount of data */ - char buffer[8192]; + unsigned char buffer[8192]; sigPipeFlag = 0; int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); /* If we got some data, see if we have found the end of the HTTP request */ @@ -1317,7 +1318,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) } else { if (config.debug.rhizome_rx) DEBUGF("Empty read, closing connection: received %"PRId64" of %"PRId64" bytes", - slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length); + slot->write_state.file_offset,slot->write_state.file_length); rhizome_fetch_switch_to_mdp(slot); return; } @@ -1375,7 +1376,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) slot->state = RHIZOME_FETCH_RXFILE; int content_bytes = slot->request + slot->request_len - parts.content_start; if (content_bytes > 0){ - rhizome_write_content(slot, parts.content_start, content_bytes); + rhizome_write_content(slot, (unsigned char*)parts.content_start, content_bytes); // reset inactivity timeout unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout; diff --git a/rhizome_store.c b/rhizome_store.c index 4e1d3cc2..4f436b2c 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -123,15 +123,6 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6 SHA512_Init(&write->sha512_context); - write->buffer_size=write->file_length; - - if (write->buffer_size>RHIZOME_BUFFER_MAXIMUM_SIZE) - write->buffer_size=RHIZOME_BUFFER_MAXIMUM_SIZE; - - write->buffer=malloc(write->buffer_size); - if (!write->buffer) - return WHY("Unable to allocate write buffer"); - return 0; } @@ -213,18 +204,88 @@ int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffe write_state->file_offset+=data_size; if (config.debug.rhizome) DEBUGF("Written %"PRId64" of %"PRId64, write_state->file_offset, write_state->file_length); - RETURN(data_size); + RETURN(0); OUT(); } -/* Write write_state->buffer into the store - Note that we don't support random writes as the contents must be hashed in order - But we don't enforce linear writes yet. */ -int rhizome_flush(struct rhizome_write *write_state){ - int wrote = rhizome_write_buffer(write_state, write_state->buffer, write_state->data_size); - if (wrote == write_state->data_size) - write_state->data_size=0; - return wrote>=0?0:-1; +int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size){ + // search the out of order buffer list for the insert position + struct rhizome_write_buffer **ptr = &write_state->out_of_order; + if (offset + data_size > write_state->file_length) + data_size = write_state->file_length - offset; + if (data_size<=0) + return 0; + int64_t last_offset = write_state->file_offset; + while(1){ + // if existing data can be written, write it now + if (*ptr && (*ptr)->offset == write_state->file_offset){ + struct rhizome_write_buffer *n=*ptr; + if (config.debug.rhizome) + DEBUGF("Writing caching block %"PRId64", %d", n->offset, n->data_size); + if (rhizome_write_buffer(write_state, n->data, n->data_size)) + return -1; + + last_offset = write_state->file_offset; + *ptr=n->_next; + free(n); + continue; + } + + if (offset < last_offset){ + int64_t delta = last_offset - offset; + if (delta >= data_size) + return 0; + data_size -= delta; + offset+=delta; + buffer+=delta; + } + + if (data_size<=0) + return 0; + + if (!*ptr || offset < (*ptr)->offset){ + // found the insert position in the list + int64_t size = data_size; + + // allow for buffers to overlap, we may need to split the incoming buffer into multiple pieces. + if (*ptr && offset+size > (*ptr)->offset) + size = (*ptr)->offset - offset; + + if (offset == write_state->file_offset){ + if (rhizome_write_buffer(write_state, buffer, size)) + return -1; + // we need to go around the loop again to re-test if this buffer can now be written + }else{ + // impose a limit on the total amount of cached data + if (write_state->total_data_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE) + size = RHIZOME_BUFFER_MAXIMUM_SIZE - write_state->total_data_size; + if (size<=0) + return 0; + + if (config.debug.rhizome) + DEBUGF("Caching block @%"PRId64", %"PRId64" received out of order", offset, size); + struct rhizome_write_buffer *i = emalloc(size + sizeof(struct rhizome_write_buffer)); + if (!i) + return -1; + i->offset = offset; + i->buffer_size = i->data_size = size; + bcopy(buffer, i->data, size); + i->_next = *ptr; + write_state->total_data_size += size; + *ptr = i; + // if there's any overlap of this buffer and the current one, we may need to add another buffer. + ptr = &((*ptr)->_next); + } + data_size -= size; + offset+=size; + buffer+=size; + continue; + } + + last_offset = (*ptr)->offset + (*ptr)->data_size; + ptr = &((*ptr)->_next); + } + return 0; } /* Expects file to be at least file_length in size, ignoring anything longer than that */ @@ -232,29 +293,29 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){ FILE *f = fopen(filename, "r"); if (!f) return WHY_perror("fopen"); - + + unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE]; + int ret=0; while(write->file_offset < write->file_length){ - int size=write->buffer_size - write->data_size; + int size=RHIZOME_CRYPT_PAGE_SIZE; if (write->file_offset + size > write->file_length) size=write->file_length - write->file_offset; - int r = fread(write->buffer + write->data_size, 1, size, f); + int r = fread(buffer, 1, size, f); if (r==-1){ - WHY_perror("fread"); - fclose(f); - return -1; + ret = WHY_perror("fread"); + goto end; } - write->data_size+=r; DEBUGF("Read %d from file", r); - if (rhizome_flush(write)){ - fclose(f); - return -1; + if (rhizome_write_buffer(write, buffer, r)){ + ret=-1; + goto end; } } - +end: fclose(f); - return 0; + return ret; } int rhizome_store_delete(const char *id){ @@ -270,10 +331,6 @@ int rhizome_store_delete(const char *id){ } int rhizome_fail_write(struct rhizome_write *write){ - if (write->buffer) - free(write->buffer); - write->buffer=NULL; - if (write->blob_fd>=0){ if (config.debug.externalblobs) DEBUGF("Closing and removing fd %d", write->blob_fd); @@ -281,23 +338,21 @@ int rhizome_fail_write(struct rhizome_write *write){ write->blob_fd=-1; rhizome_store_delete(write->id); } - + // don't worry too much about sql failures. sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - if (!config.rhizome.external_blobs) + if (write->blob_rowid>=0){ sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE rowid=%lld",write->blob_rowid); + write->blob_rowid=-1; + } sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id='%s'", write->id); - return 0; + return 0; } int rhizome_finish_write(struct rhizome_write *write){ - if (write->data_size>0){ - if (rhizome_flush(write)) - return -1; - } int fd = write->blob_fd; if (fd>=0){ if (config.debug.externalblobs) @@ -305,9 +360,6 @@ int rhizome_finish_write(struct rhizome_write *write){ close(fd); write->blob_fd=-1; } - if (write->buffer) - free(write->buffer); - write->buffer=NULL; char hash_out[SHA512_DIGEST_STRING_LENGTH+1]; SHA512_End(&write->sha512_context, hash_out); @@ -371,6 +423,7 @@ int rhizome_finish_write(struct rhizome_write *write){ } if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1) goto failure; + write->blob_rowid=-1; return 0; failure: @@ -645,7 +698,7 @@ int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhi // the contents as we go if (rhizome_derive_key(m, bsk)) { rhizome_read_close(read_state); - return -1; + return WHY("Unable to decrypt bundle, valid key not found"); } if (config.debug.rhizome) DEBUGF("Decrypting file contents"); @@ -698,19 +751,19 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write, if (length > write->file_length - write->file_offset) return WHY("Unable to pipe that much data"); + unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE]; while(length>0){ - int size=write->buffer_size - write->data_size; + int size=RHIZOME_CRYPT_PAGE_SIZE; if (size > length) size=length; - int r = rhizome_read(read, write->buffer + write->data_size, size); + int r = rhizome_read(read, buffer, size); if (r<0) return r; - write->data_size+=r; length -= r; DEBUGF("Piping %d bytes", r); - if (rhizome_flush(write)) + if (rhizome_write_buffer(write, buffer, r)) return -1; } diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 7ec33cac..468349ed 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -176,10 +176,10 @@ test_HTTPTransport() { doc_MDPTransport="Rhizome over MDP transport" setup_MDPTransport() { setup_common - set_instance +B - executeOk_servald config set rhizome.http.enable 0 + foreach_instance +A +B \ + executeOk_servald config \ + set rhizome.http.enable 0 set_instance +A - executeOk_servald config set rhizome.http.enable 0 rhizome_add_file file1 start_servald_instances +A +B foreach_instance +A assert_peers_are_instances +B @@ -230,6 +230,21 @@ test_FileTransferBigMDP() { bigfile_common_test } +doc_FileTransferUnreliableBigMDP="Big new bundle over unreliable MDP transport" +setup_FileTransferUnreliableBigMDP() { + setup_common + foreach_instance +A +B \ + executeOk_servald config \ + set rhizome.http.enable 0 \ + set interfaces.1.file dummy \ + set interfaces.1.drop_broadcasts 20 + setup_bigfile_common +} +test_FileTransferUnreliableBigMDP() { + bigfile_common_test +} + + doc_FileTransferBig="Big new bundle transfers to one node via HTTP" setup_FileTransferBig() { setup_common