Add new rhizome sync status API

This commit is contained in:
Jeremy Lakeman 2018-04-18 18:10:48 +09:30
parent 18bc162361
commit 44e73b969e
6 changed files with 229 additions and 17 deletions

View File

@ -1171,6 +1171,25 @@ The results will be a single json object with the following fields;
* `filesystem_free_bytes` - the measured free space of the filesystem.
### GET /restful/rhizome/syncstatus.json
Fetch statistics on the current status of rhizome transfers. The list is returned in the body of
the [response](#response) in [JSON table][] format with the following columns:
* `subscriber` - the [SID][] of the neighbour we are transferring with.
* `received_bundles` - count of the number of new bundles successfully received from this neighbour.
* `sent_bytes` - total bytes sent, including protocol overheads.
* `sending_bytes` - total size of data that has been requested by this neighbour, but not yet sent.
* `received_bytes` - total bytes received and processed, including protocol overheads.
* `requested_bytes` - total size of all data we will request from this neighbour.
-----
**Copyright 2015-2017 Serval Project Inc.**
![CC-BY-4.0](./cc-by-4.0.png)

View File

@ -266,6 +266,8 @@ typedef struct httpd_request
struct {
struct rhizome_space_report rhizome_space;
subscriber_iterator neighbour_iterator;
enum list_phase phase;
} status;
} u;

View File

@ -367,6 +367,17 @@ sqlite_retry_state sqlite_retry_state_init(int serverLimit, int serverSleep, int
#define SQLITE_RETRY_STATE_DEFAULT sqlite_retry_state_init(-1,-1,-1,-1)
// sync stats for a single neighbour
struct rhizome_sync_state {
unsigned received_bundles;
uint64_t sent_bytes;
uint64_t sending_bytes;
uint64_t received_bytes;
uint64_t requested_bytes;
};
int rhizome_sync_status(struct subscriber *peer, struct rhizome_sync_state *state);
struct rhizome_space_report {
uint64_t file_count;
uint64_t internal_bytes;

View File

@ -35,6 +35,7 @@ DECLARE_HANDLER("/restful/rhizome/insert", restful_rhizome_insert);
DECLARE_HANDLER("/restful/rhizome/import", restful_rhizome_import);
DECLARE_HANDLER("/restful/rhizome/append", restful_rhizome_append);
DECLARE_HANDLER("/restful/rhizome/storestatus.json", restful_rhizome_disk_status);
DECLARE_HANDLER("/restful/rhizome/syncstatus.json", restful_rhizome_sync_status);
DECLARE_HANDLER("/restful/rhizome/", restful_rhizome_);
static HTTP_RENDERER render_status_and_manifest_headers;
@ -1304,3 +1305,104 @@ static int restful_rhizome_disk_status(httpd_request *r, const char *remainder)
return 1;
}
static int rhizome_sync_status_content_chunk(struct http_request *hr, strbuf b)
{
httpd_request *r = (httpd_request *) hr;
const char *headers[] = {
"subscriber",
"received_bundles",
"sent_bytes",
"sending_bytes",
"received_bytes",
"requested_bytes"
};
switch(r->u.status.phase){
case LIST_HEADER:
strbuf_puts(b, "{\n\"header\":[");
unsigned i;
for (i = 0; i != NELS(headers); ++i) {
if (i)
strbuf_putc(b, ',');
strbuf_json_string(b, headers[i]);
}
strbuf_puts(b, "],\n\"rows\":[");
if (!strbuf_overrun(b))
r->u.status.phase = LIST_FIRST;
return 1;
case LIST_FIRST:
case LIST_ROWS:
while(1){
struct subscriber **subscriberp = subscriber_iterator_get_current(&r->u.status.neighbour_iterator);
if (!subscriberp){
r->u.status.phase = LIST_END;
return 1;
}
struct rhizome_sync_state state;
if (rhizome_sync_status(*subscriberp, &state)!=0){
subscriber_iterator_advance(&r->u.status.neighbour_iterator);
continue;
}
if (r->u.status.phase==LIST_ROWS)
strbuf_puts(b, ",");
strbuf_puts(b, "\n[");
strbuf_json_hex(b, (*subscriberp)->sid.binary, sizeof(sid_t));
strbuf_putc(b, ',');
strbuf_sprintf(b, "%u", state.received_bundles);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, state.sent_bytes);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, state.sending_bytes);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, state.received_bytes);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, state.requested_bytes);
strbuf_putc(b, ']');
if (!strbuf_overrun(b)){
r->u.status.phase=LIST_ROWS;
subscriber_iterator_advance(&r->u.status.neighbour_iterator);
}
return 1;
}
case LIST_END:
strbuf_puts(b, "\n]\n}\n");
if (!strbuf_overrun(b))
r->u.status.phase=LIST_DONE;
return 1;
case LIST_DONE:
return 0;
}
abort();
}
static int rhizome_sync_status_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result)
{
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, rhizome_sync_status_content_chunk);
}
static void finalise_union_sync_status(httpd_request *r)
{
subscriber_iterator_free(&r->u.status.neighbour_iterator);
}
static int restful_rhizome_sync_status(httpd_request *r, const char *remainder)
{
if (*remainder || !is_rhizome_http_enabled())
return 404;
int ret = authorize_restful(&r->http);
if (ret)
return ret;
subscriber_iterator_start(&r->u.status.neighbour_iterator);
r->finalise_union = finalise_union_sync_status;
http_request_response_generated(&r->http, 200, &CONTENT_TYPE_JSON, rhizome_sync_status_content);
return 1;
}

View File

@ -53,6 +53,9 @@ struct transfers{
};
struct rhizome_sync_keys{
unsigned received_bundles;
uint64_t sent_bytes;
uint64_t received_bytes;
struct transfers *queue;
struct msp_server_state *connection;
};
@ -87,6 +90,31 @@ static const char *get_state_name(uint8_t state)
return "Unknown";
}
int rhizome_sync_status(struct subscriber *peer, struct rhizome_sync_state *state){
if (!peer->sync_keys_state)
return 1;
if (!(peer->reachable & REACHABLE_DIRECT))
return 1;
struct rhizome_sync_keys *sync_state = peer->sync_keys_state;
struct transfers *queue = sync_state->queue;
bzero(state, sizeof *state);
while(queue){
if ((queue->state&3) == STATE_SEND)
state->sending_bytes += queue->req_len;
else
state->requested_bytes += queue->req_len;
queue=queue->next;
}
state->received_bundles = sync_state->received_bundles;
state->sent_bytes = sync_state->sent_bytes;
state->received_bytes = sync_state->received_bytes;
return 0;
}
static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr)
{
DEBUGF(rhizome_sync_keys, "Clearing %s %s", get_state_name(ptr->state), alloca_sync_key(&ptr->key));
@ -312,6 +340,7 @@ static void sync_lookup_bar(struct subscriber *peer, struct rhizome_sync_keys *s
struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank);
if (send_bar){
send_bar->req_len = sizeof(send_bar->bar);
rhizome_manifest_to_bar(m, &send_bar->bar);
free(transfer);
}else{
@ -359,6 +388,7 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy
if (ob_overrun(payload)){
ob_rewind(payload);
sync_state->sent_bytes+=ob_position(payload);
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
@ -472,6 +502,7 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy
}
if (send_payload){
sync_state->sent_bytes+=ob_position(payload);
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
ob_clear(payload);
ob_limitsize(payload, sizeof(buff));
@ -489,8 +520,10 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy
}
if (payload){
if (ob_position(payload))
if (ob_position(payload)){
sync_state->sent_bytes+=ob_position(payload);
msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload));
}
ob_free(payload);
}
@ -667,20 +700,22 @@ void sync_send_keys(struct sched_ent *alarm)
static int process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
{
unsigned length = ob_remaining(payload);
int ret = 0;
while(ob_remaining(payload)){
ob_checkpoint(payload);
int msg_state = ob_get(payload);
if (msg_state<0)
return 0;
goto end;
sync_key_t key;
if (ob_get_bytes(payload, key.key, sizeof key)<0)
return 0;
goto end;
int rank=-1;
if (msg_state & STATE_REQ){
rank = ob_get(payload);
if (rank < 0)
return 0;
goto end;
}
DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d",
@ -689,7 +724,7 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
case STATE_SEND_BAR:{
rhizome_bar_t bar;
if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0)
return 0;
goto end;
if (!config.rhizome.fetch)
break;
@ -704,7 +739,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
}else if (status != RHIZOME_BUNDLE_STATUS_NEW){
// don't consume the payload
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
// send a request for the manifest
rank = rhizome_bar_log_size(&bar);
@ -715,7 +751,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
case STATE_REQ_MANIFEST:{
// queue the transmission of the manifest
find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank);
struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank);
transfer->req_len = DUMMY_MANIFEST_SIZE;
break;
}
@ -739,7 +776,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
if (!m){
// don't consume the payload
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
memcpy(m->manifestdata, data, len);
@ -767,7 +805,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
// don't consume the payload
rhizome_manifest_free(m);
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
// start writing the payload
@ -790,8 +829,11 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
rhizome_fail_write(write);
free(write);
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
if (add_status == RHIZOME_BUNDLE_STATUS_NEW)
sync_state->received_bundles++;
DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
}
@ -803,7 +845,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
rhizome_fail_write(write);
free(write);
ob_rewind(payload);
return 1;
ret = 1;
goto end;
default:
break;
@ -843,7 +886,9 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
if (add_state == RHIZOME_BUNDLE_STATUS_NEW)
sync_state->received_bundles++;
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
} else {
WHYF("Failed to complete payload %s %s", alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
@ -874,7 +919,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m);
@ -883,7 +929,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
// TODO Tidy up. We don't have this bundle anymore!
if (status != RHIZOME_BUNDLE_STATUS_NEW){
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
break;
}
@ -896,7 +943,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
rhizome_manifest_free(m);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW){
ob_rewind(payload);
return 1;
ret = 1;
goto end;
}
break;
}
@ -933,6 +981,7 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
if (transfer->write->file_offset >= transfer->write->file_length){
// move this transfer to the global completing list
sync_state->received_bundles++;
transfer->state = STATE_COMPLETING;
*ptr = transfer->next;
transfer->next = completing;
@ -945,7 +994,12 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync
WHYF("Unknown message type %x", msg_state);
}
}
return 0;
end:
// Don't double count bytes that we will process later
length -= ob_remaining(payload);
sync_state->received_bytes += length;
return ret;
}

View File

@ -1172,4 +1172,28 @@ test_RhizomeStatus() {
assertJq response.json 'contains({"filesystem_free_bytes":2041856})'
}
doc_RhizomeSyncStatus="REST API Rhizome sync status"
setup_RhizomeSyncStatus() {
setup
rhizome_add_bundles "$SIDA" 1 10
set_instance +B
create_single_identity
rhizome_add_bundles "$SIDB" 1 10
start_servald_instances +B
}
test_RhizomeSyncStatus() {
set_instance +A
while true; do
rest_request GET "/restful/rhizome/syncstatus.json"
transform_list_json response.json array_of_objects.json
tfw_log "# checking if all bundles received"
[ $(jq "contains([{\"subscriber\":\"$SIDB\",\"received_bundles\":10,\"sending_bytes\":0,\"requested_bytes\":0}])" array_of_objects.json) = "true" ] && break
sleep 1
done
tfw_preserve array_of_objects.json
}
runTests "$@"