Cleanup sync state when complete, or peers move out of range

This commit is contained in:
Jeremy Lakeman 2016-03-30 11:53:07 +10:30
parent c5957e9c85
commit 6cedb2d0ac
4 changed files with 65 additions and 10 deletions

View File

@ -199,6 +199,20 @@ int msp_shutdown_stream(struct msp_server_state *state)
return 0;
}
void msp_stop_stream(struct msp_server_state *state)
{
if (state->stream.state & MSP_STATE_STOPPED)
return;
state->stream.state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED;
state->stream.state &= ~MSP_STATE_DATAOUT;
free_all_packets(&state->stream.tx);
uint8_t response = FLAG_STOP;
struct overlay_buffer *payload = ob_static(&response, 1);
ob_limitsize(payload, 1);
send_frame(state, payload);
}
struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload)
{
if (ob_remaining(payload)<1){
@ -263,11 +277,22 @@ time_ms_t msp_next_action(struct msp_server_state *state)
return state->stream.next_action;
}
time_ms_t msp_last_packet(struct msp_server_state *state)
{
return state->stream.rx.last_packet > state->stream.tx.last_packet ? state->stream.rx.last_packet : state->stream.tx.last_packet;
}
struct subscriber * msp_remote_peer(struct msp_server_state *state)
{
return state->remote_sid;
}
int msp_get_error(struct msp_server_state *state)
{
return (state->stream.state & (MSP_STATE_ERROR|MSP_STATE_STOPPED)) ? 1 : 0;
}
int msp_can_send(struct msp_server_state *state)
{
return (state->stream.state & MSP_STATE_DATAOUT) ? 1 : 0;

View File

@ -19,9 +19,12 @@ struct msp_server_state * msp_find_and_process(struct msp_server_state **root, c
struct msp_packet *msp_recv_next(struct msp_server_state *state);
struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet);
void msp_consumed(struct msp_server_state *state, struct msp_packet *packet, struct overlay_buffer *payload);
time_ms_t msp_next_action(struct msp_server_state *state);
time_ms_t msp_last_packet(struct msp_server_state *state);
struct subscriber * msp_remote_peer(struct msp_server_state *state);
int msp_can_send(struct msp_server_state *state);
int msp_get_error(struct msp_server_state *state);
int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator);
struct msp_server_state * msp_process_next(struct msp_iterator *iterator);
@ -30,5 +33,6 @@ time_ms_t msp_iterator_close(struct msp_iterator *iterator);
int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len);
int msp_shutdown_stream(struct msp_server_state *state);
void msp_stop_stream(struct msp_server_state *state);
#endif

View File

@ -184,7 +184,8 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state)
// so we will still request a high rank item even if there is a low ranked item being received
struct transfers **ptr = &sync_state->queue;
size_t requested_bytes = 0;
time_ms_t now = gettime_ms();
while((*ptr) && msp_can_send(sync_state->connection) && requested_bytes < MAX_REQUEST_BYTES){
struct transfers *msg = *ptr;
if (msg->state == STATE_RECV_PAYLOAD){
@ -328,6 +329,11 @@ static void sync_send_peer(struct rhizome_sync_keys *sync_state)
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_free(payload);
}
if (now - msp_last_packet(sync_state->connection) > 5000){
DEBUGF(rhizome_sync_keys, "Closing idle connection");
msp_shutdown_stream(sync_state->connection);
}
}
DEFINE_ALARM(sync_send);
@ -351,9 +357,24 @@ void sync_send(struct sched_ent *alarm)
struct msp_server_state *connection = msp_next_closed(&iterator);
if (!connection)
break;
struct subscriber *peer = msp_remote_peer(connection);
struct rhizome_sync_keys *sync_state = get_peer_sync_state(peer);
DEBUGF(rhizome_sync_keys, "Connection closed %s", alloca_tohex_sid_t(peer->sid));
// TODO if the msp connection breaks before sync complete, free the full sync state
// drop all transfer records
while(sync_state->queue){
struct transfers *msg = sync_state->queue;
sync_state->queue = msg->next;
clear_transfer(msg);
free(msg);
}
sync_state->connection = NULL;
// eg connection timeout; drop all sync state
if (msp_get_error(connection))
sync_free_peer_state(sync_tree, peer);
}
time_ms_t next_action = msp_iterator_close(&iterator);
@ -757,8 +778,8 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
if (!packet)
break;
struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet);
process_transfer_message(sync_state, recv_payload);
if (recv_payload)
process_transfer_message(sync_state, recv_payload);
msp_consumed(connection_state, packet, recv_payload);
}
@ -774,7 +795,7 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
return 0;
}
static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t UNUSED(found), unsigned count)
static void sync_neighbour_changed(struct subscriber *neighbour, uint8_t found, unsigned count)
{
struct sched_ent *alarm = &ALARM_STRUCT(sync_send_keys);
@ -788,7 +809,13 @@ static void sync_neighbour_changed(struct subscriber *UNUSED(neighbour), uint8_t
DEBUG(rhizome_sync_keys,"Stop queueing messages");
RESCHEDULE(alarm, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
}
// nuke sync state for this peer now?
if (!found){
struct rhizome_sync_keys *sync_state = get_peer_sync_state(neighbour);
// if there's no connection, there shouldn't be any items in the transfer queue
if (sync_state->connection)
msp_stop_stream(sync_state->connection);
}
}
DEFINE_TRIGGER(nbr_change, sync_neighbour_changed);

View File

@ -853,7 +853,7 @@ setup_ManyFiles() {
bundlesC=()
bundlesD=()
for i in `seq 1 5`
for i in `seq 1 10`
do
tfw_log "Adding common file-$i"
tfw_nolog set_instance +A
@ -870,7 +870,7 @@ setup_ManyFiles() {
do
eval "bundles$i=()"
set_instance +$i
for j in `seq 1 3`
for j in `seq 1 10`
do
tfw_log "Adding file$i-$j"
create_file file$i-$j $(( $j * 500 ))
@ -885,7 +885,7 @@ setup_ManyFiles() {
}
test_ManyFiles() {
wait_until --timeout=15 bundle_received_by \
wait_until --timeout=30 bundle_received_by \
${bundlesB[*]} ${bundlesC[*]} ${bundlesD[*]} +A \
${bundlesA[*]} ${bundlesC[*]} ${bundlesD[*]} +B \
${bundlesA[*]} ${bundlesB[*]} ${bundlesD[*]} +C \
@ -894,4 +894,3 @@ test_ManyFiles() {
runTests "$@"