From 44e73b969eadcc40fa60b8d86c612e33b62d539a Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 18 Apr 2018 18:10:48 +0930 Subject: [PATCH] Add new rhizome sync status API --- doc/REST-API-Rhizome.md | 19 ++++++++ httpd.h | 2 + rhizome.h | 11 +++++ rhizome_restful.c | 102 ++++++++++++++++++++++++++++++++++++++++ rhizome_sync_keys.c | 88 +++++++++++++++++++++++++++------- tests/rhizomerestful | 24 ++++++++++ 6 files changed, 229 insertions(+), 17 deletions(-) diff --git a/doc/REST-API-Rhizome.md b/doc/REST-API-Rhizome.md index a6062f0a..b442b5da 100644 --- a/doc/REST-API-Rhizome.md +++ b/doc/REST-API-Rhizome.md @@ -1171,6 +1171,25 @@ The results will be a single json object with the following fields; * `filesystem_free_bytes` - the measured free space of the filesystem. +### GET /restful/rhizome/syncstatus.json + +Fetch statistics on the current status of rhizome transfers. The list is returned in the body of +the [response](#response) in [JSON table][] format with the following columns: + +* `subscriber` - the [SID][] of the neighbour we are transferring with. + +* `received_bundles` - count of the number of new bundles successfully received from this neighbour. + +* `sent_bytes` - total bytes sent, including protocol overheads. + +* `sending_bytes` - total size of data that has been requested by this neighbour, but not yet sent. + +* `received_bytes` - total bytes received and processed, including protocol overheads. + +* `requested_bytes` - total size of all data we will request from this neighbour. + + + ----- **Copyright 2015-2017 Serval Project Inc.** ![CC-BY-4.0](./cc-by-4.0.png) diff --git a/httpd.h b/httpd.h index 2ec445ba..11b538c5 100644 --- a/httpd.h +++ b/httpd.h @@ -266,6 +266,8 @@ typedef struct httpd_request struct { struct rhizome_space_report rhizome_space; + subscriber_iterator neighbour_iterator; + enum list_phase phase; } status; } u; diff --git a/rhizome.h b/rhizome.h index 4b3f769c..ad7d1163 100644 --- a/rhizome.h +++ b/rhizome.h @@ -367,6 +367,17 @@ sqlite_retry_state sqlite_retry_state_init(int serverLimit, int serverSleep, int #define SQLITE_RETRY_STATE_DEFAULT sqlite_retry_state_init(-1,-1,-1,-1) +// sync stats for a single neighbour +struct rhizome_sync_state { + unsigned received_bundles; + uint64_t sent_bytes; + uint64_t sending_bytes; + uint64_t received_bytes; + uint64_t requested_bytes; +}; + +int rhizome_sync_status(struct subscriber *peer, struct rhizome_sync_state *state); + struct rhizome_space_report { uint64_t file_count; uint64_t internal_bytes; diff --git a/rhizome_restful.c b/rhizome_restful.c index 4a263688..a83d5986 100644 --- a/rhizome_restful.c +++ b/rhizome_restful.c @@ -35,6 +35,7 @@ DECLARE_HANDLER("/restful/rhizome/insert", restful_rhizome_insert); DECLARE_HANDLER("/restful/rhizome/import", restful_rhizome_import); DECLARE_HANDLER("/restful/rhizome/append", restful_rhizome_append); DECLARE_HANDLER("/restful/rhizome/storestatus.json", restful_rhizome_disk_status); +DECLARE_HANDLER("/restful/rhizome/syncstatus.json", restful_rhizome_sync_status); DECLARE_HANDLER("/restful/rhizome/", restful_rhizome_); static HTTP_RENDERER render_status_and_manifest_headers; @@ -1304,3 +1305,104 @@ static int restful_rhizome_disk_status(httpd_request *r, const char *remainder) return 1; } +static int rhizome_sync_status_content_chunk(struct http_request *hr, strbuf b) +{ + httpd_request *r = (httpd_request *) hr; + const char *headers[] = { + "subscriber", + "received_bundles", + "sent_bytes", + "sending_bytes", + "received_bytes", + "requested_bytes" + }; + switch(r->u.status.phase){ + case LIST_HEADER: + strbuf_puts(b, "{\n\"header\":["); + unsigned i; + for (i = 0; i != NELS(headers); ++i) { + if (i) + strbuf_putc(b, ','); + strbuf_json_string(b, headers[i]); + } + strbuf_puts(b, "],\n\"rows\":["); + if (!strbuf_overrun(b)) + r->u.status.phase = LIST_FIRST; + return 1; + + case LIST_FIRST: + case LIST_ROWS: + + while(1){ + struct subscriber **subscriberp = subscriber_iterator_get_current(&r->u.status.neighbour_iterator); + if (!subscriberp){ + r->u.status.phase = LIST_END; + return 1; + } + struct rhizome_sync_state state; + if (rhizome_sync_status(*subscriberp, &state)!=0){ + subscriber_iterator_advance(&r->u.status.neighbour_iterator); + continue; + } + + if (r->u.status.phase==LIST_ROWS) + strbuf_puts(b, ","); + strbuf_puts(b, "\n["); + + strbuf_json_hex(b, (*subscriberp)->sid.binary, sizeof(sid_t)); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%u", state.received_bundles); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%"PRIu64, state.sent_bytes); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%"PRIu64, state.sending_bytes); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%"PRIu64, state.received_bytes); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%"PRIu64, state.requested_bytes); + strbuf_putc(b, ']'); + + if (!strbuf_overrun(b)){ + r->u.status.phase=LIST_ROWS; + subscriber_iterator_advance(&r->u.status.neighbour_iterator); + } + return 1; + } + + case LIST_END: + strbuf_puts(b, "\n]\n}\n"); + if (!strbuf_overrun(b)) + r->u.status.phase=LIST_DONE; + + return 1; + + case LIST_DONE: + return 0; + } + abort(); +} + +static int rhizome_sync_status_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result) +{ + return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, rhizome_sync_status_content_chunk); +} + +static void finalise_union_sync_status(httpd_request *r) +{ + subscriber_iterator_free(&r->u.status.neighbour_iterator); +} + +static int restful_rhizome_sync_status(httpd_request *r, const char *remainder) +{ + if (*remainder || !is_rhizome_http_enabled()) + return 404; + int ret = authorize_restful(&r->http); + if (ret) + return ret; + + subscriber_iterator_start(&r->u.status.neighbour_iterator); + r->finalise_union = finalise_union_sync_status; + + http_request_response_generated(&r->http, 200, &CONTENT_TYPE_JSON, rhizome_sync_status_content); + return 1; +} diff --git a/rhizome_sync_keys.c b/rhizome_sync_keys.c index d8d16bfe..c81473ef 100644 --- a/rhizome_sync_keys.c +++ b/rhizome_sync_keys.c @@ -53,6 +53,9 @@ struct transfers{ }; struct rhizome_sync_keys{ + unsigned received_bundles; + uint64_t sent_bytes; + uint64_t received_bytes; struct transfers *queue; struct msp_server_state *connection; }; @@ -87,6 +90,31 @@ static const char *get_state_name(uint8_t state) return "Unknown"; } +int rhizome_sync_status(struct subscriber *peer, struct rhizome_sync_state *state){ + if (!peer->sync_keys_state) + return 1; + if (!(peer->reachable & REACHABLE_DIRECT)) + return 1; + + struct rhizome_sync_keys *sync_state = peer->sync_keys_state; + struct transfers *queue = sync_state->queue; + + bzero(state, sizeof *state); + + while(queue){ + if ((queue->state&3) == STATE_SEND) + state->sending_bytes += queue->req_len; + else + state->requested_bytes += queue->req_len; + queue=queue->next; + } + state->received_bundles = sync_state->received_bundles; + state->sent_bytes = sync_state->sent_bytes; + state->received_bytes = sync_state->received_bytes; + + return 0; +} + static void _clear_transfer(struct __sourceloc __whence, struct transfers *ptr) { DEBUGF(rhizome_sync_keys, "Clearing %s %s", get_state_name(ptr->state), alloca_sync_key(&ptr->key)); @@ -312,6 +340,7 @@ static void sync_lookup_bar(struct subscriber *peer, struct rhizome_sync_keys *s struct transfers *send_bar = *find_and_update_transfer(peer, sync_state, &transfer->key, STATE_SEND_BAR, rank); if (send_bar){ + send_bar->req_len = sizeof(send_bar->bar); rhizome_manifest_to_bar(m, &send_bar->bar); free(transfer); }else{ @@ -359,6 +388,7 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy if (ob_overrun(payload)){ ob_rewind(payload); + sync_state->sent_bytes+=ob_position(payload); msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); ob_clear(payload); ob_limitsize(payload, sizeof(buff)); @@ -472,6 +502,7 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy } if (send_payload){ + sync_state->sent_bytes+=ob_position(payload); msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); ob_clear(payload); ob_limitsize(payload, sizeof(buff)); @@ -489,8 +520,10 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy } if (payload){ - if (ob_position(payload)) + if (ob_position(payload)){ + sync_state->sent_bytes+=ob_position(payload); msp_send_packet(sync_state->connection, ob_ptr(payload), ob_position(payload)); + } ob_free(payload); } @@ -667,20 +700,22 @@ void sync_send_keys(struct sched_ent *alarm) static int process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload) { + unsigned length = ob_remaining(payload); + int ret = 0; while(ob_remaining(payload)){ ob_checkpoint(payload); int msg_state = ob_get(payload); if (msg_state<0) - return 0; + goto end; sync_key_t key; if (ob_get_bytes(payload, key.key, sizeof key)<0) - return 0; - + goto end; + int rank=-1; if (msg_state & STATE_REQ){ rank = ob_get(payload); if (rank < 0) - return 0; + goto end; } DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d", @@ -689,7 +724,7 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync case STATE_SEND_BAR:{ rhizome_bar_t bar; if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0) - return 0; + goto end; if (!config.rhizome.fetch) break; @@ -704,7 +739,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync }else if (status != RHIZOME_BUNDLE_STATUS_NEW){ // don't consume the payload ob_rewind(payload); - return 1; + ret = 1; + goto end; } // send a request for the manifest rank = rhizome_bar_log_size(&bar); @@ -715,7 +751,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync case STATE_REQ_MANIFEST:{ // queue the transmission of the manifest - find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank); + struct transfers *transfer = *find_and_update_transfer(peer, sync_state, &key, STATE_SEND_MANIFEST, rank); + transfer->req_len = DUMMY_MANIFEST_SIZE; break; } @@ -739,7 +776,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync if (!m){ // don't consume the payload ob_rewind(payload); - return 1; + ret = 1; + goto end; } memcpy(m->manifestdata, data, len); @@ -767,7 +805,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync // don't consume the payload rhizome_manifest_free(m); ob_rewind(payload); - return 1; + ret = 1; + goto end; } // start writing the payload @@ -790,8 +829,11 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync rhizome_fail_write(write); free(write); ob_rewind(payload); - return 1; + ret = 1; + goto end; } + if (add_status == RHIZOME_BUNDLE_STATUS_NEW) + sync_state->received_bundles++; DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)", alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status)); } @@ -803,7 +845,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync rhizome_fail_write(write); free(write); ob_rewind(payload); - return 1; + ret = 1; + goto end; default: break; @@ -843,7 +886,9 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){ enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL); - DEBUGF(rhizome_sync_keys, "Import %s = %s", + if (add_state == RHIZOME_BUNDLE_STATUS_NEW) + sync_state->received_bundles++; + DEBUGF(rhizome_sync_keys, "Import %s = %s", alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state)); } else { WHYF("Failed to complete payload %s %s", alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status)); @@ -874,7 +919,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync rhizome_manifest *m = rhizome_new_manifest(); if (!m){ ob_rewind(payload); - return 1; + ret = 1; + goto end; } enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m); @@ -883,7 +929,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync // TODO Tidy up. We don't have this bundle anymore! if (status != RHIZOME_BUNDLE_STATUS_NEW){ ob_rewind(payload); - return 1; + ret = 1; + goto end; } break; } @@ -896,7 +943,8 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync rhizome_manifest_free(m); if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW){ ob_rewind(payload); - return 1; + ret = 1; + goto end; } break; } @@ -933,6 +981,7 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync if (transfer->write->file_offset >= transfer->write->file_length){ // move this transfer to the global completing list + sync_state->received_bundles++; transfer->state = STATE_COMPLETING; *ptr = transfer->next; transfer->next = completing; @@ -945,7 +994,12 @@ static int process_transfer_message(struct subscriber *peer, struct rhizome_sync WHYF("Unknown message type %x", msg_state); } } - return 0; + +end: + // Don't double count bytes that we will process later + length -= ob_remaining(payload); + sync_state->received_bytes += length; + return ret; } diff --git a/tests/rhizomerestful b/tests/rhizomerestful index ffbb9a08..78f18d63 100755 --- a/tests/rhizomerestful +++ b/tests/rhizomerestful @@ -1172,4 +1172,28 @@ test_RhizomeStatus() { assertJq response.json 'contains({"filesystem_free_bytes":2041856})' } +doc_RhizomeSyncStatus="REST API Rhizome sync status" +setup_RhizomeSyncStatus() { + setup + rhizome_add_bundles "$SIDA" 1 10 + set_instance +B + create_single_identity + rhizome_add_bundles "$SIDB" 1 10 + start_servald_instances +B +} +test_RhizomeSyncStatus() { + set_instance +A + + while true; do + rest_request GET "/restful/rhizome/syncstatus.json" + transform_list_json response.json array_of_objects.json + + tfw_log "# checking if all bundles received" + [ $(jq "contains([{\"subscriber\":\"$SIDB\",\"received_bundles\":10,\"sending_bytes\":0,\"requested_bytes\":0}])" array_of_objects.json) = "true" ] && break + + sleep 1 + done + tfw_preserve array_of_objects.json +} + runTests "$@"