From c739555e8c883750ad25e67adb2762260dc0433e Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 1 Mar 2017 10:36:53 +1030 Subject: [PATCH] Ensure we call rhizome_fail_write when the store was busy --- rhizome.c | 2 ++ rhizome_store.c | 14 +++++++++++--- rhizome_sync_keys.c | 45 +++++++++++++++++++++------------------------ 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/rhizome.c b/rhizome.c index fda7971a..578fa231 100644 --- a/rhizome.c +++ b/rhizome.c @@ -522,6 +522,8 @@ enum rhizome_bundle_status rhizome_bundle_import_files(rhizome_manifest *m, rhiz } pstatus = rhizome_finish_write(&write); + if (pstatus == RHIZOME_PAYLOAD_STATUS_BUSY) + rhizome_fail_write(&write); } } diff --git a/rhizome_store.c b/rhizome_store.c index 7f907bf0..c36869c3 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -908,8 +908,12 @@ enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m rhizome_fail_write(&write); return RHIZOME_PAYLOAD_STATUS_ERROR; } - - return rhizome_finish_write(&write); + + status = rhizome_finish_write(&write); + if (status == RHIZOME_PAYLOAD_STATUS_BUSY) + rhizome_fail_write(&write); + + return status; } // store a whole payload from a single buffer @@ -938,7 +942,11 @@ enum rhizome_payload_status rhizome_import_buffer(rhizome_manifest *m, uint8_t * return RHIZOME_PAYLOAD_STATUS_ERROR; } - return rhizome_finish_write(&write); + status = rhizome_finish_write(&write); + if (status == RHIZOME_PAYLOAD_STATUS_BUSY) + rhizome_fail_write(&write); + + return status; } /* Checks the size of the file with the given path as a candidate payload for an existing manifest. diff --git a/rhizome_sync_keys.c b/rhizome_sync_keys.c index 3f78d25f..fce1b2b3 100644 --- a/rhizome_sync_keys.c +++ b/rhizome_sync_keys.c @@ -256,30 +256,21 @@ static void sync_lookup_bar(struct subscriber *peer, struct rhizome_sync_keys *s struct transfers *transfer = *ptr; enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(transfer->key.key, sizeof(sync_key_t), m); - switch(status){ - case RHIZOME_BUNDLE_STATUS_SAME: - break; - case RHIZOME_BUNDLE_STATUS_BUSY: - case RHIZOME_BUNDLE_STATUS_NEW: - // TODO We don't have this bundle anymore! + if (status == RHIZOME_BUNDLE_STATUS_SAME){ + int rank = sync_manifest_rank(m, peer, 1, 0); - default: - goto end; + *ptr = transfer->next; + struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank); + + if (send_bar){ + rhizome_manifest_to_bar(m, &send_bar->bar); + free(transfer); + }else{ + *ptr = transfer; + } } - int rank = sync_manifest_rank(m, peer, 1, 0); - - *ptr = transfer->next; - struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank); - free(transfer); - - if (!send_bar) - goto end; - - rhizome_manifest_to_bar(m, &send_bar->bar); - -end: rhizome_manifest_free(m); } @@ -729,8 +720,9 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn if (add_status == RHIZOME_BUNDLE_STATUS_BUSY){ // don't consume the payload rhizome_manifest_free(m); - ob_rewind(payload); + rhizome_fail_write(write); free(write); + ob_rewind(payload); return; } DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)", @@ -741,8 +733,9 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn case RHIZOME_PAYLOAD_STATUS_BUSY: // don't consume the payload rhizome_manifest_free(m); - ob_rewind(payload); + rhizome_fail_write(write); free(write); + ob_rewind(payload); return; default: @@ -752,6 +745,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn if (status!=RHIZOME_PAYLOAD_STATUS_NEW){ rhizome_manifest_free(m); + rhizome_fail_write(write); free(write); break; } @@ -776,13 +770,16 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn if (write->file_offset >= m->filesize){ // no new content in the new version, we can import now enum rhizome_payload_status status = rhizome_finish_write(write); + DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status); - free(write); + if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL); DEBUGF(rhizome_sync_keys, "Import %s = %s", alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state)); - } + } else + rhizome_fail_write(write); + free(write); rhizome_manifest_free(m); break; }