serval-dna/rhizome_sync_keys.c

911 lines
28 KiB
C
Raw Normal View History

#include "rhizome.h"
#include "overlay_address.h"
#include "overlay_buffer.h"
#include "overlay_packet.h"
#include "mdp_client.h"
#include "msp_server.h"
#include "log.h"
#include "debug.h"
#include "conf.h"
#include "sync_keys.h"
#include "fdqueue.h"
#include "overlay_interface.h"
#include "route_link.h"
#include "mem.h"
#define STATE_SEND (1)
#define STATE_REQ (2)
#define STATE_RECV (3)
#define STATE_BAR (4)
#define STATE_MANIFEST (8)
#define STATE_PAYLOAD (12)
#define STATE_NONE (0)
#define STATE_SEND_BAR (STATE_SEND|STATE_BAR)
#define STATE_REQ_MANIFEST (STATE_REQ|STATE_MANIFEST)
#define STATE_SEND_MANIFEST (STATE_SEND|STATE_MANIFEST)
#define STATE_REQ_PAYLOAD (STATE_REQ|STATE_PAYLOAD)
#define STATE_SEND_PAYLOAD (STATE_SEND|STATE_PAYLOAD)
#define STATE_RECV_PAYLOAD (STATE_RECV|STATE_PAYLOAD)
// approx size of a signed manifest
#define DUMMY_MANIFEST_SIZE 256
#define REACHABLE_BIAS 2
struct transfers{
struct transfers *next;
sync_key_t key;
uint8_t state;
uint8_t rank;
rhizome_manifest *manifest;
size_t req_len;
union{
struct rhizome_read *read;
struct rhizome_write *write;
rhizome_bar_t bar;
};
};
struct rhizome_sync_keys{
struct transfers *queue;
struct msp_server_state *connection;
};
#define MAX_REQUEST_BYTES (16*1024)
struct sync_state *sync_tree=NULL;
struct msp_server_state *sync_connections=NULL;
DEFINE_ALARM(sync_send);
static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){
if (!peer->sync_keys_state)
peer->sync_keys_state = emalloc_zero(sizeof(struct rhizome_sync_keys));
return peer->sync_keys_state;
}
static const char *get_state_name(uint8_t state)
{
switch(state){
case STATE_NONE: return "NONE";
case STATE_SEND_BAR: return "SEND_BAR";
case STATE_REQ_MANIFEST: return "REQ_MANIFEST";
case STATE_SEND_MANIFEST: return "SEND_MANIFEST";
case STATE_REQ_PAYLOAD: return "REQ_PAYLOAD";
case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD";
case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD";
}
return "Unknown";
}
static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr)
{
DEBUGF(rhizome_sync_keys, "Clearing %s %s", get_state_name(ptr->state), alloca_sync_key(&ptr->key));
switch (ptr->state){
case STATE_SEND_PAYLOAD:
if (ptr->read){
rhizome_read_close(ptr->read);
free(ptr->read);
}
ptr->read=NULL;
break;
case STATE_RECV_PAYLOAD:
if (ptr->write){
rhizome_fail_write(ptr->write);
free(ptr->write);
}
ptr->write=NULL;
break;
}
ptr->state=STATE_NONE;
}
#define clear_transfer(P) _clear_transfer(__WHENCE__,P)
static struct transfers **find_and_update_transfer(struct subscriber *peer, struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank)
{
if (rank>0xFF)
rank = 0xFF;
if (state){
if (!keys_state->connection)
keys_state->connection = msp_find_or_connect(&sync_connections,
peer, MDP_PORT_RHIZOME_SYNC_KEYS,
get_my_subscriber(), MDP_PORT_RHIZOME_SYNC_KEYS,
OQ_OPPORTUNISTIC);
if (msp_can_send(keys_state->connection)){
time_ms_t next_action = gettime_ms();
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (next_action < alarm->alarm || !is_scheduled(alarm))
RESCHEDULE(alarm, next_action, next_action, next_action);
}
}
struct transfers **ptr = &keys_state->queue;
while(*ptr){
if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){
if (state){
if ((*ptr)->state && (*ptr)->state!=state){
DEBUGF(rhizome_sync_keys, "Updating state from %s to %s %s",
get_state_name((*ptr)->state), get_state_name(state), alloca_sync_key(key));
clear_transfer(*ptr);
}
(*ptr)->state = state;
}
return ptr;
}
if (rank>=0 && (*ptr)->rank > rank)
break;
ptr = &(*ptr)->next;
}
if (rank<0)
return NULL;
struct transfers *ret = emalloc_zero(sizeof(struct transfers));
ret->key = *key;
ret->rank = rank;
ret->state = state;
ret->next = (*ptr);
(*ptr) = ret;
DEBUGF(rhizome_sync_keys, "Queued transfer message %s %s", get_state_name(ret->state), alloca_sync_key(key));
return ptr;
}
static void sync_key_diffs(void *UNUSED(context), void *peer_context, const sync_key_t *key, uint8_t ours)
{
struct subscriber *peer = (struct subscriber *)peer_context;
struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer);
struct transfers **transfer = find_and_update_transfer(peer, sync_keys, key, 0, -1);
DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s",
alloca_tohex_sid_t(peer->sid),
ours?"missing":"has",
alloca_sync_key(key),
transfer?get_state_name((*transfer)->state):"No transfer");
if (transfer){
struct transfers *msg = *transfer;
switch(msg->state){
case STATE_REQ_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Requesting payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length);
break;
case STATE_SEND_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Sending payload [%zu of %zu]", msg->read->offset, msg->read->length);
break;
case STATE_RECV_PAYLOAD:
DEBUGF(rhizome_sync_keys, " - Receiving payload [%zu of %zu]", msg->write->file_offset, msg->write->file_length);
break;
}
}
}
DEFINE_ALARM(sync_keys_status);
void sync_keys_status(struct sched_ent *alarm)
{
if (!IF_DEBUG(rhizome_sync_keys))
return;
DEBUGF(rhizome_sync_keys, "Sync state;");
sync_enum_differences(sync_tree, sync_key_diffs);
time_ms_t next = gettime_ms()+1000;
RESCHEDULE(alarm, next, next, next);
}
static void sync_send_peer(struct rhizome_sync_keys *sync_state)
{
size_t mtu = MSP_MESSAGE_SIZE; // FIX ME, use link mtu?
struct overlay_buffer *payload=NULL;
uint8_t buff[mtu];
// send requests for more data, stop when we hit MAX_REQUEST_BYTES
// Note that requests are ordered by rank,
// so we will still request a high rank item even if there is a low ranked item being received
struct transfers **ptr = &sync_state->queue;
size_t requested_bytes = 0;
time_ms_t now = gettime_ms();
while((*ptr) && msp_can_send(sync_state->connection) && requested_bytes < MAX_REQUEST_BYTES){
struct transfers *msg = *ptr;
if (msg->state == STATE_RECV_PAYLOAD){
requested_bytes+=msg->req_len;
}else if ((msg->state & 3) == STATE_REQ){
if (!payload){
payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, sizeof(buff));
}
DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key));
ob_append_byte(payload, msg->state);
ob_append_bytes(payload, msg->key.key, sizeof(msg->key));
ob_append_byte(payload, msg->rank);
// start from the specified file offset (eg journals, but one day perhaps resuming transfers)
if (msg->state == STATE_REQ_PAYLOAD){
ob_append_packed_ui64(payload, msg->write->file_offset);
ob_append_packed_ui64(payload, msg->req_len);
}
if (ob_overrun(payload)){
ob_rewind(payload);
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
}else{
ob_checkpoint(payload);
requested_bytes+=msg->req_len;
if (msg->state == STATE_REQ_PAYLOAD){
// keep hold of the manifest pointer
msg->state = STATE_RECV_PAYLOAD;
}else{
*ptr = msg->next;
clear_transfer(msg);
if (msg->manifest)
rhizome_manifest_free(msg->manifest);
msg->manifest=NULL;
free(msg);
continue;
}
}
}
ptr = &msg->next;
}
// now send requested data
ptr = &sync_state->queue;
while((*ptr) && msp_can_send(sync_state->connection)){
struct transfers *msg = *ptr;
if ((msg->state & 3) != STATE_SEND){
ptr = &msg->next;
continue;
}
if (!payload){
payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, sizeof(buff));
}
uint8_t msg_complete=1;
uint8_t send_payload=0;
DEBUGF(rhizome_sync_keys, "Sending sync messsage %s %s", get_state_name(msg->state), alloca_sync_key(&msg->key));
ob_append_byte(payload, msg->state);
ob_append_bytes(payload, msg->key.key, sizeof(msg->key));
switch(msg->state){
case STATE_SEND_BAR:{
ob_append_bytes(payload, msg->bar.binary, sizeof(msg->bar));
break;
}
case STATE_SEND_MANIFEST:{
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
assert(ob_position(payload));
msg_complete = 0;
}else{
enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(msg->key.key, sizeof(msg->key), m);
switch(status){
case RHIZOME_BUNDLE_STATUS_SAME:
// TODO fragment manifests
ob_append_bytes(payload, m->manifestdata, m->manifest_all_bytes);
send_payload=1;
break;
case RHIZOME_BUNDLE_STATUS_NEW:
// TODO we don't have this bundle anymore!
default:
ob_rewind(payload);
}
rhizome_manifest_free(m);
}
break;
}
case STATE_SEND_PAYLOAD:{
size_t max_len = ob_remaining(payload);
if (max_len > msg->req_len)
max_len = msg->req_len;
ssize_t payload_len = rhizome_read(msg->read, ob_current_ptr(payload), max_len);
if (payload_len==-1){
ob_rewind(payload);
}else{
ob_append_space(payload, payload_len);
send_payload=1;
}
DEBUGF(rhizome_sync_keys, "Sending %s %zd bytes (now %zd of %zd)",
alloca_sync_key(&msg->key), payload_len, msg->read->offset, msg->read->length);
msg->req_len -= payload_len;
if (msg->read->offset < msg->read->length && msg->req_len>0)
msg_complete=0;
break;
}
default:
FATALF("Unexpected state %x", msg->state);
}
if (ob_overrun(payload)){
ob_rewind(payload);
msg_complete=0;
send_payload=1;
}else{
ob_checkpoint(payload);
}
if (send_payload){
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
}
if (msg_complete){
*ptr = msg->next;
clear_transfer(msg);
if (msg->manifest)
rhizome_manifest_free(msg->manifest);
msg->manifest=NULL;
free(msg);
}
// else, try to send another chunk of this payload immediately
}
if (payload){
if (ob_position(payload))
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_free(payload);
}
if (msp_queued_packet_count(sync_state->connection)==0 &&
(msp_get_connection_state(sync_state->connection) & MSP_STATE_RECEIVED_PACKET) &&
now - msp_last_packet(sync_state->connection) > 5000){
DEBUGF(rhizome_sync_keys, "Closing idle connection");
msp_shutdown_stream(sync_state->connection);
}
}
void sync_send(struct sched_ent *alarm)
{
struct msp_iterator iterator;
msp_iterator_open(&sync_connections, &iterator);
while(1){
struct msp_server_state *connection = msp_process_next(&iterator);
if (!connection)
break;
struct subscriber *peer = msp_remote_peer(connection);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
sync_state->connection = connection;
sync_send_peer(sync_state);
}
while(1){
struct msp_server_state *connection = msp_next_closed(&iterator);
if (!connection)
break;
struct subscriber *peer = msp_remote_peer(connection);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid));
// drop all transfer records
while(sync_state->queue){
struct transfers *msg = sync_state->queue;
sync_state->queue = msg->next;
clear_transfer(msg);
free(msg);
}
sync_state->connection = NULL;
// eg connection timeout; drop all sync state
if (msp_get_connection_state(connection)& (MSP_STATE_ERROR|MSP_STATE_STOPPED))
sync_free_peer_state(sync_tree, peer);
}
time_ms_t next_action = msp_iterator_close(&iterator);
RESCHEDULE(alarm, next_action, next_action, next_action);
}
static void sync_peer_has (void * UNUSED(context), void *peer_context, const sync_key_t *key)
{
// request manifest? keep trying?
// remember & ignore expired manifest id's?
struct subscriber *peer = (struct subscriber *)peer_context;
DEBUGF(rhizome_sync_keys, "Neighbour %s has %s that we need",
alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key));
// noop, just wait for the BAR to arrive.
}
static int sync_manifest_rank(rhizome_manifest *m, struct subscriber *peer, uint8_t sending, uint64_t written_offset)
{
uint8_t bias = REACHABLE_BIAS;
int rank = log2ll(m->filesize - written_offset);
if (m->has_recipient){
struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0);
// if the recipient is routable and this bundle is heading the right way;
// give the bundle's rank a boost
if (recipient
&& (recipient->reachable & (REACHABLE | REACHABLE_SELF))
&& (sending == (recipient->next_hop == peer ? 1 : 0))){
DEBUGF(rhizome_sync_keys, "Boosting rank for %s to deliver to recipient %s",
alloca_tohex(m->manifesthash.binary, sizeof(sync_key_t)),
alloca_tohex_sid_t(recipient->sid));
bias=0;
}
}
return rank + bias;
}
static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
{
// pre-emptively announce the manifest?
// use some form of stream socket to manage available bandwidth?
// build a default rank ordered list of manifests to announce?
struct subscriber *peer = (struct subscriber *)peer_context;
DEBUGF(rhizome_sync_keys, "Neighbour %s does not have %s that we do",
alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key));
// queue BAR for transmission based on the manifest details.
// add a rank bias if there is no reachable recipient to prioritise messaging
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return;
enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(sync_key_t), m);
switch(status){
case RHIZOME_BUNDLE_STATUS_SAME:
break;
case RHIZOME_BUNDLE_STATUS_NEW:
// TODO We don't have this bundle anymore!
default:
goto end;
}
int rank = sync_manifest_rank(m, peer, 1, 0);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
if (!sync_state)
goto end;
struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, key, STATE_SEND_BAR, rank);
if (!send_bar)
goto end;
rhizome_manifest_to_bar(m, &send_bar->bar);
end:
rhizome_manifest_free(m);
}
static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
{
// remove transfer state?
struct subscriber *peer = (struct subscriber *)peer_context;
DEBUGF(rhizome_sync_keys, "Neighbour %s has now received %s",
alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key));
}
// this is probably fast enough. For huge stores, or slow storage media
// we might need to use an alarm to slowly build this tree
static void build_tree()
{
sync_tree = sync_alloc_state(NULL, sync_peer_has, sync_peer_does_not_have, sync_peer_now_has);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest_hash FROM manifests "
"WHERE manifests.filehash IS NULL OR EXISTS(SELECT 1 FROM files WHERE files.id = manifests.filehash);");
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
const char *hash = (const char *) sqlite3_column_text(statement, 0);
rhizome_filehash_t manifest_hash;
if (str_to_rhizome_filehash_t(&manifest_hash, hash)==0){
sync_key_t key;
memcpy(key.key, manifest_hash.binary, sizeof(sync_key_t));
DEBUGF(rhizome_sync_keys, "Adding %s to tree",
alloca_sync_key(&key));
sync_add_key(sync_tree, &key, NULL);
}
}
sqlite3_finalize(statement);
}
DEFINE_ALARM(sync_send_keys);
void sync_send_keys(struct sched_ent *alarm)
{
if (!sync_tree)
build_tree();
uint8_t buff[MDP_MTU];
size_t len = sync_build_message(sync_tree, buff, sizeof buff);
if (len==0)
return;
if (IF_DEBUG(rhizome_sync_keys)){
DEBUG(rhizome_sync_keys,"Sending message");
//dump("Raw message", buff, len);
}
struct overlay_buffer *payload = ob_static(buff, sizeof(buff));
ob_limitsize(payload, len);
struct internal_mdp_header header;
bzero(&header, sizeof header);
header.crypt_flags = MDP_FLAG_NO_CRYPT | MDP_FLAG_NO_SIGN;
header.source = get_my_subscriber();
header.source_port = MDP_PORT_RHIZOME_SYNC_KEYS;
header.destination_port = MDP_PORT_RHIZOME_SYNC_KEYS;
header.qos = OQ_OPPORTUNISTIC;
header.ttl = 1;
overlay_send_frame(&header, payload);
time_ms_t now = gettime_ms();
if (sync_has_transmit_queued(sync_tree)){
DEBUG(rhizome_sync_keys,"Queueing next message for now");
RESCHEDULE(alarm, now, now, now);
}else{
DEBUG(rhizome_sync_keys,"Queueing next message for 5s");
RESCHEDULE(alarm, now+5000, now+30000, TIME_MS_NEVER_WILL);
}
}
static void process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
{
while(ob_remaining(payload)){
ob_checkpoint(payload);
int msg_state = ob_get(payload);
if (msg_state<0)
return;
sync_key_t key;
if (ob_get_bytes(payload, key.key, sizeof key)<0)
return;
int rank=-1;
if (msg_state & STATE_REQ){
rank = ob_get(payload);
if (rank < 0)
return;
}
DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d",
get_state_name(msg_state), alloca_sync_key(&key), rank);
switch(msg_state){
case STATE_SEND_BAR:{
rhizome_bar_t bar;
if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0)
return;
if (!config.rhizome.fetch)
break;
if (!rhizome_is_bar_interesting(&bar)){
DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)",
alloca_sync_key(&key));
break;
}
// send a request for the manifest
rank = rhizome_bar_log_size(&bar);
struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_MANIFEST, rank);
transfer->req_len = DUMMY_MANIFEST_SIZE;
break;
}
case STATE_REQ_MANIFEST:{
// queue the transmission of the manifest
find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank);
break;
}
case STATE_SEND_MANIFEST:{
// process the incoming manifest
size_t len = ob_remaining(payload);
uint8_t *data = ob_get_bytes_ptr(payload, len);
if (!config.rhizome.fetch)
break;
struct rhizome_manifest_summary summ;
if (!rhizome_manifest_inspect((char *)data, len, &summ)){
WHYF("Ignoring manifest for %s, (Malformed)",
alloca_sync_key(&key));
break;
}
// The manifest looks potentially interesting, so now do a full parse and validation.
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
// don't consume the payload
ob_rewind(payload);
return;
}
memcpy(m->manifestdata, data, len);
m->manifest_all_bytes = len;
if ( rhizome_manifest_parse(m) == -1
|| !rhizome_manifest_validate(m)
) {
WHYF("Ignoring manifest for %s, (Malformed)",
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}
if (!rhizome_is_manifest_interesting(m)){
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)",
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}
// start writing the payload
enum rhizome_payload_status status;
struct rhizome_write *write = emalloc_zero(sizeof(struct rhizome_write));
if (m->filesize==0){
status = RHIZOME_PAYLOAD_STATUS_STORED;
}else{
status = rhizome_open_write(write, &m->filehash, m->filesize);
}
if (status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_status = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
rhizome_manifest_free(m);
free(write);
break;
}else if (status!=RHIZOME_PAYLOAD_STATUS_NEW){
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
rhizome_manifest_free(m);
free(write);
break;
}
if (m->is_journal){
// if we're fetching a journal bundle, copy any bytes we have of a previous version
// and therefore work out what range of bytes we still need
rhizome_manifest *previous = rhizome_new_manifest();
if (rhizome_retrieve_manifest(&m->cryptoSignPublic, previous)==RHIZOME_BUNDLE_STATUS_SAME &&
previous->is_journal &&
previous->tail <= m->tail &&
previous->filesize + previous->tail > m->tail
){
uint64_t start = m->tail - previous->tail;
uint64_t length = previous->filesize - start;
// required by tests;
DEBUGF(rhizome_sync_keys, "%s Copying %"PRId64" bytes from previous journal", alloca_sync_key(&key), length);
rhizome_journal_pipe(write, &previous->filehash, start, length);
}
rhizome_manifest_free(previous);
if (write->file_offset >= m->filesize){
// no new content in the new version, we can import now
enum rhizome_payload_status status = rhizome_finish_write(write);
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
free(write);
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
}
rhizome_manifest_free(m);
break;
}
}
// TODO improve rank algo here;
// Note that we still need to deal with this manifest, we don't want to run out of RAM
rank = sync_manifest_rank(m, peer, 0, write->file_offset);
struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_PAYLOAD, rank);
transfer->manifest = m;
transfer->req_len = m->filesize - write->file_offset;
transfer->write = write;
break;
}
case STATE_REQ_PAYLOAD:{
// open the payload for reading
uint64_t offset = ob_get_packed_ui64(payload);
uint64_t length = ob_get_packed_ui64(payload);
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
return;
}
enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m);
if (status == RHIZOME_BUNDLE_STATUS_NEW){
// TODO We don't have this bundle anymore!
}
if (status != RHIZOME_BUNDLE_STATUS_SAME){
rhizome_manifest_free(m);
break;
}
struct rhizome_read *read = emalloc_zero(sizeof (struct rhizome_read));
if (rhizome_open_read(read, &m->filehash) != RHIZOME_PAYLOAD_STATUS_STORED){
free(read);
rhizome_manifest_free(m);
break;
}
rhizome_manifest_free(m);
struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_SEND_PAYLOAD, rank);
transfer->read = read;
transfer->req_len = length;
read->offset = offset;
break;
}
case STATE_SEND_PAYLOAD:{
size_t len = ob_remaining(payload);
uint8_t *buff = ob_get_bytes_ptr(payload, len);
struct transfers **ptr = find_and_update_transfer(peer, sync_state, &key, STATE_RECV_PAYLOAD, -1);
if (!ptr){
WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key));
break;
}
struct transfers *transfer = *ptr;
transfer->req_len -= len;
uint8_t all_done = 0;
if (rhizome_write_buffer(transfer->write, buff, len)==-1){
WHYF("Write failed for %s!", alloca_sync_key(&key));
all_done=1;
}else{
DEBUGF(rhizome_sync_keys, "Wrote to %s %zu, now %zu of %zu",
alloca_sync_key(&key), len, transfer->write->file_offset, transfer->write->file_length);
if (transfer->write->file_offset >= transfer->write->file_length){
enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
free(transfer->write);
transfer->write = NULL;
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
}
all_done=1;
}
}
if (all_done){
if (transfer->manifest)
rhizome_manifest_free(transfer->manifest);
transfer->manifest=NULL;
clear_transfer(transfer);
*ptr = transfer->next;
free(transfer);
}
break;
}
default:
WHYF("Unknown message type %x", msg_state);
}
}
}
DEFINE_BINDING(MDP_PORT_RHIZOME_SYNC_KEYS, sync_keys_recv);
static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
if (header->source->reachable == REACHABLE_SELF || !is_rhizome_advertise_enabled())
return 0;
if (!sync_tree)
build_tree();
header->source->sync_version = 1;
if (!header->destination){
if (IF_DEBUG(rhizome_sync_keys)){
DEBUGF(rhizome_sync_keys,"Processing message from %s", alloca_tohex_sid_t(header->source->sid));
//dump("Raw message", ob_current_ptr(payload), ob_remaining(payload));
}
sync_recv_message(sync_tree, header->source, ob_current_ptr(payload), ob_remaining(payload));
if (sync_has_transmit_queued(sync_tree)){
struct sched_ent *alarm=&ALARM_STRUCT(sync_send_keys);
time_ms_t next = gettime_ms() + 5;
if (alarm->alarm > next || !is_scheduled(alarm)){
DEBUG(rhizome_sync_keys,"Queueing next message for 5ms");
RESCHEDULE(alarm, next, next, next);
}
}
if (IF_DEBUG(rhizome_sync_keys)){
struct sched_ent *alarm=&ALARM_STRUCT(sync_keys_status);
if (alarm->alarm == TIME_MS_NEVER_WILL){
time_ms_t next = gettime_ms() + 1000;
RESCHEDULE(alarm, next, next, next);
}
}
}else{
struct msp_server_state *connection_state = msp_find_and_process(&sync_connections, header, payload);
if (connection_state){
struct rhizome_sync_keys *sync_state = get_peer_sync_state(header->source);
sync_state->connection = connection_state;
while(1){
struct msp_packet *packet = msp_recv_next(connection_state);
if (!packet)
break;
struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet);
if (recv_payload)
process_transfer_message(header->source, sync_state, recv_payload);
msp_consumed(connection_state, packet, recv_payload);
}
sync_send_peer(sync_state);
time_ms_t next_action = msp_next_action(connection_state);
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (alarm->alarm > next_action || !is_scheduled(alarm))
RESCHEDULE(alarm, next_action, next_action, next_action);
}
}
return 0;
}
static void sync_neighbour_changed(struct subscriber *neighbour, uint8_t found, unsigned count)
{
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys);
if (count>0 && is_rhizome_advertise_enabled()){
time_ms_t now = gettime_ms();
if (alarm->alarm == TIME_MS_NEVER_WILL){
DEBUG(rhizome_sync_keys,"Queueing next message now");
RESCHEDULE(alarm, now, now, TIME_MS_NEVER_WILL);
}
}else{
DEBUG(rhizome_sync_keys,"Stop queueing messages");
RESCHEDULE(alarm, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
}
if (!found){
struct rhizome_sync_keys *sync_state = get_peer_sync_state(neighbour);
// if there's no connection, there shouldn't be any items in the transfer queue
if (sync_state->connection)
msp_stop_stream(sync_state->connection);
}
}
DEFINE_TRIGGER(nbr_change, sync_neighbour_changed);
static void sync_bundle_add(rhizome_manifest *m)
{
if (!sync_tree){
DEBUG(rhizome_sync_keys, "Ignoring added manifest, tree not built yet");
return;
}
sync_key_t key;
memcpy(key.key, m->manifesthash.binary, sizeof(sync_key_t));
DEBUGF(rhizome_sync_keys, "Adding %s to tree",
alloca_sync_key(&key));
sync_add_key(sync_tree, &key, NULL);
if (link_has_neighbours()){
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys);
time_ms_t next = gettime_ms()+5;
if (alarm->alarm > next || !is_scheduled(alarm)){
DEBUG(rhizome_sync_keys,"Queueing next message for 5ms");
RESCHEDULE(alarm, next, next, next);
}
}
}
DEFINE_TRIGGER(bundle_add, sync_bundle_add);