From e32211485092455cb65470ce73a591afe5803a67 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Thu, 15 Aug 2013 13:53:37 +0930 Subject: [PATCH] Only transfer new journal content via MDP --- rhizome.h | 1 + rhizome_fetch.c | 65 ++++++++++++++++++++++++++++++++++++++++--- rhizome_store.c | 25 ++++++++++------- tests/rhizomeprotocol | 29 +++++++++++++++++-- 4 files changed, 104 insertions(+), 16 deletions(-) diff --git a/rhizome.h b/rhizome.h index e8caec0f..9bcbe16c 100644 --- a/rhizome.h +++ b/rhizome.h @@ -709,6 +709,7 @@ int rhizome_derive_key(rhizome_manifest *m, rhizome_bk_t *bsk); int rhizome_open_write_journal(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, uint64_t new_size); int rhizome_append_journal_buffer(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, unsigned char *buffer, int len); int rhizome_append_journal_file(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, const char *filename); +int rhizome_journal_pipe(struct rhizome_write *write, const char *fileHash, uint64_t start_offset, uint64_t length); int rhizome_crypt_xor_block(unsigned char *buffer, int buffer_size, int64_t stream_offset, const unsigned char *key, const unsigned char *nonce); diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 0bf3d20a..2a764e44 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -62,13 +62,14 @@ struct rhizome_fetch_slot { /* Keep track of how much of the file we have read */ struct rhizome_write write_state; - int64_t last_write_time; - int64_t start_time; + time_ms_t last_write_time; + time_ms_t start_time; /* HTTP transport specific elements */ char request[1024]; int request_len; int request_ofs; + rhizome_manifest *previous; /* HTTP streaming reception of manifests */ char manifest_buffer[1024]; @@ -399,6 +400,7 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m) return rhizome_add_manifest(m, m->ttl - 1 /* TTL */); } +// begin fetching a bundle static int schedule_fetch(struct rhizome_fetch_slot 
*slot) { IN(); @@ -417,9 +419,29 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) the database. */ slot->manifest->dataFileName = NULL; slot->manifest->dataFileUnlinkOnFree = 0; - + strbuf r = strbuf_local(slot->request, sizeof slot->request); - strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", slot->manifest->fileHexHash); + strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n", slot->manifest->fileHexHash); + + if (slot->manifest->journalTail>=0){ + // if we're fetching a journal bundle, work out how many bytes we have of a previous version + // and therefore what range of bytes we should ask for + slot->previous = rhizome_new_manifest(); + const char *id = rhizome_manifest_get(slot->manifest, "id", NULL, 0); + if (rhizome_retrieve_manifest(id, slot->previous)){ + rhizome_manifest_free(slot->previous); + slot->previous=NULL; + + // check that the new journal is valid and has some overlapping bytes + }else if (slot->previous->journalTail > slot->manifest->journalTail + || slot->previous->fileLength + slot->previous->journalTail < slot->manifest->journalTail){ + rhizome_manifest_free(slot->previous); + slot->previous=NULL; + } + } + + strbuf_puts(r, "\r\n"); + if (strbuf_overrun(r)) RETURN(WHY("request overrun")); slot->request_len = strbuf_len(r); @@ -935,6 +957,10 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) rhizome_manifest_free(slot->manifest); slot->manifest = NULL; + if (slot->previous) + rhizome_manifest_free(slot->previous); + slot->previous = NULL; + if (slot->write_state.blob_fd>=0 || slot->write_state.blob_rowid>=0) rhizome_fail_write(&slot->write_state); @@ -1050,6 +1076,34 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) OUT(); } +static int pipe_journal(struct rhizome_fetch_slot *slot){ + if (!slot->previous) + return 0; + + /* Seed the current write with payload bytes already held in the previous journal version: we need to work out the overlapping range that we can copy from the previous version + * then we can start to transfer only the new content in the 
journal + * old; [ tail |0 length] + * new; [ tail |0 length] + * [ | written | overlap | new content] + */ + + uint64_t start = slot->manifest->journalTail - slot->previous->journalTail + slot->write_state.file_offset; /* offset within the previous payload of the first byte not yet written */ + uint64_t length = start < slot->previous->fileLength ? slot->previous->fileLength - start : 0; /* bytes from start to the end of the previous payload (its end is journalTail + fileLength, as in the overlap check in schedule_fetch); 0 when nothing overlaps, so the unsigned subtraction can never wrap */ + + // of course there might not be any overlap + if (length>0){ + if (config.debug.rhizome) + DEBUGF("Copying %"PRIu64" bytes from previous journal", length); + rhizome_journal_pipe(&slot->write_state, slot->previous->fileHexHash, + start, length); + } + + // and we don't need to do this again, so drop the manifest + rhizome_manifest_free(slot->previous); + slot->previous=NULL; + return 0; +} + static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { /* In Rhizome Direct we use the same fetch slot system, but we aren't actually @@ -1088,6 +1142,9 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->state=RHIZOME_FETCH_RXFILEMDP; slot->last_write_time=gettime_ms(); + + pipe_journal(slot); + /* We are requesting a file. The http request may have already received some of the file, so take that into account when setting up ring buffer. 
Then send the request for the next block of data, and set our alarm to diff --git a/rhizome_store.c b/rhizome_store.c index 92a674a6..f398579b 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -952,6 +952,19 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write, return 0; } +int rhizome_journal_pipe(struct rhizome_write *write, const char *fileHash, uint64_t start_offset, uint64_t length) +{ + struct rhizome_read read_state; + bzero(&read_state, sizeof read_state); + if (rhizome_open_read(&read_state, fileHash, start_offset>0?0:1)) + return -1; + + read_state.offset = start_offset; + int ret = rhizome_pipe(&read_state, write, length); + rhizome_read_close(&read_state); + return ret; +} + // open an existing journal bundle, advance the head pointer, duplicate the existing content and get ready to add more. int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, uint64_t new_size) { @@ -979,16 +992,8 @@ int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, goto failure; if (copy_length>0){ - struct rhizome_read read_state; - bzero(&read_state, sizeof read_state); - // don't bother to decrypt the existing journal payload - ret = rhizome_open_read(&read_state, m->fileHexHash, advance_by>0?0:1); - if (ret) - goto failure; - - read_state.offset = advance_by; - ret = rhizome_pipe(&read_state, write, copy_length); - rhizome_read_close(&read_state); + // note that we don't need to bother decrypting the existing journal payload + ret = rhizome_journal_pipe(write, m->fileHexHash, advance_by, copy_length); if (ret) goto failure; } diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index a73f5807..45a38140 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -154,9 +154,9 @@ doc_HTTPTransport="Rhizome over HTTP transport" setup_HTTPTransport() { setup_common set_instance +B - executeOk_servald config set rhizome.mdp.enable 0 + 
executeOk_servald config set rhizome.mdp.enable 0 set_instance +A - executeOk_servald config set rhizome.mdp.enable 0 + executeOk_servald config set rhizome.mdp.enable 0 rhizome_add_file file1 start_servald_instances +A +B foreach_instance +A assert_peers_are_instances +B @@ -202,6 +202,31 @@ test_UnicastTransfer() { receive_and_update_bundle } +doc_journalMDP="Transfer and update a journal bundle via MDP" +setup_journalMDP() { + setup_common + foreach_instance +A +B \ + executeOk_servald config \ + set rhizome.http.enable 0 + set_instance +A + create_file file1 64 + create_file file2 64 + executeOk_servald rhizome journal append $SIDA "" file1 + extract_stdout_manifestid BID + extract_stdout_version VERSION + start_servald_instances +A +B + foreach_instance +A assert_peers_are_instances +B + foreach_instance +B assert_peers_are_instances +A +} +test_journalMDP() { + wait_until bundle_received_by $BID:$VERSION +B + set_instance +A + executeOk_servald rhizome journal append $SIDA $BID file2 + extract_stdout_version VERSION2 + set_instance +B + wait_until bundle_received_by $BID:$VERSION2 +B + assertGrep $instance_servald_log "Copying [0-9]\+ bytes from previous journal" +} #common setup and test routines for transferring a 1MB file setup_bigfile_common() {