From c5957e9c85a46c510c5ab9918f822315cf1b6c32 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 22 Mar 2016 16:28:44 +1030 Subject: [PATCH] Transfer bundles via msp, triggered by key sync --- msp_common.h | 6 +- msp_server.c | 33 +- msp_server.h | 3 +- overlay_address.h | 1 + rhizome.h | 1 + rhizome_database.c | 46 +++ rhizome_fetch.c | 2 +- rhizome_sync_keys.c | 679 ++++++++++++++++++++++++++++++++++++++++-- sync_keys.c | 46 ++- sync_keys.h | 2 + tests/rhizomeprotocol | 97 +++--- 11 files changed, 827 insertions(+), 89 deletions(-) diff --git a/msp_common.h b/msp_common.h index e259bcbb..dcd14198 100644 --- a/msp_common.h +++ b/msp_common.h @@ -29,6 +29,7 @@ struct msp_window{ uint32_t rtt; uint16_t next_seq; // seq of next expected TX or RX packet. time_ms_t last_activity; + time_ms_t last_packet; struct msp_packet *_head, *_tail; }; @@ -48,7 +49,9 @@ static void msp_stream_init(struct msp_stream *stream) // TODO set base rtt to ensure that we send the first packet a few times before giving up stream->tx.base_rtt = stream->tx.rtt = 0xFFFFFFFF; stream->tx.last_activity = TIME_MS_NEVER_HAS; + stream->tx.last_packet = TIME_MS_NEVER_HAS; stream->rx.last_activity = TIME_MS_NEVER_HAS; + stream->rx.last_packet = TIME_MS_NEVER_HAS; stream->next_action = TIME_MS_NEVER_WILL; stream->timeout = gettime_ms() + 10000; stream->previous_ack = 0x7FFF; @@ -184,7 +187,7 @@ static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, str DEBUGF(msp, "With packet flags %02x seq %02x len %zd", header[0], packet->seq, packet->len); - packet->sent = stream->tx.last_activity; + packet->sent = stream->tx.last_packet = stream->tx.last_activity; return MSP_PAYLOAD_PREAMBLE_SIZE; } @@ -333,6 +336,7 @@ static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, stream->state |= MSP_STATE_RECEIVED_DATA; uint16_t seq = read_uint16(&payload[3]); + stream->rx.last_packet = stream->rx.last_activity; if (compare_wrapped_uint16(seq, stream->rx.next_seq)>=0){ if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1) stream->next_ack = now; diff --git a/msp_server.c b/msp_server.c index eed6a2df..72724acc 100644 --- a/msp_server.c +++ b/msp_server.c @@ -83,6 +83,26 @@ time_ms_t msp_iterator_close(struct msp_iterator *iterator) return next_action; } +struct msp_server_state * msp_next_closed(struct msp_iterator *iterator) +{ + struct msp_server_state *ptr = iterator->_next; + while(1){ + if (ptr){ + ptr = ptr->_next; + }else{ + ptr = *iterator->_root; + } + if (!ptr){ + iterator->_next = ptr; + return NULL; + } + if (ptr->stream.state & MSP_STATE_CLOSED){ + iterator->_next = ptr; + return ptr; + } + } +} + static void send_frame(struct msp_server_state *state, struct overlay_buffer *payload) { struct internal_mdp_header response_header; @@ -126,23 +146,26 @@ struct msp_server_state * msp_process_next(struct msp_iterator *iterator) while(1){ if (ptr){ struct msp_packet *packet = ptr->stream.tx._head; - ptr->stream.next_action = ptr->stream.timeout; + time_ms_t next_packet = TIME_MS_NEVER_WILL; + ptr->stream.next_action = ptr->stream.timeout; while(packet){ - if (packet->sent + RETRANSMIT_TIME < now) + if (packet->sent + RETRANSMIT_TIME <= now) // (re)transmit this packet send_packet(ptr, packet); - if (ptr->stream.next_action > packet->sent + RETRANSMIT_TIME) - ptr->stream.next_action = packet->sent + RETRANSMIT_TIME; + if (next_packet > packet->sent + RETRANSMIT_TIME) + next_packet = packet->sent + RETRANSMIT_TIME; packet=packet->_next; } // should we send an ack now without sending a payload? - if (now > ptr->stream.next_ack) + if (now >= ptr->stream.next_ack) send_ack(ptr); + if (ptr->stream.next_action > next_packet) + ptr->stream.next_action = next_packet; if (ptr->stream.next_action > ptr->stream.next_ack) ptr->stream.next_action = ptr->stream.next_ack; diff --git a/msp_server.h b/msp_server.h index 4c15cd54..4f70a4f3 100644 --- a/msp_server.h +++ b/msp_server.h @@ -24,8 +24,9 @@ struct subscriber * msp_remote_peer(struct msp_server_state *state); int msp_can_send(struct msp_server_state *state); int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator); -time_ms_t msp_iterator_close(struct msp_iterator *iterator); struct msp_server_state * msp_process_next(struct msp_iterator *iterator); +struct msp_server_state * msp_next_closed(struct msp_iterator *iterator); +time_ms_t msp_iterator_close(struct msp_iterator *iterator); int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len); int msp_shutdown_stream(struct msp_server_state *state); diff --git a/overlay_address.h b/overlay_address.h index b4485558..44921f35 100644 --- a/overlay_address.h +++ b/overlay_address.h @@ -66,6 +66,7 @@ struct subscriber{ // rhizome sync state struct rhizome_sync *sync_state; + struct rhizome_sync_keys *sync_keys_state; uint8_t sync_version; // result of routing calculations; diff --git a/rhizome.h b/rhizome.h index d1385a5d..2133eccd 100644 --- a/rhizome.h +++ b/rhizome.h @@ -623,6 +623,7 @@ int rhizome_is_manifest_interesting(rhizome_manifest *m); enum rhizome_bundle_status rhizome_retrieve_manifest(const rhizome_bid_t *bid, rhizome_manifest *m); enum rhizome_bundle_status rhizome_retrieve_manifest_by_prefix(const unsigned char *prefix, unsigned prefix_len, rhizome_manifest *m); enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_manifest *m); +enum rhizome_bundle_status rhizome_retrieve_bar_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_bar_t *bar); int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m); int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength); int rhizome_delete_bundle(const rhizome_bid_t *bidp); diff --git a/rhizome_database.c b/rhizome_database.c index afbf446e..af86cf66 100644 --- a/rhizome_database.c +++ b/rhizome_database.c @@ -1778,6 +1778,52 @@ enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_ return ret; } +enum rhizome_bundle_status rhizome_retrieve_bar_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_bar_t *bar) +{ + sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; + const unsigned prefix_strlen = prefix_len * 2; + char like[prefix_strlen + 2]; + tohex(like, prefix_strlen, prefix); + like[prefix_strlen] = '%'; + like[prefix_strlen + 1] = '\0'; + sqlite3_stmt *statement = sqlite_prepare_bind(&retry, + "SELECT bar FROM manifests WHERE manifest_hash like ?", + TEXT, like, + END); + if (!statement) + return RHIZOME_BUNDLE_STATUS_ERROR; + enum rhizome_bundle_status ret; + + int r=sqlite_step_retry(&retry, statement); + if (sqlite_code_busy(r)){ + ret = RHIZOME_BUNDLE_STATUS_BUSY; + goto end; + } + if (!sqlite_code_ok(r)){ + ret = RHIZOME_BUNDLE_STATUS_ERROR; + goto end; + } + if (r!=SQLITE_ROW){ + ret = RHIZOME_BUNDLE_STATUS_NEW; + goto end; + } + + const uint8_t *db_bar = sqlite3_column_blob(statement, 0); + size_t bar_size = sqlite3_column_bytes(statement, 0); + + if (bar_size != RHIZOME_BAR_BYTES){ + ret = RHIZOME_BUNDLE_STATUS_ERROR; + goto end; + } + + bcopy(db_bar, bar, RHIZOME_BAR_BYTES); + ret = RHIZOME_BUNDLE_STATUS_SAME; + +end: + sqlite3_finalize(statement); + return ret; +} + static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp) { sqlite3_stmt *statement = sqlite_prepare_bind(retry, diff --git a/rhizome_fetch.c b/rhizome_fetch.c index b97d73be..c6b19c5c 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -1110,7 +1110,7 @@ static int pipe_journal(struct rhizome_fetch_slot *slot){ assert(slot->previous->tail != RHIZOME_SIZE_UNSET); assert(slot->previous->filesize != RHIZOME_SIZE_UNSET); uint64_t start = slot->manifest->tail - slot->previous->tail + slot->write_state.file_offset; - uint64_t length = slot->previous->filesize - slot->manifest->tail - slot->write_state.file_offset; + uint64_t length = slot->previous->filesize - start; // of course there might not be any overlap if (start < slot->previous->filesize && length>0){ diff --git a/rhizome_sync_keys.c b/rhizome_sync_keys.c index f827d36b..d5bbb3c1 100644 --- a/rhizome_sync_keys.c +++ b/rhizome_sync_keys.c @@ -4,6 +4,7 @@ #include "overlay_buffer.h" #include "overlay_packet.h" #include "mdp_client.h" +#include "msp_server.h" #include "log.h" #include "debug.h" #include "conf.h" @@ -11,8 +12,353 @@ #include "fdqueue.h" #include "overlay_interface.h" #include "route_link.h" +#include "mem.h" + +#define STATE_SEND (1) +#define STATE_REQ (2) +#define STATE_RECV (3) + +#define STATE_BAR (4) +#define STATE_MANIFEST (8) +#define STATE_PAYLOAD (12) + +#define STATE_NONE (0) +#define STATE_SEND_BAR (STATE_SEND|STATE_BAR) +#define STATE_REQ_MANIFEST (STATE_REQ|STATE_MANIFEST) +#define STATE_SEND_MANIFEST (STATE_SEND|STATE_MANIFEST) +#define STATE_REQ_PAYLOAD (STATE_REQ|STATE_PAYLOAD) +#define STATE_SEND_PAYLOAD (STATE_SEND|STATE_PAYLOAD) +#define STATE_RECV_PAYLOAD (STATE_RECV|STATE_PAYLOAD) + +// approx size of a signed manifest +#define DUMMY_MANIFEST_SIZE 256 + +struct transfers{ + struct transfers *next; + sync_key_t key; + uint8_t state; + uint8_t rank; + rhizome_manifest *manifest; + size_t req_len; + union{ + struct rhizome_read *read; + struct rhizome_write *write; + rhizome_bar_t bar; + }; +}; + +struct rhizome_sync_keys{ + struct transfers *queue; + struct msp_server_state *connection; +}; + +#define MAX_REQUEST_BYTES (16*1024) struct sync_state *sync_tree=NULL; +struct msp_server_state *sync_connections=NULL; + +static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){ + if (!peer->sync_keys_state) + peer->sync_keys_state = emalloc_zero(sizeof(struct rhizome_sync_keys)); + return peer->sync_keys_state; +} + +static const char *get_state_name(uint8_t state) +{ + switch(state){ + case STATE_NONE: return "NONE"; + case STATE_SEND_BAR: return "SEND_BAR"; + case STATE_REQ_MANIFEST: return "REQ_MANIFEST"; + case STATE_SEND_MANIFEST: return "SEND_MANIFEST"; + case STATE_REQ_PAYLOAD: return "REQ_PAYLOAD"; + case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD"; + case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD"; + } + return "Unknown"; +} + +static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr) +{ + DEBUGF(rhizome_sync_keys, "Clearing %s %s", get_state_name(ptr->state), alloca_sync_key(&ptr->key)); + switch (ptr->state){ + case STATE_SEND_PAYLOAD: + if (ptr->read){ + rhizome_read_close(ptr->read); + free(ptr->read); + } + ptr->read=NULL; + break; + case STATE_RECV_PAYLOAD: + if (ptr->write){ + rhizome_fail_write(ptr->write); + free(ptr->write); + } + ptr->write=NULL; + break; + } + ptr->state=STATE_NONE; +} +#define clear_transfer(P) _clear_transfer(__WHENCE__,P) + +static struct transfers **find_and_update_transfer(struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank) +{ + struct transfers **ptr = &keys_state->queue; + while(*ptr){ + if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){ + if (state){ + if ((*ptr)->state && (*ptr)->state!=state){ + DEBUGF(rhizome_sync_keys, "Updating state from %s to %s %s", + get_state_name((*ptr)->state), get_state_name(state), alloca_sync_key(key)); + clear_transfer(*ptr); + } + (*ptr)->state = state; + } + return ptr; + } + if (rank>=0 && (*ptr)->rank > rank) + break; + ptr = &(*ptr)->next; + } + if (rank<0) + return NULL; + struct transfers *ret = emalloc_zero(sizeof(struct transfers)); + ret->key = *key; + ret->rank = rank; + ret->state = state; + ret->next = (*ptr); + (*ptr) = ret; + DEBUGF(rhizome_sync_keys, "Queued transfer message %s %s", get_state_name(ret->state), alloca_sync_key(key)); + return ptr; +} + +static void sync_key_diffs(void *UNUSED(context), void *peer_context, const sync_key_t *key, uint8_t ours) +{ + struct subscriber *peer = (struct subscriber *)peer_context; + struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer); + struct transfers **transfer = find_and_update_transfer(sync_keys, key, 0, -1); + + DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s", + alloca_tohex_sid_t(peer->sid), + ours?"missing":"has", + alloca_sync_key(key), + transfer?get_state_name((*transfer)->state):"No transfer"); + + if (transfer){ + struct transfers *msg = *transfer; + switch(msg->state){ + case STATE_REQ_PAYLOAD: + DEBUGF(rhizome_sync_keys, " - Requesting payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length); + break; + case STATE_SEND_PAYLOAD: + DEBUGF(rhizome_sync_keys, " - Sending payload [%zu of %zu]", msg->read->offset, msg->read->length); + break; + case STATE_RECV_PAYLOAD: + DEBUGF(rhizome_sync_keys, " - Receiving payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length); + break; + } + } +} + +DEFINE_ALARM(sync_keys_status); +void sync_keys_status(struct sched_ent *alarm) +{ + if (!IF_DEBUG(rhizome_sync_keys)) + return; + + DEBUGF(rhizome_sync_keys, "Sync state;"); + sync_enum_differences(sync_tree, sync_key_diffs); + + time_ms_t next = gettime_ms()+1000; + RESCHEDULE(alarm, next, next, next); +} + +static void sync_send_peer(struct rhizome_sync_keys *sync_state) +{ + size_t mtu = MSP_MESSAGE_SIZE; // FIX ME, use link mtu? + + struct overlay_buffer *payload=NULL; + uint8_t buff[mtu]; + + // send requests for more data, stop when we hit MAX_REQUEST_BYTES + // Note that requests are ordered by rank, + // so we will still request a high rank item even if there is a low ranked item being received + struct transfers **ptr = &sync_state->queue; + size_t requested_bytes = 0; + + while((*ptr) && msp_can_send(sync_state->connection) && requested_bytes < MAX_REQUEST_BYTES){ + struct transfers *msg = *ptr; + if (msg->state == STATE_RECV_PAYLOAD){ + requested_bytes+=msg->req_len; + }else if ((msg->state & 3) == STATE_REQ){ + if (!payload){ + payload = ob_static(buff, sizeof(buff)); + ob_limitsize(payload, sizeof(buff)); + } + + DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key)); + ob_append_byte(payload, msg->state); + ob_append_bytes(payload, msg->key.key, sizeof(msg->key)); + ob_append_byte(payload, msg->rank); + + // start from the specified file offset (eg journals, but one day perhaps resuming transfers) + if (msg->state == STATE_REQ_PAYLOAD){ + ob_append_packed_ui64(payload, msg->write->file_offset); + ob_append_packed_ui64(payload, msg->req_len); + } + + if (ob_overrun(payload)){ + ob_rewind(payload); + msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); + ob_clear(payload); + ob_limitsize(payload, sizeof(buff)); + }else{ + ob_checkpoint(payload); + requested_bytes+=msg->req_len; + if (msg->state == STATE_REQ_PAYLOAD){ + // keep hold of the manifest pointer + msg->state = STATE_RECV_PAYLOAD; + }else{ + *ptr = msg->next; + clear_transfer(msg); + if (msg->manifest) + rhizome_manifest_free(msg->manifest); + msg->manifest=NULL; + free(msg); + continue; + } + } + } + ptr = &msg->next; + } + + // now send requested data + ptr = &sync_state->queue; + while((*ptr) && msp_can_send(sync_state->connection)){ + struct transfers *msg = *ptr; + if ((msg->state & 3) != STATE_SEND){ + ptr = &msg->next; + continue; + } + + if (!payload){ + payload = ob_static(buff, sizeof(buff)); + ob_limitsize(payload, sizeof(buff)); + } + + uint8_t msg_complete=1; + uint8_t send_payload=0; + DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key)); + ob_append_byte(payload, msg->state); + ob_append_bytes(payload, msg->key.key, sizeof(msg->key)); + + switch(msg->state){ + case STATE_SEND_BAR:{ + ob_append_bytes(payload, msg->bar.binary, sizeof(msg->bar)); + break; + } + case STATE_SEND_MANIFEST:{ + rhizome_manifest *m = rhizome_new_manifest(); + if (!m){ + ob_rewind(payload); + // TODO fragment manifests + assert(ob_position(payload)); + msg_complete = 0; + }else{ + if (rhizome_retrieve_manifest_by_hash_prefix(msg->key.key, sizeof(msg->key), m)==RHIZOME_BUNDLE_STATUS_SAME){ + ob_append_bytes(payload, m->manifestdata, m->manifest_all_bytes); + send_payload=1; + }else{ + ob_rewind(payload); + } + rhizome_manifest_free(m); + } + break; + } + case STATE_SEND_PAYLOAD:{ + size_t max_len = ob_remaining(payload); + if (max_len > msg->req_len) + max_len = msg->req_len; + ssize_t payload_len = rhizome_read(msg->read, ob_current_ptr(payload), max_len); + if (payload_len==-1){ + ob_rewind(payload); + }else{ + ob_append_space(payload, payload_len); + send_payload=1; + } + DEBUGF(rhizome_sync_keys, "Sending %s %zd bytes (now %zd of %zd)", + alloca_sync_key(&msg->key), payload_len, msg->read->offset, msg->read->length); + + msg->req_len -= payload_len; + if (msg->read->offset < msg->read->length && msg->req_len>0) + msg_complete=0; + + break; + } + default: + FATALF("Unexpected state %x", msg->state); + } + + if (ob_overrun(payload)){ + ob_rewind(payload); + msg_complete=0; + send_payload=1; + }else{ + ob_checkpoint(payload); + } + + if (send_payload){ + msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); + ob_clear(payload); + ob_limitsize(payload, sizeof(buff)); + } + + if (msg_complete){ + *ptr = msg->next; + clear_transfer(msg); + if (msg->manifest) + rhizome_manifest_free(msg->manifest); + msg->manifest=NULL; + free(msg); + } + // else, try to send another chunk of this payload immediately + } + + if (payload){ + if (ob_position(payload)) + msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); + ob_free(payload); + } +} + +DEFINE_ALARM(sync_send); +void sync_send(struct sched_ent *alarm) +{ + struct msp_iterator iterator; + msp_iterator_open(&sync_connections, &iterator); + + while(1){ + struct msp_server_state *connection = msp_process_next(&iterator); + if (!connection) + break; + + struct subscriber *peer = msp_remote_peer(connection); + struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); + sync_state->connection = connection; + sync_send_peer(sync_state); + } + + while(1){ + struct msp_server_state *connection = msp_next_closed(&iterator); + if (!connection) + break; + struct subscriber *peer = msp_remote_peer(connection); + DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid)); + // TODO if the msp connection breaks before sync complete, free the full sync state + } + + time_ms_t next_action = msp_iterator_close(&iterator); + RESCHEDULE(alarm, next_action, next_action, next_action); +} static void sync_peer_has (void * UNUSED(context), void *peer_context, const sync_key_t *key) { @@ -23,7 +369,7 @@ static void sync_peer_has (void * UNUSED(context), void *peer_context, const syn alloca_tohex_sid_t(peer->sid), alloca_sync_key(key)); - // TODO queue transfers, with retry + // noop, just wait for the BAR to arrive. } static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) @@ -36,19 +382,27 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, alloca_tohex_sid_t(peer->sid), alloca_sync_key(key)); - // TODO queue these advertisements based on rank! - - rhizome_manifest *m = rhizome_new_manifest(); - if (!m) - return; - - if (rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(*key), m)==RHIZOME_BUNDLE_STATUS_SAME){ - rhizome_advertise_manifest(peer, m); - // pre-emptively send the payload if it will fit in a single packet - if (m->filesize > 0 && m->filesize <= 1024) - rhizome_mdp_send_block(peer, &m->cryptoSignPublic, m->version, 0, 0, m->filesize); + // queue BAR for transmission + rhizome_bar_t bar; + if (rhizome_retrieve_bar_by_hash_prefix(key->key, sizeof(*key), &bar)==RHIZOME_BUNDLE_STATUS_SAME){ + uint8_t log_size = rhizome_bar_log_size(&bar); + struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); + + struct transfers *send_bar = *find_and_update_transfer(sync_state, key, STATE_SEND_BAR, log_size); + send_bar->bar = bar; + + if (!sync_state->connection) + sync_state->connection = msp_find_or_connect(&sync_connections, + peer, MDP_PORT_RHIZOME_SYNC_KEYS, + my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS); + + time_ms_t next_action = msp_next_action(sync_state->connection); + struct sched_ent *alarm=&ALARM_STRUCT(sync_send); + if (next_action < alarm->alarm || !is_scheduled(alarm)) + RESCHEDULE(alarm, next_action, next_action, next_action); } - rhizome_manifest_free(m); + + return; } static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) @@ -83,8 +437,8 @@ static void build_tree() sqlite3_finalize(statement); } -DEFINE_ALARM(sync_send_data); -void sync_send_data(struct sched_ent *alarm) +DEFINE_ALARM(sync_send_keys); +void sync_send_keys(struct sched_ent *alarm) { if (!sync_tree) build_tree(); @@ -93,9 +447,11 @@ void sync_send_data(struct sched_ent *alarm) size_t len = sync_build_message(sync_tree, buff, sizeof buff); if (len==0) return; - - DEBUG(rhizome_sync_keys,"Sending message"); - dump("Raw message", buff, len); + + if (IF_DEBUG(rhizome_sync_keys)){ + DEBUG(rhizome_sync_keys,"Sending message"); + //dump("Raw message", buff, len); + } struct overlay_buffer *payload = ob_static(buff, sizeof(buff)); ob_limitsize(payload, len); @@ -114,14 +470,249 @@ void sync_send_data(struct sched_ent *alarm) time_ms_t now = gettime_ms(); if (sync_has_transmit_queued(sync_tree)){ - DEBUG(rhizome_sync_keys,"Queueing next message for 5ms"); - RESCHEDULE(alarm, now+5, now+5, now+5); + DEBUG(rhizome_sync_keys,"Queueing next message for now"); + RESCHEDULE(alarm, now, now, now); }else{ DEBUG(rhizome_sync_keys,"Queueing next message for 5s"); RESCHEDULE(alarm, now+5000, now+30000, TIME_MS_NEVER_WILL); } } +static void process_transfer_message(struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload) +{ + while(ob_remaining(payload)){ + ob_checkpoint(payload); + int msg_state = ob_get(payload); + if (msg_state<0) + return; + sync_key_t key; + if (ob_get_bytes(payload, key.key, sizeof key)<0) + return; + + int rank=-1; + if (msg_state & STATE_REQ){ + rank = ob_get(payload); + if (rank < 0) + return; + } + + DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d", + get_state_name(msg_state), alloca_sync_key(&key), rank); + switch(msg_state){ + case STATE_SEND_BAR:{ + rhizome_bar_t bar; + if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0) + return; + + if (!config.rhizome.fetch) + break; + if (!rhizome_is_bar_interesting(&bar)){ + DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)", + alloca_sync_key(&key)); + break; + } + + // send a request for the manifest + rank = rhizome_bar_log_size(&bar); + struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_MANIFEST, rank); + transfer->req_len = DUMMY_MANIFEST_SIZE; + break; + } + + case STATE_REQ_MANIFEST:{ + // queue the transmission of the manifest + find_and_update_transfer(sync_state, &key, STATE_SEND_MANIFEST, rank); + break; + } + + case STATE_SEND_MANIFEST:{ + // process the incoming manifest + size_t len = ob_remaining(payload); + uint8_t *data = ob_get_bytes_ptr(payload, len); + + if (!config.rhizome.fetch) + break; + + struct rhizome_manifest_summary summ; + if (!rhizome_manifest_inspect((char *)data, len, &summ)){ + WHYF("Ignoring manifest for %s, (Malformed)", + alloca_sync_key(&key)); + break; + } + + // The manifest looks potentially interesting, so now do a full parse and validation. + rhizome_manifest *m = rhizome_new_manifest(); + if (!m){ + // don't consume the payload + ob_rewind(payload); + return; + } + + memcpy(m->manifestdata, data, len); + m->manifest_all_bytes = len; + if ( rhizome_manifest_parse(m) == -1 + || !rhizome_manifest_validate(m) + ) { + WHYF("Ignoring manifest for %s, (Malformed)", + alloca_sync_key(&key)); + rhizome_manifest_free(m); + break; + } + + if (!rhizome_is_manifest_interesting(m)){ + DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)", + alloca_sync_key(&key)); + rhizome_manifest_free(m); + break; + } + + // start writing the payload + + enum rhizome_payload_status status; + struct rhizome_write *write = emalloc_zero(sizeof(struct rhizome_write)); + + if (m->filesize==0){ + status = RHIZOME_PAYLOAD_STATUS_STORED; + }else{ + status = rhizome_open_write(write, &m->filehash, m->filesize); + } + + if (status == RHIZOME_PAYLOAD_STATUS_STORED){ + enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL); + DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)", + alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status)); + rhizome_manifest_free(m); + free(write); + break; + }else if (status!=RHIZOME_PAYLOAD_STATUS_NEW){ + DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)", + alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status)); + rhizome_manifest_free(m); + free(write); + break; + } + + + if (m->is_journal){ + // if we're fetching a journal bundle, copy any bytes we have of a previous version + // and therefore work out what range of bytes we still need + rhizome_manifest *previous = rhizome_new_manifest(); + if (rhizome_retrieve_manifest(&m->cryptoSignPublic, previous)==RHIZOME_BUNDLE_STATUS_SAME && + previous->is_journal && + previous->tail <= m->tail && + previous->filesize + previous->tail > m->tail + ){ + uint64_t start = m->tail - previous->tail; + uint64_t length = previous->filesize - start; + // required by tests; + DEBUGF(rhizome_sync_keys, "%s Copying %"PRId64" bytes from previous journal", alloca_sync_key(&key), length); + rhizome_journal_pipe(write, &previous->filehash, start, length); + } + rhizome_manifest_free(previous); + + if (write->file_offset >= m->filesize){ + // no new content in the new version, we can import now + enum rhizome_payload_status status = rhizome_finish_write(write); + DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status); + free(write); + if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ + enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL); + DEBUGF(rhizome_sync_keys, "Import %s = %s", + alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state)); + } + rhizome_manifest_free(m); + break; + } + } + + // TODO improve rank algo here; + rank = log2ll(m->filesize - write->file_offset); + + struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_PAYLOAD, rank); + transfer->manifest = m; + transfer->req_len = m->filesize - write->file_offset; + transfer->write = write; + break; + } + case STATE_REQ_PAYLOAD:{ + // open the payload for reading + uint64_t offset = ob_get_packed_ui64(payload); + uint64_t length = ob_get_packed_ui64(payload); + + rhizome_manifest *m = rhizome_new_manifest(); + if (!m){ + ob_rewind(payload); + return; + } + if (rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m)!=RHIZOME_BUNDLE_STATUS_SAME){ + rhizome_manifest_free(m); + break; + } + + struct rhizome_read *read = emalloc_zero(sizeof (struct rhizome_read)); + + if (rhizome_open_read(read, &m->filehash) != RHIZOME_PAYLOAD_STATUS_STORED){ + free(read); + rhizome_manifest_free(m); + break; + } + rhizome_manifest_free(m); + + struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_SEND_PAYLOAD, rank); + transfer->read = read; + transfer->req_len = length; + read->offset = offset; + break; + } + case STATE_SEND_PAYLOAD:{ + size_t len = ob_remaining(payload); + uint8_t *buff = ob_get_bytes_ptr(payload, len); + + struct transfers **ptr = find_and_update_transfer(sync_state, &key, STATE_RECV_PAYLOAD, -1); + if (!ptr){ + WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key)); + break; + } + struct transfers *transfer = *ptr; + transfer->req_len -= len; + uint8_t all_done = 0; + if (rhizome_write_buffer(transfer->write, buff, len)==-1){ + WHYF("Write failed for %s!", alloca_sync_key(&key)); + all_done=1; + }else{ + DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu", + alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length); + if (transfer->write->file_offset >= transfer->write->file_length){ + enum rhizome_payload_status status = rhizome_finish_write(transfer->write); + DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status); + free(transfer->write); + transfer->write = NULL; + if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ + enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL); + DEBUGF(rhizome_sync_keys, "Import %s = %s", + alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state)); + } + all_done=1; + } + } + + if (all_done){ + if (transfer->manifest) + rhizome_manifest_free(transfer->manifest); + transfer->manifest=NULL; + clear_transfer(transfer); + *ptr = transfer->next; + free(transfer); + } + break; + } + default: + WHYF("Unknown message type %x", msg_state); + } + } +} + + DEFINE_BINDING(MDP_PORT_RHIZOME_SYNC_KEYS, sync_keys_recv); static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buffer *payload) { @@ -134,19 +725,50 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf header->source->sync_version = 1; if (!header->destination){ - DEBUGF(rhizome_sync_keys,"Processing message from %s", alloca_tohex_sid_t(header->source->sid)); - dump("Raw message", ob_current_ptr(payload), ob_remaining(payload)); + if (IF_DEBUG(rhizome_sync_keys)){ + DEBUGF(rhizome_sync_keys,"Processing message from %s", alloca_tohex_sid_t(header->source->sid)); + //dump("Raw message", ob_current_ptr(payload), ob_remaining(payload)); + } sync_recv_message(sync_tree, header->source, ob_current_ptr(payload), ob_remaining(payload)); if (sync_has_transmit_queued(sync_tree)){ - struct sched_ent *alarm=&ALARM_STRUCT(sync_send_data); + struct sched_ent *alarm=&ALARM_STRUCT(sync_send_keys); time_ms_t next = gettime_ms() + 5; - if (alarm->alarm > next){ + if (alarm->alarm > next || !is_scheduled(alarm)){ DEBUG(rhizome_sync_keys,"Queueing next message for 5ms"); RESCHEDULE(alarm, next, next, next); } } + + if (IF_DEBUG(rhizome_sync_keys)){ + struct sched_ent *alarm=&ALARM_STRUCT(sync_keys_status); + if (alarm->alarm == TIME_MS_NEVER_WILL){ + time_ms_t next = gettime_ms() + 1000; + RESCHEDULE(alarm, next, next, next); + } + } }else{ - // TODO + struct msp_server_state *connection_state = msp_find_and_process(&sync_connections, header, payload); + if (connection_state){ + struct rhizome_sync_keys *sync_state = get_peer_sync_state(header->source); + sync_state->connection = connection_state; + + while(1){ + struct msp_packet *packet = msp_recv_next(connection_state); + if (!packet) + break; + struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet); + + process_transfer_message(sync_state, recv_payload); + msp_consumed(connection_state, packet, recv_payload); + } + + sync_send_peer(sync_state); + time_ms_t next_action = msp_next_action(connection_state); + + struct sched_ent *alarm=&ALARM_STRUCT(sync_send); + if (alarm->alarm > next_action || !is_scheduled(alarm)) + RESCHEDULE(alarm, next_action, next_action, next_action); + } } return 0; @@ -154,7 +776,7 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t UNUSED(found), unsigned count) { - struct sched_ent *alarm = &ALARM_STRUCT(sync_send_data); + struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys); if (count>0 && is_rhizome_advertise_enabled()){ time_ms_t now = gettime_ms(); @@ -166,6 +788,7 @@ static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t DEBUG(rhizome_sync_keys,"Stop queueing messages"); RESCHEDULE(alarm, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL); } + // nuke sync state for this peer now? } DEFINE_TRIGGER(nbr_change, sync_neighbour_changed); @@ -183,9 +806,9 @@ static void sync_bundle_add(rhizome_manifest *m) sync_add_key(sync_tree, &key, NULL); if (link_has_neighbours()){ - struct sched_ent *alarm = &ALARM_STRUCT(sync_send_data); + struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys); time_ms_t next = gettime_ms()+5; - if (alarm->alarm > next){ + if (alarm->alarm > next || !is_scheduled(alarm)){ DEBUG(rhizome_sync_keys,"Queueing next message for 5ms"); RESCHEDULE(alarm, next, next, next); } diff --git a/sync_keys.c b/sync_keys.c index f95696a6..c5b51657 100644 --- a/sync_keys.c +++ b/sync_keys.c @@ -146,7 +146,8 @@ static void xor_children(struct node *node, key_message_t *dest) if (node->message.prefix_len == KEY_LEN_BITS){ sync_xor(&node->message.key, dest); }else{ - for (unsigned i=0;ichildren[i]) xor_children(node->children[i], dest); } @@ -222,8 +223,8 @@ static void free_node(struct sync_state *state, struct node *node) { if (!node) return; - - for (unsigned i=0;ichildren[i]); if (node->transmit_next){ @@ -284,7 +285,8 @@ static void remove_key(struct sync_state *state, struct node **root, const sync_ node = NULL; // If *parent has <= 1 child now, we need to remove *parent as well - for (unsigned i=0;ichildren[i]){ if (node) return; @@ -556,7 +558,8 @@ static void peer_missing_leaf_nodes( if (peer_is_missing(state, peer, node, allow_remove)) queue_node(state, node, 1); }else{ - for (unsigned i=0;ichildren[i]) peer_missing_leaf_nodes(state, peer, node->children[i], NODE_CHILDREN, allow_remove); } @@ -608,7 +611,8 @@ static unsigned peer_has_received_all(struct sync_state *state, struct sync_peer // duplicate the child pointers, as removing an immediate child key *will* also free this peer node. struct node *children[NODE_CHILDREN]; memcpy(children, peer_node->children, sizeof(children)); - for (unsigned i=0;iroot; uint8_t prefix_len = 0; uint8_t is_blank = 1; - for (unsigned i=(peer_message.prefix_len>>3)+1;i>3)+1;ichildren[i]) queue_node(state, node->children[i], 0); } @@ -879,3 +885,27 @@ int sync_recv_message(struct sync_state *state, void *peer_context, const uint8_ return 0; } +static void enum_diffs(struct sync_state *state, struct sync_peer_state *peer_state, struct node *node, + void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs)) +{ + if (!node) + return; + if (node->message.prefix_len == KEY_LEN_BITS){ + callback(state->context, peer_state->peer_context, &node->message.key, node->message.stored); + }else{ + unsigned i; + for (i=0;ichildren[i], callback); + } + } +} + +void sync_enum_differences(struct sync_state *state, + void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs)) +{ + struct sync_peer_state *peer_state = state->peers; + while(peer_state){ + enum_diffs(state, peer_state, peer_state->root, callback); + peer_state = peer_state->next; + } +} \ No newline at end of file diff --git a/sync_keys.h b/sync_keys.h index 719269af..129ec71c 100644 --- a/sync_keys.h +++ b/sync_keys.h @@ -39,5 +39,7 @@ size_t sync_build_message(struct sync_state *state, uint8_t *buff, size_t len); // process a message received from a peer. int sync_recv_message(struct sync_state *state, void *peer_context, const uint8_t *buff, size_t len); +void sync_enum_differences(struct sync_state *state, + void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs)); #endif \ No newline at end of file diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index 44e09435..ca77f9c0 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -46,7 +46,8 @@ default_config() { set debug.rhizome_ads on \ set debug.rhizome_tx on \ set debug.rhizome_rx on \ - set debug.rhizome_sync_keys on + set debug.rhizome_sync_keys on \ + set debug.msp on } # Called by start_servald_instances for each instance. @@ -90,50 +91,6 @@ test_FileTransfer() { receive_and_update_bundle } -doc_ManyFiles="Synchronise many small files, with some files in common" -setup_ManyFiles() { - setup_common - bundlesA=() - bundlesB=() - - for i in `seq 1 20` - do - tfw_log "Adding common file-$i" - set_instance +A - create_file file-$i 1000 - executeOk_servald rhizome add file "${!sidvar}" file-$i file-$i.manifest - set_instance +B - executeOk_servald rhizome import bundle file-$i file-$i.manifest - done - - for i in `seq 1 10` - do - set_instance +A - tfw_log "Adding fileA-$i" - create_file fileA-$i 1000 - tfw_nolog executeOk_servald rhizome add file "${!sidvar}" fileA-$i fileA-$i.manifest - tfw_nolog extract_stdout_manifestid BID - tfw_nolog extract_stdout_version VERSION - bundlesA+=($BID:$VERSION) - - set_instance +B - tfw_log "Adding fileB-$i" - create_file fileB-$i 1000 - tfw_nolog executeOk_servald rhizome add file "${!sidvar}" fileB-$i fileB-$i.manifest - tfw_nolog extract_stdout_manifestid BID - tfw_nolog extract_stdout_version VERSION - bundlesB+=($BID:$VERSION) - done - - start_servald_instances +A +B - foreach_instance +A assert_peers_are_instances +B - foreach_instance +B assert_peers_are_instances +A -} - -test_ManyFiles() { - wait_until bundle_received_by ${bundlesA[*]} +B ${bundlesB[*]} +A -} - doc_EncryptedTransfer="Encrypted payload can be opened by destination" setup_EncryptedTransfer() { setup_common @@ -887,4 +844,54 @@ teardown_SimulatedRadio2() { tfw_cat "$SERVALD_VAR/radioerr" } +doc_ManyFiles="Synchronise many small files, with some files in common" +setup_ManyFiles() { + setup_servald + foreach_instance +A +B +C +D create_single_identity + bundlesA=() + bundlesB=() + bundlesC=() + bundlesD=() + + for i in `seq 1 5` + do + tfw_log "Adding common file-$i" + tfw_nolog set_instance +A + create_file file-$i 100 + tfw_nolog executeOk_servald rhizome add file "${!sidvar}" file-$i file-$i.manifest + tfw_nolog set_instance +B + tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest + tfw_nolog set_instance +C + tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest + tfw_nolog set_instance +D + tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest + done + for i in A B C D + do + eval "bundles$i=()" + set_instance +$i + for j in `seq 1 3` + do + tfw_log "Adding file$i-$j" + create_file file$i-$j $(( $j * 500 )) + tfw_nolog executeOk_servald rhizome add file "${!sidvar}" file$i-$j file$i-$j.manifest + tfw_nolog extract_stdout_manifestid BID + tfw_nolog extract_stdout_version VERSION + eval "bundles$i+=(\$BID:\$VERSION)" + done + done + + start_servald_instances +A +B +C +D +} + +test_ManyFiles() { + wait_until --timeout=15 bundle_received_by \ + ${bundlesB[*]} ${bundlesC[*]} ${bundlesD[*]} +A \ + ${bundlesA[*]} ${bundlesC[*]} ${bundlesD[*]} +B \ + ${bundlesA[*]} ${bundlesB[*]} ${bundlesD[*]} +C \ + ${bundlesA[*]} ${bundlesB[*]} ${bundlesC[*]} +D +} + runTests "$@" + +