Lower mdp qos priority of new rhizome sync, bias bundles with no reachable receiver

This commit is contained in:
Jeremy Lakeman 2016-05-10 13:06:07 +09:30
parent 32dd9f7b15
commit 2b991a0917
3 changed files with 103 additions and 45 deletions

View File

@ -16,12 +16,15 @@ struct msp_server_state{
mdp_port_t local_port; mdp_port_t local_port;
struct subscriber *remote_sid; struct subscriber *remote_sid;
mdp_port_t remote_port; mdp_port_t remote_port;
uint8_t ttl;
uint8_t qos;
}; };
static struct msp_server_state *msp_create( static struct msp_server_state *msp_create(
struct msp_server_state **root, struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port, struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port) struct subscriber *local_sid, mdp_port_t local_port,
uint8_t ttl, uint8_t qos)
{ {
struct msp_server_state *state = (struct msp_server_state *)emalloc_zero(sizeof(struct msp_server_state)); struct msp_server_state *state = (struct msp_server_state *)emalloc_zero(sizeof(struct msp_server_state));
msp_stream_init(&state->stream); msp_stream_init(&state->stream);
@ -29,6 +32,8 @@ static struct msp_server_state *msp_create(
state->remote_port = remote_port; state->remote_port = remote_port;
state->local_sid = local_sid; state->local_sid = local_sid;
state->local_port = local_port; state->local_port = local_port;
state->ttl = ttl;
state->qos = qos;
state->_next = (*root); state->_next = (*root);
(*root) = state; (*root) = state;
return state; return state;
@ -37,7 +42,8 @@ static struct msp_server_state *msp_create(
struct msp_server_state * msp_find_or_connect( struct msp_server_state * msp_find_or_connect(
struct msp_server_state **root, struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port, struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port) struct subscriber *local_sid, mdp_port_t local_port,
uint8_t qos)
{ {
struct msp_server_state *state = (*root); struct msp_server_state *state = (*root);
@ -48,7 +54,7 @@ struct msp_server_state * msp_find_or_connect(
} }
if (!state){ if (!state){
state = msp_create(root, remote_sid, remote_port, local_sid, local_port); state = msp_create(root, remote_sid, remote_port, local_sid, local_port, PAYLOAD_TTL_DEFAULT, qos);
state->stream.state|=MSP_STATE_DATAOUT; state->stream.state|=MSP_STATE_DATAOUT;
// make sure we send a FIRST packet soon // make sure we send a FIRST packet soon
state->stream.next_action = state->stream.next_ack = gettime_ms()+10; state->stream.next_action = state->stream.next_ack = gettime_ms()+10;
@ -112,7 +118,8 @@ static void send_frame(struct msp_server_state *state, struct overlay_buffer *pa
response_header.source_port = state->local_port; response_header.source_port = state->local_port;
response_header.destination = state->remote_sid; response_header.destination = state->remote_sid;
response_header.destination_port = state->remote_port; response_header.destination_port = state->remote_port;
response_header.qos = state->qos;
response_header.ttl = state->ttl;
overlay_send_frame(&response_header, payload); overlay_send_frame(&response_header, payload);
ob_free(payload); ob_free(payload);
} }
@ -230,8 +237,8 @@ struct msp_server_state * msp_find_and_process(struct msp_server_state **root, c
int flags = ob_peek(payload); int flags = ob_peek(payload);
if (!state && (flags & FLAG_FIRST)) if (!state && (flags & FLAG_FIRST))
state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port); state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port, PAYLOAD_TTL_DEFAULT, header->qos);
if (!state){ if (!state){
if (!(flags & FLAG_STOP)){ if (!(flags & FLAG_STOP)){
struct internal_mdp_header response_header; struct internal_mdp_header response_header;
@ -282,15 +289,19 @@ time_ms_t msp_last_packet(struct msp_server_state *state)
return state->stream.rx.last_packet > state->stream.tx.last_packet ? state->stream.rx.last_packet : state->stream.tx.last_packet; return state->stream.rx.last_packet > state->stream.tx.last_packet ? state->stream.rx.last_packet : state->stream.tx.last_packet;
} }
unsigned msp_queued_packet_count(struct msp_server_state *state)
{
return state->stream.rx.packet_count + state->stream.tx.packet_count;
}
struct subscriber * msp_remote_peer(struct msp_server_state *state) struct subscriber * msp_remote_peer(struct msp_server_state *state)
{ {
return state->remote_sid; return state->remote_sid;
} }
int msp_get_error(struct msp_server_state *state) msp_state_t msp_get_connection_state(struct msp_server_state *state)
{ {
return (state->stream.state & (MSP_STATE_ERROR|MSP_STATE_STOPPED)) ? 1 : 0; return state->stream.state;
} }
int msp_can_send(struct msp_server_state *state) int msp_can_send(struct msp_server_state *state)

View File

@ -1,6 +1,7 @@
#ifndef __SERVAL_DNA__MSP_SERVER_H #ifndef __SERVAL_DNA__MSP_SERVER_H
#define __SERVAL_DNA__MSP_SERVER_H #define __SERVAL_DNA__MSP_SERVER_H
typedef uint16_t msp_state_t;
struct msp_server_state; struct msp_server_state;
struct msp_packet; struct msp_packet;
@ -12,9 +13,15 @@ struct msp_iterator{
struct msp_server_state * msp_find_or_connect( struct msp_server_state * msp_find_or_connect(
struct msp_server_state **root, struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port, struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port); struct subscriber *local_sid, mdp_port_t local_port,
uint8_t qos
);
struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload); struct msp_server_state * msp_find_and_process(
struct msp_server_state **root,
const struct internal_mdp_header *header,
struct overlay_buffer *payload
);
struct msp_packet *msp_recv_next(struct msp_server_state *state); struct msp_packet *msp_recv_next(struct msp_server_state *state);
struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet); struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet);
@ -24,7 +31,8 @@ time_ms_t msp_next_action(struct msp_server_state *state);
time_ms_t msp_last_packet(struct msp_server_state *state); time_ms_t msp_last_packet(struct msp_server_state *state);
struct subscriber * msp_remote_peer(struct msp_server_state *state); struct subscriber * msp_remote_peer(struct msp_server_state *state);
int msp_can_send(struct msp_server_state *state); int msp_can_send(struct msp_server_state *state);
int msp_get_error(struct msp_server_state *state); msp_state_t msp_get_connection_state(struct msp_server_state *state);
unsigned msp_queued_packet_count(struct msp_server_state *state);
int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator); int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator);
struct msp_server_state * msp_process_next(struct msp_iterator *iterator); struct msp_server_state * msp_process_next(struct msp_iterator *iterator);

View File

@ -33,6 +33,8 @@
// approx size of a signed manifest // approx size of a signed manifest
#define DUMMY_MANIFEST_SIZE 256 #define DUMMY_MANIFEST_SIZE 256
#define REACHABLE_BIAS 2
struct transfers{ struct transfers{
struct transfers *next; struct transfers *next;
sync_key_t key; sync_key_t key;
@ -56,6 +58,7 @@ struct rhizome_sync_keys{
struct sync_state *sync_tree=NULL; struct sync_state *sync_tree=NULL;
struct msp_server_state *sync_connections=NULL; struct msp_server_state *sync_connections=NULL;
DEFINE_ALARM(sync_send);
static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){ static struct rhizome_sync_keys *get_peer_sync_state(struct subscriber *peer){
if (!peer->sync_keys_state) if (!peer->sync_keys_state)
@ -100,8 +103,25 @@ static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr)
} }
#define clear_transfer(P) _clear_transfer(__WHENCE__,P) #define clear_transfer(P) _clear_transfer(__WHENCE__,P)
static struct transfers **find_and_update_transfer(struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank) static struct transfers **find_and_update_transfer(struct subscriber *peer, struct rhizome_sync_keys *keys_state, const sync_key_t *key, uint8_t state, int rank)
{ {
if (rank>0xFF)
rank = 0xFF;
if (state){
if (!keys_state->connection)
keys_state->connection = msp_find_or_connect(&sync_connections,
peer, MDP_PORT_RHIZOME_SYNC_KEYS,
my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS,
OQ_OPPORTUNISTIC);
if (msp_can_send(keys_state->connection)){
time_ms_t next_action = gettime_ms();
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (next_action < alarm->alarm || !is_scheduled(alarm))
RESCHEDULE(alarm, next_action, next_action, next_action);
}
}
struct transfers **ptr = &keys_state->queue; struct transfers **ptr = &keys_state->queue;
while(*ptr){ while(*ptr){
if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){ if (memcmp(key, &(*ptr)->key, sizeof(sync_key_t))==0){
@ -135,7 +155,7 @@ static void sync_key_diffs(void *UNUSED(context), void *peer_context, const sync
{ {
struct subscriber *peer = (struct subscriber *)peer_context; struct subscriber *peer = (struct subscriber *)peer_context;
struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer); struct rhizome_sync_keys *sync_keys = get_peer_sync_state(peer);
struct transfers **transfer = find_and_update_transfer(sync_keys, key, 0, -1); struct transfers **transfer = find_and_update_transfer(peer, sync_keys, key, 0, -1);
DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s", DEBUGF(rhizome_sync_keys, "Peer %s %s %s %s",
alloca_tohex_sid_t(peer->sid), alloca_tohex_sid_t(peer->sid),
@ -330,13 +350,14 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state)
ob_free(payload); ob_free(payload);
} }
if (now - msp_last_packet(sync_state->connection) > 5000){ if (msp_queued_packet_count(sync_state->connection)==0 &&
(msp_get_connection_state(sync_state->connection) & MSP_STATE_RECEIVED_PACKET) &&
now - msp_last_packet(sync_state->connection) > 5000){
DEBUGF(rhizome_sync_keys, "Closing idle connection"); DEBUGF(rhizome_sync_keys, "Closing idle connection");
msp_shutdown_stream(sync_state->connection); msp_shutdown_stream(sync_state->connection);
} }
} }
DEFINE_ALARM(sync_send);
void sync_send(struct sched_ent *alarm) void sync_send(struct sched_ent *alarm)
{ {
struct msp_iterator iterator; struct msp_iterator iterator;
@ -373,7 +394,7 @@ void sync_send(struct sched_ent *alarm)
sync_state->connection = NULL; sync_state->connection = NULL;
// eg connection timeout; drop all sync state // eg connection timeout; drop all sync state
if (msp_get_error(connection)) if (msp_get_connection_state(connection)& (MSP_STATE_ERROR|MSP_STATE_STOPPED))
sync_free_peer_state(sync_tree, peer); sync_free_peer_state(sync_tree, peer);
} }
@ -403,27 +424,37 @@ static void sync_peer_does_not_have (void * UNUSED(context), void *peer_context,
alloca_tohex_sid_t(peer->sid), alloca_tohex_sid_t(peer->sid),
alloca_sync_key(key)); alloca_sync_key(key));
// queue BAR for transmission // queue BAR for transmission based on the manifest details.
rhizome_bar_t bar; // add a rank bias if there is no reachable recipient to prioritise messaging
if (rhizome_retrieve_bar_by_hash_prefix(key->key, sizeof(*key), &bar)==RHIZOME_BUNDLE_STATUS_SAME){
uint8_t log_size = rhizome_bar_log_size(&bar); rhizome_manifest *m = rhizome_new_manifest();
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer); if (!m)
return;
struct transfers *send_bar = *find_and_update_transfer(sync_state, key, STATE_SEND_BAR, log_size);
send_bar->bar = bar; if (rhizome_retrieve_manifest_by_hash_prefix(key->key, sizeof(sync_key_t), m)!=RHIZOME_BUNDLE_STATUS_SAME)
goto end;
if (!sync_state->connection)
sync_state->connection = msp_find_or_connect(&sync_connections, uint8_t bias = REACHABLE_BIAS;
peer, MDP_PORT_RHIZOME_SYNC_KEYS, int rank = log2ll(m->filesize);
my_subscriber, MDP_PORT_RHIZOME_SYNC_KEYS);
if (m->has_recipient){
time_ms_t next_action = msp_next_action(sync_state->connection); struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0);
struct sched_ent *alarm=&ALARM_STRUCT(sync_send); if (recipient && (recipient->reachable & (REACHABLE | REACHABLE_SELF)))
if (next_action < alarm->alarm || !is_scheduled(alarm)) bias=0;
RESCHEDULE(alarm, next_action, next_action, next_action);
} }
return; struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
if (!sync_state)
goto end;
struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, key, STATE_SEND_BAR, rank + bias);
if (!send_bar)
goto end;
rhizome_manifest_to_bar(m, &send_bar->bar);
end:
rhizome_manifest_free(m);
} }
static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key) static void sync_peer_now_has (void * UNUSED(context), void *peer_context, void * UNUSED(key_context), const sync_key_t *key)
@ -499,7 +530,7 @@ void sync_send_keys(struct sched_ent *alarm)
} }
} }
static void process_transfer_message(struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload) static void process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
{ {
while(ob_remaining(payload)){ while(ob_remaining(payload)){
ob_checkpoint(payload); ob_checkpoint(payload);
@ -535,14 +566,14 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc
// send a request for the manifest // send a request for the manifest
rank = rhizome_bar_log_size(&bar); rank = rhizome_bar_log_size(&bar);
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_MANIFEST, rank); struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_MANIFEST, rank);
transfer->req_len = DUMMY_MANIFEST_SIZE; transfer->req_len = DUMMY_MANIFEST_SIZE;
break; break;
} }
case STATE_REQ_MANIFEST:{ case STATE_REQ_MANIFEST:{
// queue the transmission of the manifest // queue the transmission of the manifest
find_and_update_transfer(sync_state, &key, STATE_SEND_MANIFEST, rank); find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank);
break; break;
} }
@ -645,11 +676,19 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc
break; break;
} }
} }
// TODO improve rank algo here; // TODO improve rank algo here;
// Note that we still need to deal with this manifest, we don't want to run out of RAM
rank = log2ll(m->filesize - write->file_offset); rank = log2ll(m->filesize - write->file_offset);
uint8_t bias = REACHABLE_BIAS;
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_REQ_PAYLOAD, rank);
if (m->has_recipient){
struct subscriber *recipient = find_subscriber(m->recipient.binary, sizeof m->recipient, 0);
if (recipient && (recipient->reachable & (REACHABLE | REACHABLE_SELF)))
bias=0;
}
struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_REQ_PAYLOAD, rank + bias);
transfer->manifest = m; transfer->manifest = m;
transfer->req_len = m->filesize - write->file_offset; transfer->req_len = m->filesize - write->file_offset;
transfer->write = write; transfer->write = write;
@ -679,7 +718,7 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc
} }
rhizome_manifest_free(m); rhizome_manifest_free(m);
struct transfers *transfer = *find_and_update_transfer(sync_state, &key, STATE_SEND_PAYLOAD, rank); struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_SEND_PAYLOAD, rank);
transfer->read = read; transfer->read = read;
transfer->req_len = length; transfer->req_len = length;
read->offset = offset; read->offset = offset;
@ -689,7 +728,7 @@ static void process_transfer_message(struct rhizome_sync_keys *sync_state, struc
size_t len = ob_remaining(payload); size_t len = ob_remaining(payload);
uint8_t *buff = ob_get_bytes_ptr(payload, len); uint8_t *buff = ob_get_bytes_ptr(payload, len);
struct transfers **ptr = find_and_update_transfer(sync_state, &key, STATE_RECV_PAYLOAD, -1); struct transfers **ptr = find_and_update_transfer(peer, sync_state, &key, STATE_RECV_PAYLOAD, -1);
if (!ptr){ if (!ptr){
WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key)); WHYF("Ignoring message for %s, no transfer in progress!", alloca_sync_key(&key));
break; break;
@ -779,7 +818,7 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
break; break;
struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet); struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet);
if (recv_payload) if (recv_payload)
process_transfer_message(sync_state, recv_payload); process_transfer_message(header->source, sync_state, recv_payload);
msp_consumed(connection_state, packet, recv_payload); msp_consumed(connection_state, packet, recv_payload);
} }