From 9ec46f2279d0d294f2e3db76e9c3f4fc1feced4c Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 27 Feb 2017 15:46:39 +1030 Subject: [PATCH] Deal with some database locking during transfers with the rhizome sync keys process. Push back messages to re-process later Queue and retry both the start and end of the transfer process --- rhizome_sync_keys.c | 272 ++++++++++++++++++++++++++++---------------- 1 file changed, 176 insertions(+), 96 deletions(-) diff --git a/rhizome_sync_keys.c b/rhizome_sync_keys.c index 1e321fe1..3f78d25f 100644 --- a/rhizome_sync_keys.c +++ b/rhizome_sync_keys.c @@ -20,7 +20,7 @@ #define STATE_BAR (4) #define STATE_MANIFEST (8) -#define STATE_PAYLOAD (12) +#define STATE_PAYLOAD (0x0C) #define STATE_NONE (0) #define STATE_SEND_BAR (STATE_SEND|STATE_BAR) @@ -29,6 +29,8 @@ #define STATE_REQ_PAYLOAD (STATE_REQ|STATE_PAYLOAD) #define STATE_SEND_PAYLOAD (STATE_SEND|STATE_PAYLOAD) #define STATE_RECV_PAYLOAD (STATE_RECV|STATE_PAYLOAD) +#define STATE_COMPLETING (0x10) +#define STATE_LOOKUP_BAR (0x20) // approx size of a signed manifest #define DUMMY_MANIFEST_SIZE 256 @@ -58,6 +60,8 @@ struct rhizome_sync_keys{ struct sync_state *sync_tree=NULL; struct msp_server_state *sync_connections=NULL; +struct transfers *completing=NULL; + DEFINE_ALARM(sync_send); static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){ @@ -76,6 +80,7 @@ static const char *get_state_name(uint8_t state) case STATE_REQ_PAYLOAD: return "REQ_PAYLOAD"; case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD"; case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD"; + case STATE_COMPLETING: return "COMPLETING"; } return "Unknown"; } @@ -91,6 +96,7 @@ static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr) } ptr->read=NULL; break; + case STATE_COMPLETING: case STATE_RECV_PAYLOAD: if (ptr->write){ rhizome_fail_write(ptr->write); @@ -192,13 +198,98 @@ void sync_keys_status(struct sched_ent *alarm) RESCHEDULE(alarm, next, next, next); } -static void sync_send_peer(struct rhizome_sync_keys *sync_state) +static int sync_complete_transfers(){ + // attempt to finish payload transfers and write manifests to the store + while(completing){ + struct transfers *transfer = completing; + assert(transfer->state == STATE_COMPLETING); + enum rhizome_payload_status status = rhizome_finish_write(transfer->write); + if (status == RHIZOME_PAYLOAD_STATUS_BUSY) + return 1; + + DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&transfer->key), status); + free(transfer->write); + transfer->write = NULL; + if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ + enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL); + DEBUGF(rhizome_sync_keys, "Import %s = %s", + alloca_sync_key(&transfer->key), rhizome_bundle_status_message_nonnull(add_state)); + } + if (transfer->manifest) + rhizome_manifest_free(transfer->manifest); + transfer->manifest=NULL; + + completing = transfer->next; + free(transfer); + } + return 0; +} + +static int sync_manifest_rank(rhizome_manifest *m, struct subscriber *peer, uint8_t sending, uint64_t written_offset) +{ + uint8_t bias = REACHABLE_BIAS; + int rank = log2ll(m->filesize - written_offset); + + if (m->has_recipient){ + struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0); + // if the recipient is routable and this bundle is heading the right way; + // give the bundle's rank a boost + if (recipient + && (recipient->reachable & (REACHABLE | REACHABLE_SELF)) + && (sending == (recipient->next_hop == peer ? 1 : 0))){ + DEBUGF(rhizome_sync_keys, "Boosting rank for %s to deliver to recipient %s", + alloca_tohex(m->manifesthash.binary, sizeof(sync_key_t)), // NOT SET??? + alloca_tohex_sid_t(recipient->sid)); + bias=0; + } + } + + return rank + bias; +} + +static void sync_lookup_bar(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct transfers **ptr){ + // queue BAR for transmission based on the manifest details. + // add a rank bias if there is no reachable recipient, to prioritise messaging + rhizome_manifest *m = rhizome_new_manifest(); + if (!m) + return; + + struct transfers *transfer = *ptr; + enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(transfer->key.key, sizeof(sync_key_t), m); + switch(status){ + case RHIZOME_BUNDLE_STATUS_SAME: + break; + + case RHIZOME_BUNDLE_STATUS_BUSY: + case RHIZOME_BUNDLE_STATUS_NEW: + // TODO We don't have this bundle anymore! + + default: + goto end; + } + + int rank = sync_manifest_rank(m, peer, 1, 0); + + *ptr = transfer->next; + struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank); + free(transfer); + + if (!send_bar) + goto end; + + rhizome_manifest_to_bar(m, &send_bar->bar); + +end: + rhizome_manifest_free(m); +} + +static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sync_state) { size_t mtu = MSP_MESSAGE_SIZE; // FIX ME, use link mtu? struct overlay_buffer *payload=NULL; uint8_t buff[mtu]; - + // send requests for more data, stop when we hit MAX_REQUEST_BYTES // Note that requests are ordered by rank, // so we will still request a high rank item even if there is a low ranked item being received @@ -255,7 +346,11 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state) // now send requested data ptr = &sync_state->queue; while((*ptr) && msp_can_send(sync_state->connection)){ + if ((*ptr)->state == STATE_LOOKUP_BAR) + sync_lookup_bar(peer, sync_state, ptr); // might remove *ptr from the list + struct transfers *msg = *ptr; + if ((msg->state & 3) != STATE_SEND){ ptr = &msg->next; continue; @@ -291,9 +386,10 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state) ob_append_bytes(payload, m->manifestdata, m->manifest_all_bytes); send_payload=1; break; + default: + msg_complete = 0; case RHIZOME_BUNDLE_STATUS_NEW: // TODO we don't have this bundle anymore! - default: ob_rewind(payload); } rhizome_manifest_free(m); @@ -376,9 +472,9 @@ void sync_send(struct sched_ent *alarm) struct subscriber *peer = msp_remote_peer(connection); struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); sync_state->connection = connection; - sync_send_peer(sync_state); + sync_send_peer(peer, sync_state); } - + while(1){ struct msp_server_state *connection = msp_next_closed(&iterator); if (!connection) @@ -388,7 +484,7 @@ void sync_send(struct sched_ent *alarm) struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid)); - + // drop all transfer records while(sync_state->queue){ struct transfers *msg = sync_state->queue; @@ -404,6 +500,11 @@ void sync_send(struct sched_ent *alarm) } time_ms_t next_action = msp_iterator_close(&iterator); + if (sync_complete_transfers()==1){ + time_ms_t try_again = gettime_ms()+100; + if (next_action > try_again) + next_action = try_again; + } RESCHEDULE(alarm, next_action, next_action, next_action); } @@ -419,28 +520,6 @@ static void sync_peer_has (void * UNUSED(context), void *peer_context, const syn // noop, just wait for the BAR to arrive. } -static int sync_manifest_rank(rhizome_manifest *m, struct subscriber *peer, uint8_t sending, uint64_t written_offset) -{ - uint8_t bias = REACHABLE_BIAS; - int rank = log2ll(m->filesize - written_offset); - - if (m->has_recipient){ - struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0); - // if the recipient is routable and this bundle is heading the right way; - // give the bundle's rank a boost - if (recipient - && (recipient->reachable & (REACHABLE | REACHABLE_SELF)) - && (sending == (recipient->next_hop == peer ? 1 : 0))){ - DEBUGF(rhizome_sync_keys, "Boosting rank for %s to deliver to recipient %s", - alloca_tohex(m->manifesthash.binary, sizeof(sync_key_t)), - alloca_tohex_sid_t(recipient->sid)); - bias=0; - } - } - - return rank + bias; -} - static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) { // pre-emptively announce the manifest? @@ -451,37 +530,15 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, alloca_tohex_sid_t(peer->sid), alloca_sync_key(key)); - // queue BAR for transmission based on the manifest details. - // add a rank bias if there is no reachable recipient to prioritise messaging - - rhizome_manifest *m = rhizome_new_manifest(); - if (!m) - return; - - enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(sync_key_t), m); - switch(status){ - case RHIZOME_BUNDLE_STATUS_SAME: - break; - case RHIZOME_BUNDLE_STATUS_NEW: - // TODO We don't have this bundle anymore! - default: - goto end; - } - - int rank = sync_manifest_rank(m, peer, 1, 0); - struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); if (!sync_state) - goto end; + return; - struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, key, STATE_SEND_BAR, rank); - if (!send_bar) - goto end; + struct transfers **send_bar = find_and_update_transfer(peer, sync_state, key, STATE_LOOKUP_BAR, 0); + if (!send_bar && !*send_bar) + return; - rhizome_manifest_to_bar(m, &send_bar->bar); - -end: - rhizome_manifest_free(m); + sync_lookup_bar(peer, sync_state, send_bar); } static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) @@ -585,12 +642,16 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn if (!config.rhizome.fetch) break; - if (!rhizome_is_bar_interesting(&bar)){ + int r = rhizome_is_bar_interesting(&bar); + if (r == 0){ DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)", alloca_sync_key(&key)); break; - } - + }else if (r==-1){ + // don't consume the payload + ob_rewind(payload); + return; + } // send a request for the manifest rank = rhizome_bar_log_size(&bar); struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_MANIFEST, rank); @@ -637,12 +698,18 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn rhizome_manifest_free(m); break; } - - if (!rhizome_is_manifest_interesting(m)){ + + int r = rhizome_is_manifest_interesting(m); + if (r == 0){ DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)", alloca_sync_key(&key)); rhizome_manifest_free(m); break; + }else if (r == -1){ + // don't consume the payload + rhizome_manifest_free(m); + ob_rewind(payload); + return; } // start writing the payload @@ -655,23 +722,40 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn }else{ status = rhizome_open_write(write, &m->filehash, m->filesize); } - - if (status == RHIZOME_PAYLOAD_STATUS_STORED){ - enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL); - DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)", - alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status)); - rhizome_manifest_free(m); - free(write); - break; - }else if (status!=RHIZOME_PAYLOAD_STATUS_NEW){ - DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)", - alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status)); + + switch(status){ + case RHIZOME_PAYLOAD_STATUS_STORED:{ + enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL); + if (add_status == RHIZOME_BUNDLE_STATUS_BUSY){ + // don't consume the payload + rhizome_manifest_free(m); + ob_rewind(payload); + free(write); + return; + } + DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)", + alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status)); + } + break; + + case RHIZOME_PAYLOAD_STATUS_BUSY: + // don't consume the payload + rhizome_manifest_free(m); + ob_rewind(payload); + free(write); + return; + + default: + DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)", + alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status)); + } + + if (status!=RHIZOME_PAYLOAD_STATUS_NEW){ rhizome_manifest_free(m); free(write); break; } - if (m->is_journal){ // if we're fetching a journal bundle, copy any bytes we have of a previous version // and therefore work out what range of bytes we still need @@ -761,34 +845,25 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn } struct transfers *transfer = *ptr; transfer->req_len -= len; - uint8_t all_done = 0; if (rhizome_write_buffer(transfer->write, buff, len)==-1){ WHYF("Write failed for %s!", alloca_sync_key(&key)); - all_done=1; - }else{ - DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu", - alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length); - if (transfer->write->file_offset >= transfer->write->file_length){ - enum rhizome_payload_status status = rhizome_finish_write(transfer->write); - DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status); - free(transfer->write); - transfer->write = NULL; - if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ - enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL); - DEBUGF(rhizome_sync_keys, "Import %s = %s", - alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state)); - } - all_done=1; - } - } - - if (all_done){ if (transfer->manifest) rhizome_manifest_free(transfer->manifest); transfer->manifest=NULL; clear_transfer(transfer); *ptr = transfer->next; free(transfer); + }else{ + DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu", + alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length); + + if (transfer->write->file_offset >= transfer->write->file_length){ + // move this transfer to the global completing list + transfer->state = STATE_COMPLETING; + *ptr = transfer->next; + transfer->next = completing; + completing = transfer; + } } break; } @@ -848,15 +923,20 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf msp_consumed(connection_state, packet, recv_payload); } - sync_send_peer(sync_state); + sync_send_peer(header->source, sync_state); + time_ms_t next_action = msp_next_action(connection_state); - + if (sync_complete_transfers()==1){ + time_ms_t try_again = gettime_ms() + 100; + if (next_action > try_again) + next_action = try_again; + } + struct sched_ent *alarm=&ALARM_STRUCT(sync_send); if (alarm->alarm > next_action || !is_scheduled(alarm)) RESCHEDULE(alarm, next_action, next_action, next_action); } } - return 0; }