Deal with some database locking during transfers in the rhizome sync keys process.

Push back messages to re-process them later.
Queue and retry both the start and the end of the transfer process.
Jeremy Lakeman 2017-02-27 15:46:39 +10:30
parent 4c538a7686
commit 9ec46f2279

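Before the diff, the shape of the change in isolation: when the Rhizome store returns a BUSY status, a transfer whose payload has fully arrived is parked on a global "completing" list, and the next alarm is pulled forward so the commit is retried shortly. Below is a minimal standalone sketch of that push-back pattern; transfer_t, db_commit(), complete_transfers() and next_alarm() are illustrative stand-ins, not identifiers from this file (the real code uses struct transfers, rhizome_finish_write() and the RESCHEDULE() alarm macros).

/* Illustrative sketch only: transfer_t and db_commit() stand in for
 * struct transfers and rhizome_finish_write()/rhizome_add_manifest_to_store(). */
#include <stdlib.h>

enum status { STATUS_OK, STATUS_BUSY };

typedef struct transfer {
  struct transfer *next;
} transfer_t;

/* transfers whose payload is fully received but not yet committed */
static transfer_t *completing = NULL;

/* stand-in for a database commit that can fail on a held lock */
static enum status db_commit(transfer_t *t) { (void)t; return STATUS_OK; }

/* drain the list; return 1 if the store was busy so the caller
 * schedules a retry instead of dropping the transfer */
static int complete_transfers(void)
{
  while (completing) {
    transfer_t *t = completing;
    if (db_commit(t) == STATUS_BUSY)
      return 1;               /* push back: leave t queued for later */
    completing = t->next;     /* committed: pop and free */
    free(t);
  }
  return 0;
}

/* caller pattern, mirroring sync_send() and sync_keys_recv() below:
 * when busy, cap the next alarm at roughly now + 100 ms */
static long long next_alarm(long long next_action, long long now_ms)
{
  if (complete_transfers() == 1 && next_action > now_ms + 100)
    next_action = now_ms + 100;
  return next_action;
}

One consequence of keeping the list global rather than per peer, visible in the hunks below, is that a finished payload can still be committed after its connection closes and the per-peer queue has been dropped.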

@@ -20,7 +20,7 @@
 #define STATE_BAR (4)
 #define STATE_MANIFEST (8)
-#define STATE_PAYLOAD (12)
+#define STATE_PAYLOAD (0x0C)
 #define STATE_NONE (0)
 #define STATE_SEND_BAR (STATE_SEND|STATE_BAR)
@@ -29,6 +29,8 @@
 #define STATE_REQ_PAYLOAD (STATE_REQ|STATE_PAYLOAD)
 #define STATE_SEND_PAYLOAD (STATE_SEND|STATE_PAYLOAD)
 #define STATE_RECV_PAYLOAD (STATE_RECV|STATE_PAYLOAD)
+#define STATE_COMPLETING (0x10)
+#define STATE_LOOKUP_BAR (0x20)
 // approx size of a signed manifest
 #define DUMMY_MANIFEST_SIZE 256
@@ -58,6 +60,8 @@ struct rhizome_sync_keys{
 struct sync_state *sync_tree=NULL;
 struct msp_server_state *sync_connections=NULL;
+struct transfers *completing=NULL;
 DEFINE_ALARM(sync_send);
 static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){
@@ -76,6 +80,7 @@ static const char *get_state_name(uint8_t state)
     case STATE_REQ_PAYLOAD: return "REQ_PAYLOAD";
     case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD";
     case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD";
+    case STATE_COMPLETING: return "COMPLETING";
   }
   return "Unknown";
 }
@@ -91,6 +96,7 @@ static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr)
       }
       ptr->read=NULL;
       break;
+    case STATE_COMPLETING:
     case STATE_RECV_PAYLOAD:
       if (ptr->write){
        rhizome_fail_write(ptr->write);
@@ -192,13 +198,98 @@ void sync_keys_status(struct sched_ent *alarm)
   RESCHEDULE(alarm, next, next, next);
 }
 
-static void sync_send_peer(struct rhizome_sync_keys *sync_state)
+static int sync_complete_transfers(){
+  // attempt to finish payload transfers and write manifests to the store
+  while(completing){
+    struct transfers *transfer = completing;
+    assert(transfer->state == STATE_COMPLETING);
+    enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
+    if (status == RHIZOME_PAYLOAD_STATUS_BUSY)
+      return 1;
+    DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&transfer->key), status);
+    free(transfer->write);
+    transfer->write = NULL;
+    if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
+      enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
+      DEBUGF(rhizome_sync_keys, "Import %s = %s",
+        alloca_sync_key(&transfer->key), rhizome_bundle_status_message_nonnull(add_state));
+    }
+    if (transfer->manifest)
+      rhizome_manifest_free(transfer->manifest);
+    transfer->manifest=NULL;
+    completing = transfer->next;
+    free(transfer);
+  }
+  return 0;
+}
+
+static int sync_manifest_rank(rhizome_manifest *m, struct subscriber *peer, uint8_t sending, uint64_t written_offset)
+{
+  uint8_t bias = REACHABLE_BIAS;
+  int rank = log2ll(m->filesize - written_offset);
+  if (m->has_recipient){
+    struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0);
+    // if the recipient is routable and this bundle is heading the right way;
+    // give the bundle's rank a boost
+    if (recipient
+        && (recipient->reachable & (REACHABLE | REACHABLE_SELF))
+        && (sending == (recipient->next_hop == peer ? 1 : 0))){
+      DEBUGF(rhizome_sync_keys, "Boosting rank for %s to deliver to recipient %s",
+        alloca_tohex(m->manifesthash.binary, sizeof(sync_key_t)), // NOT SET???
+        alloca_tohex_sid_t(recipient->sid));
+      bias=0;
+    }
+  }
+  return rank + bias;
+}
+
+static void sync_lookup_bar(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct transfers **ptr){
+  // queue BAR for transmission based on the manifest details.
+  // add a rank bias if there is no reachable recipient, to prioritise messaging
+  rhizome_manifest *m = rhizome_new_manifest();
+  if (!m)
+    return;
+  struct transfers *transfer = *ptr;
+  enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(transfer->key.key, sizeof(sync_key_t), m);
+  switch(status){
+    case RHIZOME_BUNDLE_STATUS_SAME:
+      break;
+    case RHIZOME_BUNDLE_STATUS_BUSY:
+    case RHIZOME_BUNDLE_STATUS_NEW:
+      // TODO We don't have this bundle anymore!
+    default:
+      goto end;
+  }
+  int rank = sync_manifest_rank(m, peer, 1, 0);
+  *ptr = transfer->next;
+  struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank);
+  free(transfer);
+  if (!send_bar)
+    goto end;
+  rhizome_manifest_to_bar(m, &send_bar->bar);
+end:
+  rhizome_manifest_free(m);
+}
+
+static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sync_state)
 {
   size_t mtu = MSP_MESSAGE_SIZE; // FIX ME, use link mtu?
   struct overlay_buffer *payload=NULL;
   uint8_t buff[mtu];
   // send requests for more data, stop when we hit MAX_REQUEST_BYTES
   // Note that requests are ordered by rank,
   // so we will still request a high rank item even if there is a low ranked item being received
@@ -255,7 +346,11 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state)
   // now send requested data
   ptr = &sync_state->queue;
   while((*ptr) && msp_can_send(sync_state->connection)){
+    if ((*ptr)->state == STATE_LOOKUP_BAR)
+      sync_lookup_bar(peer, sync_state, ptr); // might remove *ptr from the list
+    if (!*ptr)
+      break;
     struct transfers *msg = *ptr;
     if ((msg->state & 3) != STATE_SEND){
       ptr = &msg->next;
       continue;
@@ -291,9 +386,10 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state)
          ob_append_bytes(payload, m->manifestdata, m->manifest_all_bytes);
          send_payload=1;
          break;
-        default:
-          msg_complete = 0;
+        case RHIZOME_BUNDLE_STATUS_NEW:
+          // TODO we don't have this bundle anymore!
+        default:
          ob_rewind(payload);
       }
       rhizome_manifest_free(m);
@@ -376,9 +472,9 @@ void sync_send(struct sched_ent *alarm)
     struct subscriber *peer = msp_remote_peer(connection);
     struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
     sync_state->connection = connection;
-    sync_send_peer(sync_state);
+    sync_send_peer(peer, sync_state);
   }
   while(1){
     struct msp_server_state *connection = msp_next_closed(&iterator);
     if (!connection)
@@ -388,7 +484,7 @@ void sync_send(struct sched_ent *alarm)
     struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
     DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid));
     // drop all transfer records
     while(sync_state->queue){
       struct transfers *msg = sync_state->queue;
@@ -404,6 +500,11 @@ void sync_send(struct sched_ent *alarm)
   }
   time_ms_t next_action = msp_iterator_close(&iterator);
+  if (sync_complete_transfers()==1){
+    time_ms_t try_again = gettime_ms()+100;
+    if (next_action > try_again)
+      next_action = try_again;
+  }
   RESCHEDULE(alarm, next_action, next_action, next_action);
 }
@@ -419,28 +520,6 @@ static void sync_peer_has (void * UNUSED(context), void *peer_context, const syn
   // noop, just wait for the BAR to arrive.
 }
 
-static int sync_manifest_rank(rhizome_manifest *m, struct subscriber *peer, uint8_t sending, uint64_t written_offset)
-{
-  uint8_t bias = REACHABLE_BIAS;
-  int rank = log2ll(m->filesize - written_offset);
-  if (m->has_recipient){
-    struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0);
-    // if the recipient is routable and this bundle is heading the right way;
-    // give the bundle's rank a boost
-    if (recipient
-        && (recipient->reachable & (REACHABLE | REACHABLE_SELF))
-        && (sending == (recipient->next_hop == peer ? 1 : 0))){
-      DEBUGF(rhizome_sync_keys, "Boosting rank for %s to deliver to recipient %s",
-        alloca_tohex(m->manifesthash.binary, sizeof(sync_key_t)),
-        alloca_tohex_sid_t(recipient->sid));
-      bias=0;
-    }
-  }
-  return rank + bias;
-}
 
 static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
 {
   // pre-emptively announce the manifest?
@@ -451,37 +530,15 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context,
     alloca_tohex_sid_t(peer->sid),
     alloca_sync_key(key));
 
-  // queue BAR for transmission based on the manifest details.
-  // add a rank bias if there is no reachable recipient to prioritise messaging
-  rhizome_manifest *m = rhizome_new_manifest();
-  if (!m)
-    return;
-  enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(sync_key_t), m);
-  switch(status){
-    case RHIZOME_BUNDLE_STATUS_SAME:
-      break;
-    case RHIZOME_BUNDLE_STATUS_NEW:
-      // TODO We don't have this bundle anymore!
-    default:
-      goto end;
-  }
-  int rank = sync_manifest_rank(m, peer, 1, 0);
   struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
   if (!sync_state)
-    goto end;
-  struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, key, STATE_SEND_BAR, rank);
-  if (!send_bar)
-    goto end;
-  rhizome_manifest_to_bar(m, &send_bar->bar);
-end:
-  rhizome_manifest_free(m);
+    return;
+  struct transfers **send_bar = find_and_update_transfer(peer, sync_state, key, STATE_LOOKUP_BAR, 0);
+  if (!send_bar || !*send_bar)
+    return;
+  sync_lookup_bar(peer, sync_state, send_bar);
 }
 
 static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
@@ -585,12 +642,16 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
       if (!config.rhizome.fetch)
        break;
-      if (!rhizome_is_bar_interesting(&bar)){
+      int r = rhizome_is_bar_interesting(&bar);
+      if (r == 0){
        DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)",
          alloca_sync_key(&key));
        break;
-      }
+      }else if (r==-1){
+        // don't consume the payload
+        ob_rewind(payload);
+        return;
+      }
       // send a request for the manifest
       rank = rhizome_bar_log_size(&bar);
       struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_MANIFEST, rank);
@@ -637,12 +698,18 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
        rhizome_manifest_free(m);
        break;
       }
-      if (!rhizome_is_manifest_interesting(m)){
+      int r = rhizome_is_manifest_interesting(m);
+      if (r == 0){
        DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)",
          alloca_sync_key(&key));
        rhizome_manifest_free(m);
        break;
+      }else if (r == -1){
+        // don't consume the payload
+        rhizome_manifest_free(m);
+        ob_rewind(payload);
+        return;
       }
       // start writing the payload
@@ -655,23 +722,40 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
       }else{
        status = rhizome_open_write(write, &m->filehash, m->filesize);
       }
-      if (status == RHIZOME_PAYLOAD_STATUS_STORED){
-        enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL);
-        DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
-          alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
-        rhizome_manifest_free(m);
-        free(write);
-        break;
-      }else if (status!=RHIZOME_PAYLOAD_STATUS_NEW){
-        DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)",
-          alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
+      switch(status){
+        case RHIZOME_PAYLOAD_STATUS_STORED:{
+          enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL);
+          if (add_status == RHIZOME_BUNDLE_STATUS_BUSY){
+            // don't consume the payload
+            rhizome_manifest_free(m);
+            ob_rewind(payload);
+            free(write);
+            return;
+          }
+          DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
+            alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
+        }
+        break;
+        case RHIZOME_PAYLOAD_STATUS_BUSY:
+          // don't consume the payload
+          rhizome_manifest_free(m);
+          ob_rewind(payload);
+          free(write);
+          return;
+        default:
+          DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)",
+            alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
+      }
+      if (status!=RHIZOME_PAYLOAD_STATUS_NEW){
        rhizome_manifest_free(m);
        free(write);
        break;
       }
       if (m->is_journal){
        // if we're fetching a journal bundle, copy any bytes we have of a previous version
        // and therefore work out what range of bytes we still need
@@ -761,34 +845,25 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
       }
       struct transfers *transfer = *ptr;
       transfer->req_len -= len;
-      uint8_t all_done = 0;
       if (rhizome_write_buffer(transfer->write, buff, len)==-1){
        WHYF("Write failed for %s!", alloca_sync_key(&key));
-        all_done=1;
-      }else{
-        DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu",
-          alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length);
-        if (transfer->write->file_offset >= transfer->write->file_length){
-          enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
-          DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
-          free(transfer->write);
-          transfer->write = NULL;
-          if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
-            enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
-            DEBUGF(rhizome_sync_keys, "Import %s = %s",
-              alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
-          }
-          all_done=1;
-        }
-      }
-      if (all_done){
-        if (transfer->manifest)
-          rhizome_manifest_free(transfer->manifest);
-        transfer->manifest=NULL;
        clear_transfer(transfer);
        *ptr = transfer->next;
        free(transfer);
-      }
+      }else{
+        DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu",
+          alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length);
+        if (transfer->write->file_offset >= transfer->write->file_length){
+          // move this transfer to the global completing list
+          transfer->state = STATE_COMPLETING;
+          *ptr = transfer->next;
+          transfer->next = completing;
+          completing = transfer;
+        }
+      }
       break;
     }
@@ -848,15 +923,20 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
        msp_consumed(connection_state, packet, recv_payload);
       }
-      sync_send_peer(sync_state);
+      sync_send_peer(header->source, sync_state);
       time_ms_t next_action = msp_next_action(connection_state);
+      if (sync_complete_transfers()==1){
+        time_ms_t try_again = gettime_ms() + 100;
+        if (next_action > try_again)
+          next_action = try_again;
+      }
       struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
       if (alarm->alarm > next_action || !is_scheduled(alarm))
        RESCHEDULE(alarm, next_action, next_action, next_action);
     }
   }
   return 0;
 }
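
A note on the receive-side hunks above: wherever the store reports BUSY, the handler rewinds the receive buffer and returns without consuming the message, so the transport hands the same message back on a later call instead of losing it. A rough sketch of that convention, assuming a checkpoint/rewind buffer in the style of the ob_rewind() calls above; buf_checkpoint(), buf_rewind() and handle_one() are hypothetical stand-ins:

/* Hypothetical receive loop illustrating "don't consume the payload".
 * buf_checkpoint()/buf_rewind() model the ob_rewind() usage above. */
#include <stddef.h>

enum result { RESULT_OK, RESULT_BUSY };

struct buf { size_t len, pos, mark; };

static void buf_checkpoint(struct buf *b) { b->mark = b->pos; }
static void buf_rewind(struct buf *b)     { b->pos = b->mark; }

/* stand-in parser: consumes one message, or reports a locked store */
static enum result handle_one(struct buf *b)
{
  b->pos = b->len;   /* pretend one message used the rest of the buffer */
  return RESULT_OK;
}

/* process queued messages; on BUSY, un-consume the current message so
 * it is re-processed on the next attempt */
static void process_messages(struct buf *b)
{
  while (b->pos < b->len) {
    buf_checkpoint(b);
    if (handle_one(b) == RESULT_BUSY) {
      buf_rewind(b);   /* leave the message in place to retry later */
      return;
    }
  }
}

Together with the shortened alarm, the effect is that a locked database delays a transfer by roughly 100ms rather than failing it.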