Only transfer new journal content via MDP

This commit is contained in:
Jeremy Lakeman 2013-08-15 13:53:37 +09:30
parent 1051c11941
commit e322114850
4 changed files with 104 additions and 16 deletions

View File

@ -709,6 +709,7 @@ int rhizome_derive_key(rhizome_manifest *m, rhizome_bk_t *bsk);
int rhizome_open_write_journal(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, uint64_t new_size);
int rhizome_append_journal_buffer(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, unsigned char *buffer, int len);
int rhizome_append_journal_file(rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, const char *filename);
int rhizome_journal_pipe(struct rhizome_write *write, const char *fileHash, uint64_t start_offset, uint64_t length);
int rhizome_crypt_xor_block(unsigned char *buffer, int buffer_size, int64_t stream_offset,
const unsigned char *key, const unsigned char *nonce);

View File

@ -62,13 +62,14 @@ struct rhizome_fetch_slot {
/* Keep track of how much of the file we have read */
struct rhizome_write write_state;
int64_t last_write_time;
int64_t start_time;
time_ms_t last_write_time;
time_ms_t start_time;
/* HTTP transport specific elements */
char request[1024];
int request_len;
int request_ofs;
rhizome_manifest *previous;
/* HTTP streaming reception of manifests */
char manifest_buffer[1024];
@ -399,6 +400,7 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
return rhizome_add_manifest(m, m->ttl - 1 /* TTL */);
}
// begin fetching a bundle
static int schedule_fetch(struct rhizome_fetch_slot *slot)
{
IN();
@ -417,9 +419,29 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
the database. */
slot->manifest->dataFileName = NULL;
slot->manifest->dataFileUnlinkOnFree = 0;
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", slot->manifest->fileHexHash);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n", slot->manifest->fileHexHash);
if (slot->manifest->journalTail>=0){
// if we're fetching a journal bundle, work out how many bytes we have of a previous version
// and therefore what range of bytes we should ask for
slot->previous = rhizome_new_manifest();
const char *id = rhizome_manifest_get(slot->manifest, "id", NULL, 0);
if (rhizome_retrieve_manifest(id, slot->previous)){
rhizome_manifest_free(slot->previous);
slot->previous=NULL;
// check that the new journal is valid and has some overlapping bytes
}else if (slot->previous->journalTail > slot->manifest->journalTail
|| slot->previous->fileLength + slot->previous->journalTail < slot->manifest->journalTail){
rhizome_manifest_free(slot->previous);
slot->previous=NULL;
}
}
strbuf_puts(r, "\r\n");
if (strbuf_overrun(r))
RETURN(WHY("request overrun"));
slot->request_len = strbuf_len(r);
@ -935,6 +957,10 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
rhizome_manifest_free(slot->manifest);
slot->manifest = NULL;
if (slot->previous)
rhizome_manifest_free(slot->previous);
slot->previous = NULL;
if (slot->write_state.blob_fd>=0 ||
slot->write_state.blob_rowid>=0)
rhizome_fail_write(&slot->write_state);
@ -1050,6 +1076,34 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
OUT();
}
static int pipe_journal(struct rhizome_fetch_slot *slot){
if (!slot->previous)
return 0;
/* we need to work out the overlapping range that we can copy from the previous version
* then we can start to transfer only the new content in the journal
* old; [ tail |0 length]
* new; [ tail |0 length]
* [ | written | overlap | new content]
*/
uint64_t start = slot->manifest->journalTail - slot->previous->journalTail + slot->write_state.file_offset;
uint64_t length = slot->previous->fileLength - slot->manifest->journalTail - slot->write_state.file_offset;
// of course there might not be any overlap
if (start>=0 && start < slot->previous->fileLength && length>0){
if (config.debug.rhizome)
DEBUGF("Copying %"PRId64" bytes from previous journal", length);
rhizome_journal_pipe(&slot->write_state, slot->previous->fileHexHash,
start, length);
}
// and we don't need to do this again, so drop the manifest
rhizome_manifest_free(slot->previous);
slot->previous=NULL;
return 0;
}
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
/* In Rhizome Direct we use the same fetch slot system, but we aren't actually
@ -1088,6 +1142,9 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
slot->state=RHIZOME_FETCH_RXFILEMDP;
slot->last_write_time=gettime_ms();
pipe_journal(slot);
/* We are requesting a file. The http request may have already received
some of the file, so take that into account when setting up ring buffer.
Then send the request for the next block of data, and set our alarm to

View File

@ -952,6 +952,19 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write,
return 0;
}
int rhizome_journal_pipe(struct rhizome_write *write, const char *fileHash, uint64_t start_offset, uint64_t length)
{
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
if (rhizome_open_read(&read_state, fileHash, start_offset>0?0:1))
return -1;
read_state.offset = start_offset;
int ret = rhizome_pipe(&read_state, write, length);
rhizome_read_close(&read_state);
return ret;
}
// open an existing journal bundle, advance the head pointer, duplicate the existing content and get ready to add more.
int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, rhizome_bk_t *bsk, uint64_t advance_by, uint64_t new_size)
{
@ -979,16 +992,8 @@ int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m,
goto failure;
if (copy_length>0){
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
// don't bother to decrypt the existing journal payload
ret = rhizome_open_read(&read_state, m->fileHexHash, advance_by>0?0:1);
if (ret)
goto failure;
read_state.offset = advance_by;
ret = rhizome_pipe(&read_state, write, copy_length);
rhizome_read_close(&read_state);
// note that we don't need to bother decrypting the existing journal payload
ret = rhizome_journal_pipe(write, m->fileHexHash, advance_by, copy_length);
if (ret)
goto failure;
}

View File

@ -154,9 +154,9 @@ doc_HTTPTransport="Rhizome over HTTP transport"
setup_HTTPTransport() {
setup_common
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
executeOk_servald config set rhizome.mdp.enable 0
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
@ -202,6 +202,31 @@ test_UnicastTransfer() {
receive_and_update_bundle
}
doc_journalMDP="Transfer and update a journal bundle via MDP"
setup_journalMDP() {
setup_common
foreach_instance +A +B \
executeOk_servald config \
set rhizome.http.enable 0
set_instance +A
create_file file1 64
create_file file2 64
executeOk_servald rhizome journal append $SIDA "" file1
extract_stdout_manifestid BID
extract_stdout_version VERSION
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_journalMDP() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +A
executeOk_servald rhizome journal append $SIDA $BID file2
extract_stdout_version VERSION2
set_instance +B
wait_until bundle_received_by $BID:$VERSION2 +B
assertGrep $instance_servald_log "Copying [0-9]\+ bytes from previous journal"
}
#common setup and test routines for transferring a 1MB file
setup_bigfile_common() {