From 09d04d7c8fe0f849f05d6f5c277797f572dddb83 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 10 Oct 2016 13:34:18 +1030 Subject: [PATCH] Add restful api for listing broadcast messages --- httpd.h | 14 ++- meshmb_restful.c | 225 ++++++++++++++++++++++++++++++++++++++++++++ meshms_restful.c | 5 +- tests/all | 1 + tests/meshmbrestful | 37 +++++++- 5 files changed, 275 insertions(+), 7 deletions(-) diff --git a/httpd.h b/httpd.h index a2260a51..9ed154d6 100644 --- a/httpd.h +++ b/httpd.h @@ -193,7 +193,7 @@ typedef struct httpd_request } msglist; - /* For responses that send a MeshMS message. + /* For responses that send a MeshMS / MeshMB message. */ struct { // Which part is currently being received @@ -204,7 +204,17 @@ typedef struct httpd_request struct form_buf_malloc message; } sendmsg; - + + struct{ + struct message_ply_read ply_reader; + enum list_phase phase; + uint64_t start_offset; + uint64_t current_offset; + uint64_t end_offset; + size_t rowcount; + time_ms_t end_time; + time_s_t timestamp; + } plylist; struct { int fd; diff --git a/meshmb_restful.c b/meshmb_restful.c index ef1f2d89..0a4e01b2 100644 --- a/meshmb_restful.c +++ b/meshmb_restful.c @@ -1,4 +1,5 @@ #include "serval.h" +#include "dataformats.h" #include "conf.h" #include "httpd.h" #include "str.h" @@ -111,6 +112,221 @@ static int restful_meshmb_send(httpd_request *r, const char *remainder) return 1; } +static strbuf position_token_to_str(strbuf b, uint64_t position) +{ + uint8_t tmp[12]; + char tmp_str[BASE64_ENCODED_LEN(12)+1]; + + int len = pack_uint(tmp, position); + assert(len <= (int)sizeof tmp); + size_t n = base64url_encode(tmp_str, tmp, len); + tmp_str[n] = '\0'; + return strbuf_puts(b, tmp_str); +} + +static int strn_to_position_token(const char *str, uint64_t *position, const char **afterp) +{ + uint8_t token[12]; + size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL); + + int unpacked; + if ((unpacked = unpack_uint(token, token_len, position))==-1){ + *position = 0; + *afterp=str; + }else + (*afterp)++; + return 1; +} + +static int next_ply_message(httpd_request *r){ + if (!message_ply_is_open(&r->u.plylist.ply_reader)){ + if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid)==-1) + return -1; + + // skip back to where we were + if (r->u.plylist.current_offset) + r->u.plylist.ply_reader.read.offset = r->u.plylist.current_offset; + + DEBUGF(httpd, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset); + } + + if (r->u.plylist.current_offset==0){ + // enumerate everything from the top + DEBUGF(httpd, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length); + r->u.plylist.current_offset = + r->u.plylist.start_offset = + r->u.plylist.ply_reader.read.offset = + r->u.plylist.ply_reader.read.length; + } + + while(message_ply_read_prev(&r->u.plylist.ply_reader) == 0){ + r->u.plylist.current_offset = r->u.plylist.ply_reader.record_end_offset; + if (r->u.plylist.current_offset <= r->u.plylist.end_offset){ + DEBUGF(httpd, "Hit end %"PRIu64" @%"PRIu64, + r->u.plylist.end_offset, r->u.plylist.current_offset); + break; + } + + switch(r->u.plylist.ply_reader.type){ + case MESSAGE_BLOCK_TYPE_TIME: + if (r->u.plylist.ply_reader.record_length<4){ + WARN("Malformed ply, expected 4 byte timestamp"); + continue; + } + r->u.plylist.timestamp = read_uint32(r->u.plylist.ply_reader.record); + break; + + case MESSAGE_BLOCK_TYPE_MESSAGE: + return 1; + + case MESSAGE_BLOCK_TYPE_ACK: + // TODO, link to some other ply? + break; + + default: + //ignore unknown types + break; + } + } + return 0; +} + +static int restful_meshmb_list_json_content_chunk(struct http_request *hr, strbuf b) +{ + httpd_request *r = (httpd_request *) hr; + // The "my_sid" and "their_sid" per-conversation fields allow the same JSON structure to be used + // in a future, non-SID-specific request, eg, to list all conversations for all currently open + // identities. + const char *headers[] = { + "offset", + "token", + "text", + "timestamp" + }; + + DEBUGF(httpd, "Phase %d", r->u.plylist.phase); + + switch (r->u.plylist.phase) { + case LIST_HEADER: + strbuf_puts(b, "{\n\"header\":["); + unsigned i; + for (i = 0; i != NELS(headers); ++i) { + if (i) + strbuf_putc(b, ','); + strbuf_json_string(b, headers[i]); + } + strbuf_puts(b, "],\n\"rows\":["); + if (!strbuf_overrun(b)) + r->u.plylist.phase = LIST_ROWS; + return 1; + +ROWS: + case LIST_ROWS: + case LIST_FIRST: + + if (!message_ply_is_open(&r->u.plylist.ply_reader)){ + // re-load the current message text + if (next_ply_message(r)!=1) + goto END; + } + + if (r->u.plylist.rowcount!=0) + strbuf_putc(b, ','); + strbuf_puts(b, "\n["); + + strbuf_sprintf(b, "%"PRIu64, r->u.plylist.current_offset); + strbuf_puts(b, ",\""); + position_token_to_str(b, r->u.plylist.current_offset); + strbuf_puts(b, "\","); + strbuf_json_string(b, (const char *)r->u.plylist.ply_reader.record); + strbuf_putc(b, ','); + strbuf_sprintf(b, "%d", r->u.plylist.timestamp); + strbuf_puts(b, "]"); + + if (!strbuf_overrun(b)) { + ++r->u.plylist.rowcount; + if (next_ply_message(r)!=1) + r->u.plylist.phase = LIST_END; + } + return 1; + +END: + r->u.plylist.phase = LIST_END; + case LIST_END: + + { + time_ms_t now; + // during a new-since request, we don't really want to end until the time limit has elapsed + if (r->u.plylist.end_time && (now = gettime_ms()) < r->u.plylist.end_time) { + // where we started this time, will become where we end on the next pass; + r->u.plylist.end_offset = r->u.plylist.start_offset; + r->u.plylist.current_offset = 0; + r->u.plylist.phase = LIST_ROWS; + + if (r->u.plylist.ply_reader.read.length > r->u.plylist.start_offset && next_ply_message(r)==1) + // new content arrived while we were iterating, we can resume immediately + goto ROWS; + + message_ply_read_close(&r->u.plylist.ply_reader); + http_request_pause_response(&r->http, r->u.plylist.end_time); + return 0; + } + } + + strbuf_puts(b, "\n]\n}\n"); + if (!strbuf_overrun(b)) + r->u.plylist.phase = LIST_DONE; + // fall through... + case LIST_DONE: + return 0; + } + abort(); + return 0; +} + +static int restful_meshmb_list_json_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result) +{ + return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshmb_list_json_content_chunk); +} + +static void list_on_rhizome_add(httpd_request *r, rhizome_manifest *m) +{ + if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0 + && cmp_rhizome_bid_t(&m->keypair.public_key, &r->bid)==0) { + message_ply_read_close(&r->u.plylist.ply_reader); + http_request_resume_response(&r->http); + } +} + +static void list_finalise(httpd_request *r) +{ + message_ply_read_close(&r->u.plylist.ply_reader); +} + +static int restful_meshmb_list(httpd_request *r, const char *remainder) +{ + if (*remainder) + return 404; + assert(r->finalise_union == NULL); + r->finalise_union = list_finalise; + r->trigger_rhizome_bundle_added = list_on_rhizome_add; + r->u.plylist.phase = LIST_HEADER; + r->u.plylist.rowcount = 0; + r->u.plylist.end_offset = r->ui64; + + http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_list_json_content); + return 1; +} + +static int restful_meshmb_newsince_list(httpd_request *r, const char *remainder) +{ + int ret; + if ((ret = restful_meshmb_list(r, remainder))==1){ + r->u.plylist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000; + } + return ret; +} + DECLARE_HANDLER("/restful/meshmb/", restful_meshmb_); static int restful_meshmb_(httpd_request *r, const char *remainder) { @@ -131,6 +347,15 @@ static int restful_meshmb_(httpd_request *r, const char *remainder) handler = restful_meshmb_send; verb = HTTP_VERB_POST; remainder = ""; + } else if (strcmp(remainder, "/messagelist.json") == 0) { + handler = restful_meshmb_list; + remainder = ""; + r->ui64 = 0; + } else if ( str_startswith(remainder, "/newsince/", &end) + && strn_to_position_token(end, &r->ui64, &end) + && strcmp(end, "messagelist.json") == 0) { + handler = restful_meshmb_newsince_list; + remainder = ""; } } diff --git a/meshms_restful.c b/meshms_restful.c index e2663b11..88fb678b 100644 --- a/meshms_restful.c +++ b/meshms_restful.c @@ -291,10 +291,8 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques r->u.mclist.phase = LIST_END; // fall through... } else { - if (r->u.mclist.phase==LIST_ROWS) + if (r->u.mclist.rowcount!=0) strbuf_putc(b, ','); - else - r->u.mclist.phase=LIST_ROWS; strbuf_puts(b, "\n["); strbuf_sprintf(b, "%zu", r->u.mclist.rowcount); strbuf_putc(b, ','); @@ -309,6 +307,7 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.read_offset); strbuf_puts(b, "]"); if (!strbuf_overrun(b)) { + r->u.mclist.phase=LIST_ROWS; meshms_conversation_iterator_advance(&r->u.mclist.iter); ++r->u.mclist.rowcount; } diff --git a/tests/all b/tests/all index e0ede96f..c9ee3d52 100755 --- a/tests/all +++ b/tests/all @@ -39,6 +39,7 @@ includeTests vomp includeTests keyringrestful includeTests rhizomerestful includeTests meshmsrestful +includeTests meshmbrestful if type -p "$JAVAC" >/dev/null; then includeTests jni includeTests keyringjava diff --git a/tests/meshmbrestful b/tests/meshmbrestful index 8c26304a..fc1d7772 100755 --- a/tests/meshmbrestful +++ b/tests/meshmbrestful @@ -13,6 +13,7 @@ setup_instance() { set debug.http_server on \ set debug.httpd on \ set debug.meshmb on \ + set debug.meshms on \ set debug.verbose on \ set log.console.level debug set_extra_config @@ -48,8 +49,8 @@ teardown() { report_all_servald_servers } -doc_MeshmbSend="Send a broadcast message" -test_MeshmbSend() { +doc_MeshMBRestSend="Restful send of a broadcast message" +test_MeshMBRestSend() { executeOk curl \ -H "Expect:" \ --silent --fail --show-error \ @@ -62,5 +63,37 @@ test_MeshmbSend() { tfw_cat -h broadcast } +doc_MeshMBRestList="Restful list of meshmb messages" +setup_MeshMBRestList() { + setup + executeOk_servald meshmb send $IDA1 "Message 1" + executeOk_servald meshmb send $IDA1 "Message 2" +} +test_MeshMBRestList() { + executeOk curl \ + -H "Expect:" \ + --silent --fail --show-error \ + --output listmessages.json \ + --dump-header http.headers \ + --basic --user harry:potter \ + "http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/messagelist.json" + tfw_cat http.headers listmessages.json + tfw_preserve listmessages.json + assert [ "$(jq '.rows | length' listmessages.json)" = 2 ] + transform_list_json listmessages.json list.json + tfw_preserve list.json + assertJq list.json \ + "contains([ + { offset: 12, + text: \"Message 1\" + } + ])" + assertJq list.json \ + "contains([ + { offset: 30, + text: \"Message 2\" + } + ])" +} runTests "$@"