From 2b991a09174422338b5702e1ffa756651ae0760e Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 10 May 2016 13:06:07 +0930 Subject: [PATCH] Lower mdp qos priority of new rhizome sync, bias bundles with no reachable receiver --- msp_server.c | 27 +++++++---- msp_server.h | 14 ++++-- rhizome_sync_keys.c | 107 ++++++++++++++++++++++++++++++-------------- 3 files changed, 103 insertions(+), 45 deletions(-) diff --git a/msp_server.c b/msp_server.c index 8c201db0..30c172af 100644 --- a/msp_server.c +++ b/msp_server.c @@ -16,12 +16,15 @@ struct msp_server_state{ mdp_port_t local_port; struct subscriber *remote_sid; mdp_port_t remote_port; + uint8_t ttl; + uint8_t qos; }; static struct msp_server_state *msp_create( struct msp_server_state **root, struct subscriber *remote_sid, mdp_port_t remote_port, - struct subscriber *local_sid, mdp_port_t local_port) + struct subscriber *local_sid, mdp_port_t local_port, + uint8_t ttl, uint8_t qos) { struct msp_server_state *state = (struct msp_server_state *)emalloc_zero(sizeof(struct msp_server_state)); msp_stream_init(&state->stream); @@ -29,6 +32,8 @@ static struct msp_server_state *msp_create( state->remote_port = remote_port; state->local_sid = local_sid; state->local_port = local_port; + state->ttl = ttl; + state->qos = qos; state->_next = (*root); (*root) = state; return state; @@ -37,7 +42,8 @@ static struct msp_server_state *msp_create( struct msp_server_state * msp_find_or_connect( struct msp_server_state **root, struct subscriber *remote_sid, mdp_port_t remote_port, - struct subscriber *local_sid, mdp_port_t local_port) + struct subscriber *local_sid, mdp_port_t local_port, + uint8_t qos) { struct msp_server_state *state = (*root); @@ -48,7 +54,7 @@ struct msp_server_state * msp_find_or_connect( } if (!state){ - state = msp_create(root, remote_sid, remote_port, local_sid, local_port); + state = msp_create(root, remote_sid, remote_port, local_sid, local_port, PAYLOAD_TTL_DEFAULT, qos); state->stream.state|=MSP_STATE_DATAOUT; // make sure we send a FIRST packet soon state->stream.next_action = state->stream.next_ack = gettime_ms()+10; @@ -112,7 +118,8 @@ static void send_frame(struct msp_server_state *state, struct overlay_buffer *pa response_header.source_port = state->local_port; response_header.destination = state->remote_sid; response_header.destination_port = state->remote_port; - + response_header.qos = state->qos; + response_header.ttl = state->ttl; overlay_send_frame(&response_header, payload); ob_free(payload); } @@ -230,8 +237,8 @@ struct msp_server_state * msp_find_and_process(struct msp_server_state **root, c int flags = ob_peek(payload); if (!state && (flags & FLAG_FIRST)) - state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port); - + state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port, PAYLOAD_TTL_DEFAULT, header->qos); + if (!state){ if (!(flags & FLAG_STOP)){ struct internal_mdp_header response_header; @@ -282,15 +289,19 @@ time_ms_t msp_last_packet(struct msp_server_state *state) return state->stream.rx.last_packet > state->stream.tx.last_packet ? state->stream.rx.last_packet : state->stream.tx.last_packet; } +unsigned msp_queued_packet_count(struct msp_server_state *state) +{ + return state->stream.rx.packet_count + state->stream.tx.packet_count; +} struct subscriber * msp_remote_peer(struct msp_server_state *state) { return state->remote_sid; } -int msp_get_error(struct msp_server_state *state) +msp_state_t msp_get_connection_state(struct msp_server_state *state) { - return (state->stream.state & (MSP_STATE_ERROR|MSP_STATE_STOPPED)) ? 1 : 0; + return state->stream.state; } int msp_can_send(struct msp_server_state *state) diff --git a/msp_server.h b/msp_server.h index a9d529cc..c2f04525 100644 --- a/msp_server.h +++ b/msp_server.h @@ -1,6 +1,7 @@ #ifndef __SERVAL_DNA__MSP_SERVER_H #define __SERVAL_DNA__MSP_SERVER_H +typedef uint16_t msp_state_t; struct msp_server_state; struct msp_packet; @@ -12,9 +13,15 @@ struct msp_iterator{ struct msp_server_state * msp_find_or_connect( struct msp_server_state **root, struct subscriber *remote_sid, mdp_port_t remote_port, - struct subscriber *local_sid, mdp_port_t local_port); + struct subscriber *local_sid, mdp_port_t local_port, + uint8_t qos +); -struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload); +struct msp_server_state * msp_find_and_process( + struct msp_server_state **root, + const struct internal_mdp_header *header, + struct overlay_buffer *payload +); struct msp_packet *msp_recv_next(struct msp_server_state *state); struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet); @@ -24,7 +31,8 @@ time_ms_t msp_next_action(struct msp_server_state *state); time_ms_t msp_last_packet(struct msp_server_state *state); struct subscriber * msp_remote_peer(struct msp_server_state *state); int msp_can_send(struct msp_server_state *state); -int msp_get_error(struct msp_server_state *state); +msp_state_t msp_get_connection_state(struct msp_server_state *state); +unsigned msp_queued_packet_count(struct msp_server_state *state); int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator); struct msp_server_state * msp_process_next(struct msp_iterator *iterator); diff --git a/rhizome_sync_keys.c b/rhizome_sync_keys.c index e4cab620..e675f2fd 100644 --- a/rhizome_sync_keys.c +++ b/rhizome_sync_keys.c @@ -33,6 +33,8 @@ // approx size of a signed manifest #define DUMMY_MANIFEST_SIZE 256 +#define REACHABLE_BIAS 2 + struct transfers{ struct transfers *next; sync_key_t key; @@ -56,6 +58,7 @@ struct rhizome_sync_keys{ struct sync_state *sync_tree=NULL; struct msp_server_state *sync_connections=NULL; +DEFINE_ALARM(sync_send); static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){ if (!peer->sync_keys_state) @@ -100,8 +103,25 @@ static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr) } #define clear_transfer(P) _clear_transfer(__WHENCE__,P) -static struct transfers **find_and_update_transfer(struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank) +static struct transfers **find_and_update_transfer(struct subscriber *peer, struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank) { + if (rank>0xFF) + rank = 0xFF; + if (state){ + if (!keys_state->connection) + keys_state->connection = msp_find_or_connect(&sync_connections, + peer, MDP_PORT_RHIZOME_SYNC_KEYS, + my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS, + OQ_OPPORTUNISTIC); + + if (msp_can_send(keys_state->connection)){ + time_ms_t next_action = gettime_ms(); + struct sched_ent *alarm=&ALARM_STRUCT(sync_send); + if (next_action < alarm->alarm || !is_scheduled(alarm)) + RESCHEDULE(alarm, next_action, next_action, next_action); + } + } + struct transfers **ptr = &keys_state->queue; while(*ptr){ if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){ @@ -135,7 +155,7 @@ static void sync_key_diffs(void *UNUSED(context), void *peer_context, const sync { struct subscriber *peer = (struct subscriber *)peer_context; struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer); - struct transfers **transfer = find_and_update_transfer(sync_keys, key, 0, -1); + struct transfers **transfer = find_and_update_transfer(peer, sync_keys, key, 0, -1); DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s", alloca_tohex_sid_t(peer->sid), @@ -330,13 +350,14 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state) ob_free(payload); } - if (now - msp_last_packet(sync_state->connection) > 5000){ + if (msp_queued_packet_count(sync_state->connection)==0 && + (msp_get_connection_state(sync_state->connection) & MSP_STATE_RECEIVED_PACKET) && + now - msp_last_packet(sync_state->connection) > 5000){ DEBUGF(rhizome_sync_keys, "Closing idle connection"); msp_shutdown_stream(sync_state->connection); } } -DEFINE_ALARM(sync_send); void sync_send(struct sched_ent *alarm) { struct msp_iterator iterator; @@ -373,7 +394,7 @@ void sync_send(struct sched_ent *alarm) sync_state->connection = NULL; // eg connection timeout; drop all sync state - if (msp_get_error(connection)) + if (msp_get_connection_state(connection)& (MSP_STATE_ERROR|MSP_STATE_STOPPED)) sync_free_peer_state(sync_tree, peer); } @@ -403,27 +424,37 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context, alloca_tohex_sid_t(peer->sid), alloca_sync_key(key)); - // queue BAR for transmission - rhizome_bar_t bar; - if (rhizome_retrieve_bar_by_hash_prefix(key->key, sizeof(*key), &bar)==RHIZOME_BUNDLE_STATUS_SAME){ - uint8_t log_size = rhizome_bar_log_size(&bar); - struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); - - struct transfers *send_bar = *find_and_update_transfer(sync_state, key, STATE_SEND_BAR, log_size); - send_bar->bar = bar; - - if (!sync_state->connection) - sync_state->connection = msp_find_or_connect(&sync_connections, - peer, MDP_PORT_RHIZOME_SYNC_KEYS, - my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS); - - time_ms_t next_action = msp_next_action(sync_state->connection); - struct sched_ent *alarm=&ALARM_STRUCT(sync_send); - if (next_action < alarm->alarm || !is_scheduled(alarm)) - RESCHEDULE(alarm, next_action, next_action, next_action); + // queue BAR for transmission based on the manifest details. + // add a rank bias if there is no reachable recipient to prioritise messaging + + rhizome_manifest *m = rhizome_new_manifest(); + if (!m) + return; + + if (rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(sync_key_t), m)!=RHIZOME_BUNDLE_STATUS_SAME) + goto end; + + uint8_t bias = REACHABLE_BIAS; + int rank = log2ll(m->filesize); + + if (m->has_recipient){ + struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0); + if (recipient && (recipient->reachable & (REACHABLE | REACHABLE_SELF))) + bias=0; } - - return; + + struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); + if (!sync_state) + goto end; + + struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, key, STATE_SEND_BAR, rank + bias); + if (!send_bar) + goto end; + + rhizome_manifest_to_bar(m, &send_bar->bar); + +end: + rhizome_manifest_free(m); } static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) @@ -499,7 +530,7 @@ void sync_send_keys(struct sched_ent *alarm) } } -static void process_transfer_message(struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload) +static void process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload) { while(ob_remaining(payload)){ ob_checkpoint(payload); @@ -535,14 +566,14 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc // send a request for the manifest rank = rhizome_bar_log_size(&bar); - struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_MANIFEST, rank); + struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_MANIFEST, rank); transfer->req_len = DUMMY_MANIFEST_SIZE; break; } case STATE_REQ_MANIFEST:{ // queue the transmission of the manifest - find_and_update_transfer(sync_state, &key, STATE_SEND_MANIFEST, rank); + find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank); break; } @@ -645,11 +676,19 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc break; } } - + // TODO improve rank algo here; + // Note that we still need to deal with this manifest, we don't want to run out of RAM rank = log2ll(m->filesize - write->file_offset); - - struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_PAYLOAD, rank); + uint8_t bias = REACHABLE_BIAS; + + if (m->has_recipient){ + struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0); + if (recipient && (recipient->reachable & (REACHABLE | REACHABLE_SELF))) + bias=0; + } + + struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_PAYLOAD, rank + bias); transfer->manifest = m; transfer->req_len = m->filesize - write->file_offset; transfer->write = write; @@ -679,7 +718,7 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc } rhizome_manifest_free(m); - struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_SEND_PAYLOAD, rank); + struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_SEND_PAYLOAD, rank); transfer->read = read; transfer->req_len = length; read->offset = offset; @@ -689,7 +728,7 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc size_t len = ob_remaining(payload); uint8_t *buff = ob_get_bytes_ptr(payload, len); - struct transfers **ptr = find_and_update_transfer(sync_state, &key, STATE_RECV_PAYLOAD, -1); + struct transfers **ptr = find_and_update_transfer(peer, sync_state, &key, STATE_RECV_PAYLOAD, -1); if (!ptr){ WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key)); break; @@ -779,7 +818,7 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf break; struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet); if (recv_payload) - process_transfer_message(sync_state, recv_payload); + process_transfer_message(header->source, sync_state, recv_payload); msp_consumed(connection_state, packet, recv_payload); }