Transfer bundles via msp, triggered by key sync

This commit is contained in:
Jeremy Lakeman 2016-03-22 16:28:44 +10:30
parent 6b7aa800e4
commit c5957e9c85
11 changed files with 827 additions and 89 deletions

View File

@ -29,6 +29,7 @@ struct msp_window{
uint32_t rtt;
uint16_t next_seq; // seq of next expected TX or RX packet.
time_ms_t last_activity;
time_ms_t last_packet;
struct msp_packet *_head, *_tail;
};
@ -48,7 +49,9 @@ static void msp_stream_init(struct msp_stream *stream)
// TODO set base rtt to ensure that we send the first packet a few times before giving up
stream->tx.base_rtt = stream->tx.rtt = 0xFFFFFFFF;
stream->tx.last_activity = TIME_MS_NEVER_HAS;
stream->tx.last_packet = TIME_MS_NEVER_HAS;
stream->rx.last_activity = TIME_MS_NEVER_HAS;
stream->rx.last_packet = TIME_MS_NEVER_HAS;
stream->next_action = TIME_MS_NEVER_WILL;
stream->timeout = gettime_ms() + 10000;
stream->previous_ack = 0x7FFF;
@ -184,7 +187,7 @@ static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, str
DEBUGF(msp, "With packet flags %02x seq %02x len %zd",
header[0], packet->seq, packet->len);
packet->sent = stream->tx.last_activity;
packet->sent = stream->tx.last_packet = stream->tx.last_activity;
return MSP_PAYLOAD_PREAMBLE_SIZE;
}
@ -333,6 +336,7 @@ static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload,
stream->state |= MSP_STATE_RECEIVED_DATA;
uint16_t seq = read_uint16(&payload[3]);
stream->rx.last_packet = stream->rx.last_activity;
if (compare_wrapped_uint16(seq, stream->rx.next_seq)>=0){
if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
stream->next_ack = now;

View File

@ -83,6 +83,26 @@ time_ms_t msp_iterator_close(struct msp_iterator *iterator)
return next_action;
}
struct msp_server_state * msp_next_closed(struct msp_iterator *iterator)
{
struct msp_server_state *ptr = iterator->_next;
while(1){
if (ptr){
ptr = ptr->_next;
}else{
ptr = *iterator->_root;
}
if (!ptr){
iterator->_next = ptr;
return NULL;
}
if (ptr->stream.state & MSP_STATE_CLOSED){
iterator->_next = ptr;
return ptr;
}
}
}
static void send_frame(struct msp_server_state *state, struct overlay_buffer *payload)
{
struct internal_mdp_header response_header;
@ -126,23 +146,26 @@ struct msp_server_state * msp_process_next(struct msp_iterator *iterator)
while(1){
if (ptr){
struct msp_packet *packet = ptr->stream.tx._head;
ptr->stream.next_action = ptr->stream.timeout;
time_ms_t next_packet = TIME_MS_NEVER_WILL;
ptr->stream.next_action = ptr->stream.timeout;
while(packet){
if (packet->sent + RETRANSMIT_TIME < now)
if (packet->sent + RETRANSMIT_TIME <= now)
// (re)transmit this packet
send_packet(ptr, packet);
if (ptr->stream.next_action > packet->sent + RETRANSMIT_TIME)
ptr->stream.next_action = packet->sent + RETRANSMIT_TIME;
if (next_packet > packet->sent + RETRANSMIT_TIME)
next_packet = packet->sent + RETRANSMIT_TIME;
packet=packet->_next;
}
// should we send an ack now without sending a payload?
if (now > ptr->stream.next_ack)
if (now >= ptr->stream.next_ack)
send_ack(ptr);
if (ptr->stream.next_action > next_packet)
ptr->stream.next_action = next_packet;
if (ptr->stream.next_action > ptr->stream.next_ack)
ptr->stream.next_action = ptr->stream.next_ack;

View File

@ -24,8 +24,9 @@ struct subscriber * msp_remote_peer(struct msp_server_state *state);
int msp_can_send(struct msp_server_state *state);
int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator);
time_ms_t msp_iterator_close(struct msp_iterator *iterator);
struct msp_server_state * msp_process_next(struct msp_iterator *iterator);
struct msp_server_state * msp_next_closed(struct msp_iterator *iterator);
time_ms_t msp_iterator_close(struct msp_iterator *iterator);
int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len);
int msp_shutdown_stream(struct msp_server_state *state);

View File

@ -66,6 +66,7 @@ struct subscriber{
// rhizome sync state
struct rhizome_sync *sync_state;
struct rhizome_sync_keys *sync_keys_state;
uint8_t sync_version;
// result of routing calculations;

View File

@ -623,6 +623,7 @@ int rhizome_is_manifest_interesting(rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest(const rhizome_bid_t *bid, rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest_by_prefix(const unsigned char *prefix, unsigned prefix_len, rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_bar_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_bar_t *bar);
int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m);
int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength);
int rhizome_delete_bundle(const rhizome_bid_t *bidp);

View File

@ -1778,6 +1778,52 @@ enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_
return ret;
}
enum rhizome_bundle_status rhizome_retrieve_bar_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_bar_t *bar)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
const unsigned prefix_strlen = prefix_len * 2;
char like[prefix_strlen + 2];
tohex(like, prefix_strlen, prefix);
like[prefix_strlen] = '%';
like[prefix_strlen + 1] = '\0';
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT bar FROM manifests WHERE manifest_hash like ?",
TEXT, like,
END);
if (!statement)
return RHIZOME_BUNDLE_STATUS_ERROR;
enum rhizome_bundle_status ret;
int r=sqlite_step_retry(&retry, statement);
if (sqlite_code_busy(r)){
ret = RHIZOME_BUNDLE_STATUS_BUSY;
goto end;
}
if (!sqlite_code_ok(r)){
ret = RHIZOME_BUNDLE_STATUS_ERROR;
goto end;
}
if (r!=SQLITE_ROW){
ret = RHIZOME_BUNDLE_STATUS_NEW;
goto end;
}
const uint8_t *db_bar = sqlite3_column_blob(statement, 0);
size_t bar_size = sqlite3_column_bytes(statement, 0);
if (bar_size != RHIZOME_BAR_BYTES){
ret = RHIZOME_BUNDLE_STATUS_ERROR;
goto end;
}
bcopy(db_bar, bar, RHIZOME_BAR_BYTES);
ret = RHIZOME_BUNDLE_STATUS_SAME;
end:
sqlite3_finalize(statement);
return ret;
}
static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp)
{
sqlite3_stmt *statement = sqlite_prepare_bind(retry,

View File

@ -1110,7 +1110,7 @@ static int pipe_journal(struct rhizome_fetch_slot *slot){
assert(slot->previous->tail != RHIZOME_SIZE_UNSET);
assert(slot->previous->filesize != RHIZOME_SIZE_UNSET);
uint64_t start = slot->manifest->tail - slot->previous->tail + slot->write_state.file_offset;
uint64_t length = slot->previous->filesize - slot->manifest->tail - slot->write_state.file_offset;
uint64_t length = slot->previous->filesize - start;
// of course there might not be any overlap
if (start < slot->previous->filesize && length>0){

View File

@ -4,6 +4,7 @@
#include "overlay_buffer.h"
#include "overlay_packet.h"
#include "mdp_client.h"
#include "msp_server.h"
#include "log.h"
#include "debug.h"
#include "conf.h"
@ -11,8 +12,353 @@
#include "fdqueue.h"
#include "overlay_interface.h"
#include "route_link.h"
#include "mem.h"
#define STATE_SEND (1)
#define STATE_REQ (2)
#define STATE_RECV (3)
#define STATE_BAR (4)
#define STATE_MANIFEST (8)
#define STATE_PAYLOAD (12)
#define STATE_NONE (0)
#define STATE_SEND_BAR (STATE_SEND|STATE_BAR)
#define STATE_REQ_MANIFEST (STATE_REQ|STATE_MANIFEST)
#define STATE_SEND_MANIFEST (STATE_SEND|STATE_MANIFEST)
#define STATE_REQ_PAYLOAD (STATE_REQ|STATE_PAYLOAD)
#define STATE_SEND_PAYLOAD (STATE_SEND|STATE_PAYLOAD)
#define STATE_RECV_PAYLOAD (STATE_RECV|STATE_PAYLOAD)
// approx size of a signed manifest
#define DUMMY_MANIFEST_SIZE 256
struct transfers{
struct transfers *next;
sync_key_t key;
uint8_t state;
uint8_t rank;
rhizome_manifest *manifest;
size_t req_len;
union{
struct rhizome_read *read;
struct rhizome_write *write;
rhizome_bar_t bar;
};
};
struct rhizome_sync_keys{
struct transfers *queue;
struct msp_server_state *connection;
};
#define MAX_REQUEST_BYTES (16*1024)
struct sync_state *sync_tree=NULL;
struct msp_server_state *sync_connections=NULL;
static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){
if (!peer->sync_keys_state)
peer->sync_keys_state = emalloc_zero(sizeof(struct rhizome_sync_keys));
return peer->sync_keys_state;
}
static const char *get_state_name(uint8_t state)
{
switch(state){
case STATE_NONE: return "NONE";
case STATE_SEND_BAR: return "SEND_BAR";
case STATE_REQ_MANIFEST: return "REQ_MANIFEST";
case STATE_SEND_MANIFEST: return "SEND_MANIFEST";
case STATE_REQ_PAYLOAD: return "REQ_PAYLOAD";
case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD";
case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD";
}
return "Unknown";
}
static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr)
{
DEBUGF(rhizome_sync_keys, "Clearing %s %s", get_state_name(ptr->state), alloca_sync_key(&ptr->key));
switch (ptr->state){
case STATE_SEND_PAYLOAD:
if (ptr->read){
rhizome_read_close(ptr->read);
free(ptr->read);
}
ptr->read=NULL;
break;
case STATE_RECV_PAYLOAD:
if (ptr->write){
rhizome_fail_write(ptr->write);
free(ptr->write);
}
ptr->write=NULL;
break;
}
ptr->state=STATE_NONE;
}
#define clear_transfer(P) _clear_transfer(__WHENCE__,P)
static struct transfers **find_and_update_transfer(struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank)
{
struct transfers **ptr = &keys_state->queue;
while(*ptr){
if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){
if (state){
if ((*ptr)->state && (*ptr)->state!=state){
DEBUGF(rhizome_sync_keys, "Updating state from %s to %s %s",
get_state_name((*ptr)->state), get_state_name(state), alloca_sync_key(key));
clear_transfer(*ptr);
}
(*ptr)->state = state;
}
return ptr;
}
if (rank>=0 && (*ptr)->rank > rank)
break;
ptr = &(*ptr)->next;
}
if (rank<0)
return NULL;
struct transfers *ret = emalloc_zero(sizeof(struct transfers));
ret->key = *key;
ret->rank = rank;
ret->state = state;
ret->next = (*ptr);
(*ptr) = ret;
DEBUGF(rhizome_sync_keys, "Queued transfer message %s %s", get_state_name(ret->state), alloca_sync_key(key));
return ptr;
}
static void sync_key_diffs(void *UNUSED(context), void *peer_context, const sync_key_t *key, uint8_t ours)
{
struct subscriber *peer = (struct subscriber *)peer_context;
struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer);
struct transfers **transfer = find_and_update_transfer(sync_keys, key, 0, -1);
DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s",
alloca_tohex_sid_t(peer->sid),
ours?"missing":"has",
alloca_sync_key(key),
transfer?get_state_name((*transfer)->state):"No transfer");
if (transfer){
struct transfers *msg = *transfer;
switch(msg->state){
case STATE_REQ_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Requesting payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length);
break;
case STATE_SEND_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Sending payload [%zu of %zu]", msg->read->offset, msg->read->length);
break;
case STATE_RECV_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Receiving payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length);
break;
}
}
}
DEFINE_ALARM(sync_keys_status);
void sync_keys_status(struct sched_ent *alarm)
{
if (!IF_DEBUG(rhizome_sync_keys))
return;
DEBUGF(rhizome_sync_keys, "Sync state;");
sync_enum_differences(sync_tree, sync_key_diffs);
time_ms_t next = gettime_ms()+1000;
RESCHEDULE(alarm, next, next, next);
}
static void sync_send_peer(struct rhizome_sync_keys *sync_state)
{
size_t mtu = MSP_MESSAGE_SIZE; // FIX ME, use link mtu?
struct overlay_buffer *payload=NULL;
uint8_t buff[mtu];
// send requests for more data, stop when we hit MAX_REQUEST_BYTES
// Note that requests are ordered by rank,
// so we will still request a high rank item even if there is a low ranked item being received
struct transfers **ptr = &sync_state->queue;
size_t requested_bytes = 0;
while((*ptr) && msp_can_send(sync_state->connection) && requested_bytes < MAX_REQUEST_BYTES){
struct transfers *msg = *ptr;
if (msg->state == STATE_RECV_PAYLOAD){
requested_bytes+=msg->req_len;
}else if ((msg->state & 3) == STATE_REQ){
if (!payload){
payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, sizeof(buff));
}
DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key));
ob_append_byte(payload, msg->state);
ob_append_bytes(payload, msg->key.key, sizeof(msg->key));
ob_append_byte(payload, msg->rank);
// start from the specified file offset (eg journals, but one day perhaps resuming transfers)
if (msg->state == STATE_REQ_PAYLOAD){
ob_append_packed_ui64(payload, msg->write->file_offset);
ob_append_packed_ui64(payload, msg->req_len);
}
if (ob_overrun(payload)){
ob_rewind(payload);
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
}else{
ob_checkpoint(payload);
requested_bytes+=msg->req_len;
if (msg->state == STATE_REQ_PAYLOAD){
// keep hold of the manifest pointer
msg->state = STATE_RECV_PAYLOAD;
}else{
*ptr = msg->next;
clear_transfer(msg);
if (msg->manifest)
rhizome_manifest_free(msg->manifest);
msg->manifest=NULL;
free(msg);
continue;
}
}
}
ptr = &msg->next;
}
// now send requested data
ptr = &sync_state->queue;
while((*ptr) && msp_can_send(sync_state->connection)){
struct transfers *msg = *ptr;
if ((msg->state & 3) != STATE_SEND){
ptr = &msg->next;
continue;
}
if (!payload){
payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, sizeof(buff));
}
uint8_t msg_complete=1;
uint8_t send_payload=0;
DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key));
ob_append_byte(payload, msg->state);
ob_append_bytes(payload, msg->key.key, sizeof(msg->key));
switch(msg->state){
case STATE_SEND_BAR:{
ob_append_bytes(payload, msg->bar.binary, sizeof(msg->bar));
break;
}
case STATE_SEND_MANIFEST:{
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
// TODO fragment manifests
assert(ob_position(payload));
msg_complete = 0;
}else{
if (rhizome_retrieve_manifest_by_hash_prefix(msg->key.key, sizeof(msg->key), m)==RHIZOME_BUNDLE_STATUS_SAME){
ob_append_bytes(payload, m->manifestdata, m->manifest_all_bytes);
send_payload=1;
}else{
ob_rewind(payload);
}
rhizome_manifest_free(m);
}
break;
}
case STATE_SEND_PAYLOAD:{
size_t max_len = ob_remaining(payload);
if (max_len > msg->req_len)
max_len = msg->req_len;
ssize_t payload_len = rhizome_read(msg->read, ob_current_ptr(payload), max_len);
if (payload_len==-1){
ob_rewind(payload);
}else{
ob_append_space(payload, payload_len);
send_payload=1;
}
DEBUGF(rhizome_sync_keys, "Sending %s %zd bytes (now %zd of %zd)",
alloca_sync_key(&msg->key), payload_len, msg->read->offset, msg->read->length);
msg->req_len -= payload_len;
if (msg->read->offset < msg->read->length && msg->req_len>0)
msg_complete=0;
break;
}
default:
FATALF("Unexpected state %x", msg->state);
}
if (ob_overrun(payload)){
ob_rewind(payload);
msg_complete=0;
send_payload=1;
}else{
ob_checkpoint(payload);
}
if (send_payload){
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
}
if (msg_complete){
*ptr = msg->next;
clear_transfer(msg);
if (msg->manifest)
rhizome_manifest_free(msg->manifest);
msg->manifest=NULL;
free(msg);
}
// else, try to send another chunk of this payload immediately
}
if (payload){
if (ob_position(payload))
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_free(payload);
}
}
DEFINE_ALARM(sync_send);
void sync_send(struct sched_ent *alarm)
{
struct msp_iterator iterator;
msp_iterator_open(&sync_connections, &iterator);
while(1){
struct msp_server_state *connection = msp_process_next(&iterator);
if (!connection)
break;
struct subscriber *peer = msp_remote_peer(connection);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
sync_state->connection = connection;
sync_send_peer(sync_state);
}
while(1){
struct msp_server_state *connection = msp_next_closed(&iterator);
if (!connection)
break;
struct subscriber *peer = msp_remote_peer(connection);
DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid));
// TODO if the msp connection breaks before sync complete, free the full sync state
}
time_ms_t next_action = msp_iterator_close(&iterator);
RESCHEDULE(alarm, next_action, next_action, next_action);
}
static void sync_peer_has (void * UNUSED(context), void *peer_context, const sync_key_t *key)
{
@ -23,7 +369,7 @@ static void sync_peer_has (void * UNUSED(context), void *peer_context, const syn
alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key));
// TODO queue transfers, with retry
// noop, just wait for the BAR to arrive.
}
static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
@ -36,19 +382,27 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context,
alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key));
// TODO queue these advertisements based on rank!
// queue BAR for transmission
rhizome_bar_t bar;
if (rhizome_retrieve_bar_by_hash_prefix(key->key, sizeof(*key), &bar)==RHIZOME_BUNDLE_STATUS_SAME){
uint8_t log_size = rhizome_bar_log_size(&bar);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return;
struct transfers *send_bar = *find_and_update_transfer(sync_state, key, STATE_SEND_BAR, log_size);
send_bar->bar = bar;
if (rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(*key), m)==RHIZOME_BUNDLE_STATUS_SAME){
rhizome_advertise_manifest(peer, m);
// pre-emptively send the payload if it will fit in a single packet
if (m->filesize > 0 && m->filesize <= 1024)
rhizome_mdp_send_block(peer, &m->cryptoSignPublic, m->version, 0, 0, m->filesize);
if (!sync_state->connection)
sync_state->connection = msp_find_or_connect(&sync_connections,
peer, MDP_PORT_RHIZOME_SYNC_KEYS,
my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS);
time_ms_t next_action = msp_next_action(sync_state->connection);
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (next_action < alarm->alarm || !is_scheduled(alarm))
RESCHEDULE(alarm, next_action, next_action, next_action);
}
rhizome_manifest_free(m);
return;
}
static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
@ -83,8 +437,8 @@ static void build_tree()
sqlite3_finalize(statement);
}
DEFINE_ALARM(sync_send_data);
void sync_send_data(struct sched_ent *alarm)
DEFINE_ALARM(sync_send_keys);
void sync_send_keys(struct sched_ent *alarm)
{
if (!sync_tree)
build_tree();
@ -94,8 +448,10 @@ void sync_send_data(struct sched_ent *alarm)
if (len==0)
return;
if (IF_DEBUG(rhizome_sync_keys)){
DEBUG(rhizome_sync_keys,"Sending message");
dump("Raw message", buff, len);
//dump("Raw message", buff, len);
}
struct overlay_buffer *payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, len);
@ -114,14 +470,249 @@ void sync_send_data(struct sched_ent *alarm)
time_ms_t now = gettime_ms();
if (sync_has_transmit_queued(sync_tree)){
DEBUG(rhizome_sync_keys,"Queueing next message for 5ms");
RESCHEDULE(alarm, now+5, now+5, now+5);
DEBUG(rhizome_sync_keys,"Queueing next message for now");
RESCHEDULE(alarm, now, now, now);
}else{
DEBUG(rhizome_sync_keys,"Queueing next message for 5s");
RESCHEDULE(alarm, now+5000, now+30000, TIME_MS_NEVER_WILL);
}
}
static void process_transfer_message(struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
{
while(ob_remaining(payload)){
ob_checkpoint(payload);
int msg_state = ob_get(payload);
if (msg_state<0)
return;
sync_key_t key;
if (ob_get_bytes(payload, key.key, sizeof key)<0)
return;
int rank=-1;
if (msg_state & STATE_REQ){
rank = ob_get(payload);
if (rank < 0)
return;
}
DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d",
get_state_name(msg_state), alloca_sync_key(&key), rank);
switch(msg_state){
case STATE_SEND_BAR:{
rhizome_bar_t bar;
if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0)
return;
if (!config.rhizome.fetch)
break;
if (!rhizome_is_bar_interesting(&bar)){
DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)",
alloca_sync_key(&key));
break;
}
// send a request for the manifest
rank = rhizome_bar_log_size(&bar);
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_MANIFEST, rank);
transfer->req_len = DUMMY_MANIFEST_SIZE;
break;
}
case STATE_REQ_MANIFEST:{
// queue the transmission of the manifest
find_and_update_transfer(sync_state, &key, STATE_SEND_MANIFEST, rank);
break;
}
case STATE_SEND_MANIFEST:{
// process the incoming manifest
size_t len = ob_remaining(payload);
uint8_t *data = ob_get_bytes_ptr(payload, len);
if (!config.rhizome.fetch)
break;
struct rhizome_manifest_summary summ;
if (!rhizome_manifest_inspect((char *)data, len, &summ)){
WHYF("Ignoring manifest for %s, (Malformed)",
alloca_sync_key(&key));
break;
}
// The manifest looks potentially interesting, so now do a full parse and validation.
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
// don't consume the payload
ob_rewind(payload);
return;
}
memcpy(m->manifestdata, data, len);
m->manifest_all_bytes = len;
if ( rhizome_manifest_parse(m) == -1
|| !rhizome_manifest_validate(m)
) {
WHYF("Ignoring manifest for %s, (Malformed)",
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}
if (!rhizome_is_manifest_interesting(m)){
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)",
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}
// start writing the payload
enum rhizome_payload_status status;
struct rhizome_write *write = emalloc_zero(sizeof(struct rhizome_write));
if (m->filesize==0){
status = RHIZOME_PAYLOAD_STATUS_STORED;
}else{
status = rhizome_open_write(write, &m->filehash, m->filesize);
}
if (status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
rhizome_manifest_free(m);
free(write);
break;
}else if (status!=RHIZOME_PAYLOAD_STATUS_NEW){
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
rhizome_manifest_free(m);
free(write);
break;
}
if (m->is_journal){
// if we're fetching a journal bundle, copy any bytes we have of a previous version
// and therefore work out what range of bytes we still need
rhizome_manifest *previous = rhizome_new_manifest();
if (rhizome_retrieve_manifest(&m->cryptoSignPublic, previous)==RHIZOME_BUNDLE_STATUS_SAME &&
previous->is_journal &&
previous->tail <= m->tail &&
previous->filesize + previous->tail > m->tail
){
uint64_t start = m->tail - previous->tail;
uint64_t length = previous->filesize - start;
// required by tests;
DEBUGF(rhizome_sync_keys, "%s Copying %"PRId64" bytes from previous journal", alloca_sync_key(&key), length);
rhizome_journal_pipe(write, &previous->filehash, start, length);
}
rhizome_manifest_free(previous);
if (write->file_offset >= m->filesize){
// no new content in the new version, we can import now
enum rhizome_payload_status status = rhizome_finish_write(write);
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
free(write);
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
}
rhizome_manifest_free(m);
break;
}
}
// TODO improve rank algo here;
rank = log2ll(m->filesize - write->file_offset);
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_PAYLOAD, rank);
transfer->manifest = m;
transfer->req_len = m->filesize - write->file_offset;
transfer->write = write;
break;
}
case STATE_REQ_PAYLOAD:{
// open the payload for reading
uint64_t offset = ob_get_packed_ui64(payload);
uint64_t length = ob_get_packed_ui64(payload);
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
return;
}
if (rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m)!=RHIZOME_BUNDLE_STATUS_SAME){
rhizome_manifest_free(m);
break;
}
struct rhizome_read *read = emalloc_zero(sizeof (struct rhizome_read));
if (rhizome_open_read(read, &m->filehash) != RHIZOME_PAYLOAD_STATUS_STORED){
free(read);
rhizome_manifest_free(m);
break;
}
rhizome_manifest_free(m);
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_SEND_PAYLOAD, rank);
transfer->read = read;
transfer->req_len = length;
read->offset = offset;
break;
}
case STATE_SEND_PAYLOAD:{
size_t len = ob_remaining(payload);
uint8_t *buff = ob_get_bytes_ptr(payload, len);
struct transfers **ptr = find_and_update_transfer(sync_state, &key, STATE_RECV_PAYLOAD, -1);
if (!ptr){
WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key));
break;
}
struct transfers *transfer = *ptr;
transfer->req_len -= len;
uint8_t all_done = 0;
if (rhizome_write_buffer(transfer->write, buff, len)==-1){
WHYF("Write failed for %s!", alloca_sync_key(&key));
all_done=1;
}else{
DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu",
alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length);
if (transfer->write->file_offset >= transfer->write->file_length){
enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
free(transfer->write);
transfer->write = NULL;
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
}
all_done=1;
}
}
if (all_done){
if (transfer->manifest)
rhizome_manifest_free(transfer->manifest);
transfer->manifest=NULL;
clear_transfer(transfer);
*ptr = transfer->next;
free(transfer);
}
break;
}
default:
WHYF("Unknown message type %x", msg_state);
}
}
}
DEFINE_BINDING(MDP_PORT_RHIZOME_SYNC_KEYS, sync_keys_recv);
static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
@ -134,19 +725,50 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
header->source->sync_version = 1;
if (!header->destination){
if (IF_DEBUG(rhizome_sync_keys)){
DEBUGF(rhizome_sync_keys,"Processing message from %s", alloca_tohex_sid_t(header->source->sid));
dump("Raw message", ob_current_ptr(payload), ob_remaining(payload));
//dump("Raw message", ob_current_ptr(payload), ob_remaining(payload));
}
sync_recv_message(sync_tree, header->source, ob_current_ptr(payload), ob_remaining(payload));
if (sync_has_transmit_queued(sync_tree)){
struct sched_ent *alarm=&ALARM_STRUCT(sync_send_data);
struct sched_ent *alarm=&ALARM_STRUCT(sync_send_keys);
time_ms_t next = gettime_ms() + 5;
if (alarm->alarm > next){
if (alarm->alarm > next || !is_scheduled(alarm)){
DEBUG(rhizome_sync_keys,"Queueing next message for 5ms");
RESCHEDULE(alarm, next, next, next);
}
}
if (IF_DEBUG(rhizome_sync_keys)){
struct sched_ent *alarm=&ALARM_STRUCT(sync_keys_status);
if (alarm->alarm == TIME_MS_NEVER_WILL){
time_ms_t next = gettime_ms() + 1000;
RESCHEDULE(alarm, next, next, next);
}
}
}else{
// TODO
struct msp_server_state *connection_state = msp_find_and_process(&sync_connections, header, payload);
if (connection_state){
struct rhizome_sync_keys *sync_state = get_peer_sync_state(header->source);
sync_state->connection = connection_state;
while(1){
struct msp_packet *packet = msp_recv_next(connection_state);
if (!packet)
break;
struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet);
process_transfer_message(sync_state, recv_payload);
msp_consumed(connection_state, packet, recv_payload);
}
sync_send_peer(sync_state);
time_ms_t next_action = msp_next_action(connection_state);
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (alarm->alarm > next_action || !is_scheduled(alarm))
RESCHEDULE(alarm, next_action, next_action, next_action);
}
}
return 0;
@ -154,7 +776,7 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t UNUSED(found), unsigned count)
{
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_data);
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys);
if (count>0 && is_rhizome_advertise_enabled()){
time_ms_t now = gettime_ms();
@ -166,6 +788,7 @@ static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t
DEBUG(rhizome_sync_keys,"Stop queueing messages");
RESCHEDULE(alarm, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
}
// nuke sync state for this peer now?
}
DEFINE_TRIGGER(nbr_change, sync_neighbour_changed);
@ -183,9 +806,9 @@ static void sync_bundle_add(rhizome_manifest *m)
sync_add_key(sync_tree, &key, NULL);
if (link_has_neighbours()){
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_data);
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys);
time_ms_t next = gettime_ms()+5;
if (alarm->alarm > next){
if (alarm->alarm > next || !is_scheduled(alarm)){
DEBUG(rhizome_sync_keys,"Queueing next message for 5ms");
RESCHEDULE(alarm, next, next, next);
}

View File

@ -146,7 +146,8 @@ static void xor_children(struct node *node, key_message_t *dest)
if (node->message.prefix_len == KEY_LEN_BITS){
sync_xor(&node->message.key, dest);
}else{
for (unsigned i=0;i<NODE_CHILDREN;i++){
unsigned i;
for (i=0;i<NODE_CHILDREN;i++){
if (node->children[i])
xor_children(node->children[i], dest);
}
@ -222,8 +223,8 @@ static void free_node(struct sync_state *state, struct node *node)
{
if (!node)
return;
for (unsigned i=0;i<NODE_CHILDREN;i++)
unsigned i;
for (i=0;i<NODE_CHILDREN;i++)
free_node(state, node->children[i]);
if (node->transmit_next){
@ -284,7 +285,8 @@ static void remove_key(struct sync_state *state, struct node **root, const sync_
node = NULL;
// If *parent has <= 1 child now, we need to remove *parent as well
for (unsigned i=0;i<NODE_CHILDREN;i++){
unsigned i;
for (i=0;i<NODE_CHILDREN;i++){
if ((*parent)->children[i]){
if (node)
return;
@ -556,7 +558,8 @@ static void peer_missing_leaf_nodes(
if (peer_is_missing(state, peer, node, allow_remove))
queue_node(state, node, 1);
}else{
for (unsigned i=0;i<NODE_CHILDREN;i++){
unsigned i;
for (i=0;i<NODE_CHILDREN;i++){
if (i!=except && node->children[i])
peer_missing_leaf_nodes(state, peer, node->children[i], NODE_CHILDREN, allow_remove);
}
@ -608,7 +611,8 @@ static unsigned peer_has_received_all(struct sync_state *state, struct sync_peer
// duplicate the child pointers, as removing an immediate child key *will* also free this peer node.
struct node *children[NODE_CHILDREN];
memcpy(children, peer_node->children, sizeof(children));
for (unsigned i=0;i<NODE_CHILDREN;i++)
unsigned i;
for (i=0;i<NODE_CHILDREN;i++)
ret+=peer_has_received_all(state, peer_state, children[i]);
}
return ret;
@ -701,7 +705,8 @@ static int recv_key(struct sync_state *state, struct sync_peer_state *peer_state
struct node *node = state->root;
uint8_t prefix_len = 0;
uint8_t is_blank = 1;
for (unsigned i=(peer_message.prefix_len>>3)+1;i<KEY_LEN && is_blank;i++)
unsigned i;
for (i=(peer_message.prefix_len>>3)+1;i<KEY_LEN && is_blank;i++)
if (peer_message.key.key[i])
is_blank = 0;
@ -772,7 +777,8 @@ static int recv_key(struct sync_state *state, struct sync_peer_state *peer_state
}
// queue the transmission of all child nodes of this node
for (unsigned i=0;i<NODE_CHILDREN;i++){
unsigned i;
for (i=0;i<NODE_CHILDREN;i++){
if (node->children[i])
queue_node(state, node->children[i], 0);
}
@ -879,3 +885,27 @@ int sync_recv_message(struct sync_state *state, void *peer_context, const uint8_
return 0;
}
static void enum_diffs(struct sync_state *state, struct sync_peer_state *peer_state, struct node *node,
void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs))
{
if (!node)
return;
if (node->message.prefix_len == KEY_LEN_BITS){
callback(state->context, peer_state->peer_context, &node->message.key, node->message.stored);
}else{
unsigned i;
for (i=0;i<NODE_CHILDREN;i++){
enum_diffs(state, peer_state, node->children[i], callback);
}
}
}
void sync_enum_differences(struct sync_state *state,
void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs))
{
struct sync_peer_state *peer_state = state->peers;
while(peer_state){
enum_diffs(state, peer_state, peer_state->root, callback);
peer_state = peer_state->next;
}
}

View File

@ -39,5 +39,7 @@ size_t sync_build_message(struct sync_state *state, uint8_t *buff, size_t len);
// process a message received from a peer.
int sync_recv_message(struct sync_state *state, void *peer_context, const uint8_t *buff, size_t len);
void sync_enum_differences(struct sync_state *state,
void (*callback)(void *context, void *peer_context, const sync_key_t *key, uint8_t theirs));
#endif

View File

@ -46,7 +46,8 @@ default_config() {
set debug.rhizome_ads on \
set debug.rhizome_tx on \
set debug.rhizome_rx on \
set debug.rhizome_sync_keys on
set debug.rhizome_sync_keys on \
set debug.msp on
}
# Called by start_servald_instances for each instance.
@ -90,50 +91,6 @@ test_FileTransfer() {
receive_and_update_bundle
}
doc_ManyFiles="Synchronise many small files, with some files in common"
setup_ManyFiles() {
setup_common
bundlesA=()
bundlesB=()
for i in `seq 1 20`
do
tfw_log "Adding common file-$i"
set_instance +A
create_file file-$i 1000
executeOk_servald rhizome add file "${!sidvar}" file-$i file-$i.manifest
set_instance +B
executeOk_servald rhizome import bundle file-$i file-$i.manifest
done
for i in `seq 1 10`
do
set_instance +A
tfw_log "Adding fileA-$i"
create_file fileA-$i 1000
tfw_nolog executeOk_servald rhizome add file "${!sidvar}" fileA-$i fileA-$i.manifest
tfw_nolog extract_stdout_manifestid BID
tfw_nolog extract_stdout_version VERSION
bundlesA+=($BID:$VERSION)
set_instance +B
tfw_log "Adding fileB-$i"
create_file fileB-$i 1000
tfw_nolog executeOk_servald rhizome add file "${!sidvar}" fileB-$i fileB-$i.manifest
tfw_nolog extract_stdout_manifestid BID
tfw_nolog extract_stdout_version VERSION
bundlesB+=($BID:$VERSION)
done
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_ManyFiles() {
wait_until bundle_received_by ${bundlesA[*]} +B ${bundlesB[*]} +A
}
doc_EncryptedTransfer="Encrypted payload can be opened by destination"
setup_EncryptedTransfer() {
setup_common
@ -887,4 +844,54 @@ teardown_SimulatedRadio2() {
tfw_cat "$SERVALD_VAR/radioerr"
}
doc_ManyFiles="Synchronise many small files, with some files in common"
setup_ManyFiles() {
setup_servald
foreach_instance +A +B +C +D create_single_identity
bundlesA=()
bundlesB=()
bundlesC=()
bundlesD=()
for i in `seq 1 5`
do
tfw_log "Adding common file-$i"
tfw_nolog set_instance +A
create_file file-$i 100
tfw_nolog executeOk_servald rhizome add file "${!sidvar}" file-$i file-$i.manifest
tfw_nolog set_instance +B
tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest
tfw_nolog set_instance +C
tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest
tfw_nolog set_instance +D
tfw_nolog executeOk_servald rhizome import bundle file-$i file-$i.manifest
done
for i in A B C D
do
eval "bundles$i=()"
set_instance +$i
for j in `seq 1 3`
do
tfw_log "Adding file$i-$j"
create_file file$i-$j $(( $j * 500 ))
tfw_nolog executeOk_servald rhizome add file "${!sidvar}" file$i-$j file$i-$j.manifest
tfw_nolog extract_stdout_manifestid BID
tfw_nolog extract_stdout_version VERSION
eval "bundles$i+=(\$BID:\$VERSION)"
done
done
start_servald_instances +A +B +C +D
}
test_ManyFiles() {
wait_until --timeout=15 bundle_received_by \
${bundlesB[*]} ${bundlesC[*]} ${bundlesD[*]} +A \
${bundlesA[*]} ${bundlesC[*]} ${bundlesD[*]} +B \
${bundlesA[*]} ${bundlesB[*]} ${bundlesD[*]} +C \
${bundlesA[*]} ${bundlesB[*]} ${bundlesC[*]} +D
}
runTests "$@"