diff --git a/httpd.h b/httpd.h index e319fbc8..306287c8 100644 --- a/httpd.h +++ b/httpd.h @@ -225,6 +225,12 @@ typedef struct httpd_request uint8_t generation; enum list_phase phase; size_t rowcount; + time_ms_t end_time; + uint64_t start_ack_offset; + uint64_t current_ack_offset; + uint64_t current_msg_offset; + uint64_t end_ack_offset; + uint64_t end_msg_offset; } meshmb_feeds; struct { diff --git a/meshmb.c b/meshmb.c index 6b27d2ce..eed130be 100644 --- a/meshmb.c +++ b/meshmb.c @@ -47,18 +47,8 @@ struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds return ret; } -int meshmb_activity_next(struct meshmb_activity_iterator *i){ +static int activity_next_ack(struct meshmb_activity_iterator *i){ while(1){ - // can we read another message? - if (message_ply_is_open(&i->msg_reader) - && i->msg_reader.read.offset > i->ack_start){ - DEBUGF(meshmb, "Reading next incoming record from %u", - i->msg_reader.read.offset); - if (message_ply_read_prev(&i->msg_reader)!=-1 - && i->msg_reader.read.offset >= i->ack_start) - return 1; - } - // read the next ack if (message_ply_read_prev(&i->ack_reader)==-1) return 0; @@ -94,6 +84,8 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){ i->ack_start = ack.start_offset; i->msg_reader.read.offset = ack.end_offset; + return 1; + } break; default: @@ -102,6 +94,37 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){ } } +int meshmb_activity_seek(struct meshmb_activity_iterator *i, uint64_t ack_offset, uint64_t msg_offset){ + if (ack_offset) + i->ack_reader.read.offset = ack_offset; + int r; + if ((r = activity_next_ack(i))!=1) + return r; + if (msg_offset){ + if (msg_offset > i->msg_reader.read.offset || msg_offset < i->ack_start) + return -1; + i->msg_reader.read.offset = msg_offset; + } + return meshmb_activity_next(i); +} + +int meshmb_activity_next(struct meshmb_activity_iterator *i){ + while(1){ + // can we read another message? + if (message_ply_is_open(&i->msg_reader) + && i->msg_reader.read.offset > i->ack_start){ + DEBUGF(meshmb, "Reading next incoming record from %u", + i->msg_reader.read.offset); + if (message_ply_read_prev(&i->msg_reader)!=-1 + && i->msg_reader.read.offset >= i->ack_start) + return 1; + } + int r; + if ((r = activity_next_ack(i))!=1) + return r; + } +} + void meshmb_activity_close(struct meshmb_activity_iterator *i){ message_ply_read_close(&i->ack_reader); message_ply_read_close(&i->msg_reader); @@ -293,17 +316,21 @@ static int update_stats_tree(void **record, void *context) } // eg, if a bundle_add trigger occurs while the feed list is open -void meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader) +int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader) { struct feed_metadata *metadata; if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0 && tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==0){ metadata->ply.found = 1; - metadata->ply.size = m->filesize; - - update_stats(feeds, metadata, reader); + if (metadata->ply.size != m->filesize){ + metadata->ply.size = m->filesize; + if (update_stats(feeds, metadata, reader)==-1) + return -1; + } + return 1; } + return 0; } int meshmb_update(struct meshmb_feeds *feeds) diff --git a/meshmb.h b/meshmb.h index 5bbdd4f2..a377941b 100644 --- a/meshmb.h +++ b/meshmb.h @@ -50,12 +50,13 @@ int meshmb_enum(struct meshmb_feeds *feeds, rhizome_bid_t *restart_from, meshmb_ // enumerate messages, starting with the most recently received struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds); int meshmb_activity_next(struct meshmb_activity_iterator *i); +int meshmb_activity_seek(struct meshmb_activity_iterator *i, uint64_t ack_offset, uint64_t msg_offset); void meshmb_activity_close(struct meshmb_activity_iterator *i); // update metadata of all feeds based on current rhizome contents (optionally call after opening) int meshmb_update(struct meshmb_feeds *feeds); // update metadata of a single feed, eg because of a new bundle or when about to read a single ply. // it is the callers reponsibility to supply a reader and close it -void meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader); +int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader); #endif diff --git a/meshmb_restful.c b/meshmb_restful.c index 42da8bc8..e404556d 100644 --- a/meshmb_restful.c +++ b/meshmb_restful.c @@ -141,6 +141,35 @@ static int strn_to_position_token(const char *str, uint64_t *position, const cha return 1; } +static strbuf activity_token_to_str(strbuf b, const httpd_request *r) +{ + uint8_t tmp[12]; + char tmp_str[BASE64_ENCODED_LEN(12)+1]; + unsigned len = 0; + len += pack_uint(&tmp[len], r->u.meshmb_feeds.current_ack_offset); + len += pack_uint(&tmp[len], r->u.meshmb_feeds.current_msg_offset); + assert(len <= sizeof tmp); + size_t n = base64url_encode(tmp_str, tmp, len); + tmp_str[n] = '\0'; + return strbuf_puts(b, tmp_str); +} + +static int strn_to_activity_token(const char *str, httpd_request *r, const char **afterp) +{ + uint8_t token[12]; + size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL); + + int unpacked; + if ((unpacked = unpack_uint(token, token_len, &r->u.meshmb_feeds.end_ack_offset))!=-1 + && (unpacked = unpack_uint(token + unpacked, token_len - unpacked, &r->u.meshmb_feeds.end_msg_offset))!=-1 + && **afterp=='/'){ + (*afterp)++; + } else { + *afterp=str; + } + return 1; +} + static int next_ply_message(httpd_request *r){ if (!message_ply_is_open(&r->u.plylist.ply_reader)){ if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid, NULL)==-1){ @@ -415,6 +444,7 @@ static struct meshmb_session *open_session(const identity_t *identity){ if (meshmb_open(id, &feeds)==-1) return NULL; + meshmb_update(feeds); session = emalloc(sizeof (struct meshmb_session)); if (!session){ meshmb_close(feeds); @@ -585,12 +615,21 @@ static void feedlist_on_rhizome_add(httpd_request *r, rhizome_manifest *m) { struct message_ply_read reader; bzero(&reader, sizeof(reader)); - meshmb_bundle_update(r->u.meshmb_feeds.session->feeds, m, &reader); + int ret = meshmb_bundle_update(r->u.meshmb_feeds.session->feeds, m, &reader); message_ply_read_close(&reader); + if (ret!=1) + return; + // TODO short timer? Syncing with a new neighbour might update a lot of subscribed feeds. int gen = meshmb_flush(r->u.meshmb_feeds.session->feeds); - if (gen>=0 && gen != r->u.meshmb_feeds.generation) + if (gen>=0 && gen != r->u.meshmb_feeds.generation){ + if (r->u.meshmb_feeds.iterator){ + meshmb_activity_close(r->u.meshmb_feeds.iterator); + r->u.meshmb_feeds.iterator = NULL; + } + r->u.meshmb_feeds.generation = gen; http_request_resume_response(&r->http); + } } static void feedlist_finalise(httpd_request *r) @@ -625,11 +664,64 @@ static int restful_meshmb_feedlist(httpd_request *r, const char *remainder) return 1; } +static void activity_test_end(httpd_request *r){ + struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; + if (iterator){ + if (iterator->ack_reader.record_end_offset > r->u.meshmb_feeds.end_ack_offset + || (iterator->ack_reader.record_end_offset == r->u.meshmb_feeds.end_ack_offset + && iterator->msg_reader.record_end_offset > r->u.meshmb_feeds.end_msg_offset)){ + + r->u.meshmb_feeds.phase = LIST_ROWS; + + DEBUGF(httpd,"Iterator @ack %"PRIu64", @msg %"PRIu64, + r->u.meshmb_feeds.current_ack_offset, + r->u.meshmb_feeds.current_msg_offset); + return; + } + } + r->u.meshmb_feeds.phase = LIST_END; +} + +static void activity_next(httpd_request *r){ + struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; + while(iterator && meshmb_activity_next(iterator)==1){ + switch(iterator->msg_reader.type){ + case MESSAGE_BLOCK_TYPE_MESSAGE: + + r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset; + r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset; + activity_test_end(r); + return; + } + } + + r->u.meshmb_feeds.phase = LIST_END; +} + +static void activity_iterator_open(httpd_request *r){ + if (r->u.meshmb_feeds.iterator) + return; + struct meshmb_activity_iterator *iterator = meshmb_activity_open(r->u.meshmb_feeds.session->feeds); + if (iterator){ + r->u.meshmb_feeds.iterator = iterator; + meshmb_activity_seek(iterator, r->u.meshmb_feeds.current_ack_offset, r->u.meshmb_feeds.current_msg_offset); + if (r->u.meshmb_feeds.start_ack_offset == 0) + r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length; + r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset; + r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset; + if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE){ + activity_next(r); + return; + } + } + activity_test_end(r); +} + static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, strbuf b) { httpd_request *r = (httpd_request *) hr; const char *headers[] = { - "token", + ".token", "id", "author", "name", @@ -640,8 +732,6 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase); - struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; - switch (r->u.meshmb_feeds.phase) { case LIST_HEADER: strbuf_puts(b, "{\n\"header\":["); @@ -652,57 +742,78 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s strbuf_json_string(b, headers[i]); } strbuf_puts(b, "],\n\"rows\":["); - if (strbuf_overrun(b)) - return 1; - if(meshmb_activity_next(iterator)==1) - r->u.meshmb_feeds.phase = LIST_ROWS; - else{ - r->u.meshmb_feeds.phase = LIST_END; - return 1; - } - // fallthrough + if (!strbuf_overrun(b)) + activity_iterator_open(r); + + return 1; + case LIST_ROWS: case LIST_FIRST: +ROWS: { - size_t checkpoint = strbuf_len(b); + activity_iterator_open(r); + struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; - while(r->u.meshmb_feeds.phase == LIST_ROWS){ - switch(iterator->msg_reader.type){ - case MESSAGE_BLOCK_TYPE_MESSAGE: - if (r->u.meshmb_feeds.rowcount!=0) - strbuf_putc(b, ','); - strbuf_puts(b, "\n["); - strbuf_sprintf(b, "%zu", r->u.meshmb_feeds.rowcount); - strbuf_puts(b, ","); - strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary); - strbuf_puts(b, ","); - strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary); - strbuf_puts(b, ","); - strbuf_json_string(b, iterator->msg_reader.name); - strbuf_puts(b, ","); - strbuf_sprintf(b, "%d", iterator->ack_timestamp); - strbuf_puts(b, ","); - strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset); - strbuf_puts(b, ","); - strbuf_json_string(b, (const char *)iterator->msg_reader.record); - strbuf_puts(b, "]"); - if (strbuf_overrun(b)){ - strbuf_trunc(b, checkpoint); - return 1; - } - checkpoint = strbuf_len(b); - r->u.meshmb_feeds.rowcount++; + if (r->u.meshmb_feeds.rowcount!=0) + strbuf_putc(b, ','); + strbuf_puts(b, "\n[\""); + activity_token_to_str(b, r); + strbuf_puts(b, "\","); + strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary); + strbuf_puts(b, ","); + strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary); + strbuf_puts(b, ","); + strbuf_json_string(b, iterator->msg_reader.name); + strbuf_puts(b, ","); + strbuf_sprintf(b, "%d", iterator->ack_timestamp); + strbuf_puts(b, ","); + strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset); + strbuf_puts(b, ","); + strbuf_json_string(b, (const char *)iterator->msg_reader.record); + strbuf_puts(b, "]"); + if (!strbuf_overrun(b)){ + r->u.meshmb_feeds.rowcount++; + DEBUGF(httpd, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record); + activity_next(r); + } + return 1; + } + + case LIST_END: + + { + time_ms_t now; + // during a new-since request, we don't really want to end until the time limit has elapsed + if (r->u.meshmb_feeds.end_time && (now = gettime_ms()) < r->u.meshmb_feeds.end_time) { + // where we started this time, will become where we end on the next pass; + r->u.meshmb_feeds.end_ack_offset = r->u.meshmb_feeds.start_ack_offset; + r->u.meshmb_feeds.end_msg_offset = 0; + r->u.meshmb_feeds.phase = LIST_ROWS; + + struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator; + if (iterator && iterator->ack_reader.read.length > r->u.meshmb_feeds.start_ack_offset){ + DEBUGF(httpd, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length); + r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length; + meshmb_activity_seek(iterator, r->u.meshmb_feeds.start_ack_offset, 0); + r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset; + r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset; + if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE) + activity_next(r); + goto ROWS; } - if(meshmb_activity_next(iterator)==1) - r->u.meshmb_feeds.phase = LIST_ROWS; - else - r->u.meshmb_feeds.phase = LIST_END; + + r->u.meshmb_feeds.start_ack_offset = 0; + r->u.meshmb_feeds.current_ack_offset = 0; + r->u.meshmb_feeds.current_msg_offset = 0; + if (iterator) + meshmb_activity_close(iterator); + r->u.meshmb_feeds.iterator = NULL; + http_request_pause_response(&r->http, r->u.meshmb_feeds.end_time); + return 0; } } - // fallthrough - case LIST_END: strbuf_puts(b, "\n]\n}\n"); if (!strbuf_overrun(b)) r->u.plylist.phase = LIST_DONE; @@ -728,24 +839,17 @@ static int restful_meshmb_activity(httpd_request *r, const char *remainder) http_request_simple_response(&r->http, 500, "TODO, detailed response"); return 500; } - struct meshmb_activity_iterator *iterator = meshmb_activity_open(session->feeds); - if (!iterator){ - close_session(session); - http_request_simple_response(&r->http, 500, "TODO, detailed response"); - return 500; - } assert(r->finalise_union == NULL); r->finalise_union = feedlist_finalise; r->trigger_rhizome_bundle_added = feedlist_on_rhizome_add; r->u.meshmb_feeds.phase = LIST_HEADER; r->u.meshmb_feeds.session = session; - r->u.meshmb_feeds.iterator = iterator; + r->u.meshmb_feeds.iterator = NULL; r->u.meshmb_feeds.generation = meshmb_flush(session->feeds); bzero(&r->u.meshmb_feeds.bundle_id, sizeof r->u.meshmb_feeds.bundle_id); http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_activity_json_content); return 1; - } DECLARE_HANDLER("/restful/meshmb/", restful_meshmb_); @@ -779,7 +883,15 @@ static int restful_meshmb_(httpd_request *r, const char *remainder) } else if (strcmp(remainder, "/activity.json") == 0) { handler = restful_meshmb_activity; remainder = ""; - r->ui64 = 0; + r->u.meshmb_feeds.end_ack_offset = 0; + r->u.meshmb_feeds.end_msg_offset = 0; + r->u.meshmb_feeds.end_time = 0; + } else if ( str_startswith(remainder, "/activity/", &end) + && strn_to_activity_token(end, r, &end) + && strcmp(end, "activity.json") == 0) { + r->u.meshmb_feeds.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000; + handler = restful_meshmb_activity; + remainder = ""; } else if ( str_startswith(remainder, "/newsince/", &end) && strn_to_position_token(end, &r->ui64, &end) && strcmp(end, "messagelist.json") == 0) { diff --git a/rhizome_database.c b/rhizome_database.c index df2648d0..75a30506 100644 --- a/rhizome_database.c +++ b/rhizome_database.c @@ -1979,9 +1979,10 @@ void server_rhizome_add_bundle(uint64_t rowid){ break; if (unpack_manifest_row(m, statement)!=-1){ if (rhizome_manifest_verify(m)){ - assert(max_rowid < m->rowid); - max_rowid = m->rowid; + if (max_rowid < m->rowid) + max_rowid = m->rowid; CALL_TRIGGER(bundle_add, m); + // Note that a trigger might cause a new bundle to be added, and max_rowid to jump } } rhizome_manifest_free(m); diff --git a/tests/meshmbrestful b/tests/meshmbrestful index 72be23d7..cf67f266 100755 --- a/tests/meshmbrestful +++ b/tests/meshmbrestful @@ -174,6 +174,26 @@ test_MeshMBRestFeeds() { ])" } +doc_MeshMBRestEmptyActivity="Restful activity with no content" +setup_MeshMBRestEmptyActivity() { + IDENTITY_COUNT=1 + setup +} +test_MeshMBRestEmptyActivity() { + executeOk --timeout=10 curl \ + -H "Expect:" \ + --silent --fail --show-error \ + --output activity.json \ + --dump-header http.headers \ + --basic --user harry:potter \ + "http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity.json" + tfw_cat http.headers activity.json + tfw_preserve activity.json + assert [ "$(jq '.rows | length' activity.json)" = 0 ] + transform_list_json activity.json list.json + tfw_preserve list.json +} + doc_MeshMBRestActivity="Restful thread incoming activity" setup_MeshMBRestActivity() { IDENTITY_COUNT=5 @@ -227,5 +247,69 @@ test_MeshMBRestActivity() { assertJq list.json "contains([{ __index: 8, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 2\"}])" } +doc_MeshMBRestNewActivity="Restful newsince incoming activity" +setup_MeshMBRestNewActivity() { + IDENTITY_COUNT=3 + setup + executeOk_servald keyring set did $SIDA1 "" "Feed A" + executeOk_servald keyring set did $SIDA2 "" "Feed B" + executeOk_servald keyring set did $SIDA3 "" "Feed C" + executeOk_servald meshmb send $IDA2 "Message 1" + executeOk_servald meshmb send $IDA3 "Message 2" + executeOk_servald meshmb follow $IDA1 $IDA2 + executeOk_servald meshmb follow $IDA1 $IDA3 + executeOk curl \ + -H "Expect:" \ + --silent --fail --show-error \ + --output activity.json \ + --dump-header http.headers \ + --basic --user harry:potter \ + "http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity.json" + assert [ "$(jq '.rows | length' activity.json)" = 2 ] + transform_list_json activity.json array_of_objects.json + tfw_preserve array_of_objects.json + token=$(jq --raw-output '.[0][".token"]' array_of_objects.json) + assert [ -n "$token" ] +} +test_MeshMBRestNewActivity() { + for i in 1 2 3; do + fork %curl$i curl \ + --silent --fail --show-error \ + --no-buffer \ + --output newsince$i.json \ + --basic --user harry:potter \ + "http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity/$token/activity.json" + done + wait_until [ -e newsince1.json -a -e newsince2.json -a -e newsince3.json ] + executeOk_servald meshmb send $IDA2 "Message 3" + executeOk_servald meshmb send $IDA3 "Message 4" + executeOk_servald meshmb send $IDA2 "Message 5" + executeOk_servald meshmb send $IDA3 "Message 6" + for i in 1 2 3; do + wait_until grep "Message 3" newsince$i.json + wait_until grep "Message 4" newsince$i.json + wait_until grep "Message 5" newsince$i.json + wait_until grep "Message 6" newsince$i.json + done + fork_terminate_all + fork_wait_all + for i in 1 2 3; do + if [ $(jq . newsince$i | wc -c) -eq 0 ]; then + echo ']}' >>newsince$i.json + assert [ $(jq . newsince$i.json | wc -c) -ne 0 ] + fi + transform_list_json newsince$i.json objects$i.json + tfw_preserve newsince$i.json objects$i.json + done + for i in 1 2 3; do + assert [ "$(jq '.rows | length' newsince$i.json)" = 4 ] + assertJq objects$i.json "contains([{ id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 3\"}])" + assertJq objects$i.json "contains([{ id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 4\"}])" + assertJq objects$i.json "contains([{ id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 5\"}])" + assertJq objects$i.json "contains([{ id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 6\"}])" + done +} + + runTests "$@"