Add newsince request for incoming activity

This commit is contained in:
Jeremy Lakeman 2017-03-21 13:07:33 +10:30
parent be9c138559
commit 4b98def664
6 changed files with 305 additions and 74 deletions

View File

@ -225,6 +225,12 @@ typedef struct httpd_request
uint8_t generation;
enum list_phase phase;
size_t rowcount;
time_ms_t end_time;
uint64_t start_ack_offset;
uint64_t current_ack_offset;
uint64_t current_msg_offset;
uint64_t end_ack_offset;
uint64_t end_msg_offset;
} meshmb_feeds;
struct {

View File

@ -47,18 +47,8 @@ struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds
return ret;
}
int meshmb_activity_next(struct meshmb_activity_iterator *i){
static int activity_next_ack(struct meshmb_activity_iterator *i){
while(1){
// can we read another message?
if (message_ply_is_open(&i->msg_reader)
&& i->msg_reader.read.offset > i->ack_start){
DEBUGF(meshmb, "Reading next incoming record from %u",
i->msg_reader.read.offset);
if (message_ply_read_prev(&i->msg_reader)!=-1
&& i->msg_reader.read.offset >= i->ack_start)
return 1;
}
// read the next ack
if (message_ply_read_prev(&i->ack_reader)==-1)
return 0;
@ -94,6 +84,8 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){
i->ack_start = ack.start_offset;
i->msg_reader.read.offset = ack.end_offset;
return 1;
} break;
default:
@ -102,6 +94,37 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){
}
}
int meshmb_activity_seek(struct meshmb_activity_iterator *i, uint64_t ack_offset, uint64_t msg_offset){
if (ack_offset)
i->ack_reader.read.offset = ack_offset;
int r;
if ((r = activity_next_ack(i))!=1)
return r;
if (msg_offset){
if (msg_offset > i->msg_reader.read.offset || msg_offset < i->ack_start)
return -1;
i->msg_reader.read.offset = msg_offset;
}
return meshmb_activity_next(i);
}
int meshmb_activity_next(struct meshmb_activity_iterator *i){
while(1){
// can we read another message?
if (message_ply_is_open(&i->msg_reader)
&& i->msg_reader.read.offset > i->ack_start){
DEBUGF(meshmb, "Reading next incoming record from %u",
i->msg_reader.read.offset);
if (message_ply_read_prev(&i->msg_reader)!=-1
&& i->msg_reader.read.offset >= i->ack_start)
return 1;
}
int r;
if ((r = activity_next_ack(i))!=1)
return r;
}
}
void meshmb_activity_close(struct meshmb_activity_iterator *i){
message_ply_read_close(&i->ack_reader);
message_ply_read_close(&i->msg_reader);
@ -293,17 +316,21 @@ static int update_stats_tree(void **record, void *context)
}
// eg, if a bundle_add trigger occurs while the feed list is open
void meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader)
int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader)
{
struct feed_metadata *metadata;
if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0
&& tree_find(&feeds->root, (void**)&metadata, m->keypair.public_key.binary, sizeof m->keypair.public_key.binary, NULL, NULL)==0){
metadata->ply.found = 1;
metadata->ply.size = m->filesize;
update_stats(feeds, metadata, reader);
if (metadata->ply.size != m->filesize){
metadata->ply.size = m->filesize;
if (update_stats(feeds, metadata, reader)==-1)
return -1;
}
return 1;
}
return 0;
}
int meshmb_update(struct meshmb_feeds *feeds)

View File

@ -50,12 +50,13 @@ int meshmb_enum(struct meshmb_feeds *feeds, rhizome_bid_t *restart_from, meshmb_
// enumerate messages, starting with the most recently received
struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds);
int meshmb_activity_next(struct meshmb_activity_iterator *i);
int meshmb_activity_seek(struct meshmb_activity_iterator *i, uint64_t ack_offset, uint64_t msg_offset);
void meshmb_activity_close(struct meshmb_activity_iterator *i);
// update metadata of all feeds based on current rhizome contents (optionally call after opening)
int meshmb_update(struct meshmb_feeds *feeds);
// update metadata of a single feed, eg because of a new bundle or when about to read a single ply.
// it is the callers reponsibility to supply a reader and close it
void meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader);
int meshmb_bundle_update(struct meshmb_feeds *feeds, rhizome_manifest *m, struct message_ply_read *reader);
#endif

View File

@ -141,6 +141,35 @@ static int strn_to_position_token(const char *str, uint64_t *position, const cha
return 1;
}
static strbuf activity_token_to_str(strbuf b, const httpd_request *r)
{
uint8_t tmp[12];
char tmp_str[BASE64_ENCODED_LEN(12)+1];
unsigned len = 0;
len += pack_uint(&tmp[len], r->u.meshmb_feeds.current_ack_offset);
len += pack_uint(&tmp[len], r->u.meshmb_feeds.current_msg_offset);
assert(len <= sizeof tmp);
size_t n = base64url_encode(tmp_str, tmp, len);
tmp_str[n] = '\0';
return strbuf_puts(b, tmp_str);
}
static int strn_to_activity_token(const char *str, httpd_request *r, const char **afterp)
{
uint8_t token[12];
size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL);
int unpacked;
if ((unpacked = unpack_uint(token, token_len, &r->u.meshmb_feeds.end_ack_offset))!=-1
&& (unpacked = unpack_uint(token + unpacked, token_len - unpacked, &r->u.meshmb_feeds.end_msg_offset))!=-1
&& **afterp=='/'){
(*afterp)++;
} else {
*afterp=str;
}
return 1;
}
static int next_ply_message(httpd_request *r){
if (!message_ply_is_open(&r->u.plylist.ply_reader)){
if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid, NULL)==-1){
@ -415,6 +444,7 @@ static struct meshmb_session *open_session(const identity_t *identity){
if (meshmb_open(id, &feeds)==-1)
return NULL;
meshmb_update(feeds);
session = emalloc(sizeof (struct meshmb_session));
if (!session){
meshmb_close(feeds);
@ -585,12 +615,21 @@ static void feedlist_on_rhizome_add(httpd_request *r, rhizome_manifest *m)
{
struct message_ply_read reader;
bzero(&reader, sizeof(reader));
meshmb_bundle_update(r->u.meshmb_feeds.session->feeds, m, &reader);
int ret = meshmb_bundle_update(r->u.meshmb_feeds.session->feeds, m, &reader);
message_ply_read_close(&reader);
if (ret!=1)
return;
// TODO short timer? Syncing with a new neighbour might update a lot of subscribed feeds.
int gen = meshmb_flush(r->u.meshmb_feeds.session->feeds);
if (gen>=0 && gen != r->u.meshmb_feeds.generation)
if (gen>=0 && gen != r->u.meshmb_feeds.generation){
if (r->u.meshmb_feeds.iterator){
meshmb_activity_close(r->u.meshmb_feeds.iterator);
r->u.meshmb_feeds.iterator = NULL;
}
r->u.meshmb_feeds.generation = gen;
http_request_resume_response(&r->http);
}
}
static void feedlist_finalise(httpd_request *r)
@ -625,11 +664,64 @@ static int restful_meshmb_feedlist(httpd_request *r, const char *remainder)
return 1;
}
static void activity_test_end(httpd_request *r){
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
if (iterator){
if (iterator->ack_reader.record_end_offset > r->u.meshmb_feeds.end_ack_offset
|| (iterator->ack_reader.record_end_offset == r->u.meshmb_feeds.end_ack_offset
&& iterator->msg_reader.record_end_offset > r->u.meshmb_feeds.end_msg_offset)){
r->u.meshmb_feeds.phase = LIST_ROWS;
DEBUGF(httpd,"Iterator @ack %"PRIu64", @msg %"PRIu64,
r->u.meshmb_feeds.current_ack_offset,
r->u.meshmb_feeds.current_msg_offset);
return;
}
}
r->u.meshmb_feeds.phase = LIST_END;
}
static void activity_next(httpd_request *r){
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
while(iterator && meshmb_activity_next(iterator)==1){
switch(iterator->msg_reader.type){
case MESSAGE_BLOCK_TYPE_MESSAGE:
r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset;
r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset;
activity_test_end(r);
return;
}
}
r->u.meshmb_feeds.phase = LIST_END;
}
static void activity_iterator_open(httpd_request *r){
if (r->u.meshmb_feeds.iterator)
return;
struct meshmb_activity_iterator *iterator = meshmb_activity_open(r->u.meshmb_feeds.session->feeds);
if (iterator){
r->u.meshmb_feeds.iterator = iterator;
meshmb_activity_seek(iterator, r->u.meshmb_feeds.current_ack_offset, r->u.meshmb_feeds.current_msg_offset);
if (r->u.meshmb_feeds.start_ack_offset == 0)
r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length;
r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset;
r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset;
if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE){
activity_next(r);
return;
}
}
activity_test_end(r);
}
static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, strbuf b)
{
httpd_request *r = (httpd_request *) hr;
const char *headers[] = {
"token",
".token",
"id",
"author",
"name",
@ -640,8 +732,6 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s
DEBUGF(httpd, "Phase %d", r->u.meshmb_feeds.phase);
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
switch (r->u.meshmb_feeds.phase) {
case LIST_HEADER:
strbuf_puts(b, "{\n\"header\":[");
@ -652,57 +742,78 @@ static int restful_meshmb_activity_json_content_chunk(struct http_request *hr, s
strbuf_json_string(b, headers[i]);
}
strbuf_puts(b, "],\n\"rows\":[");
if (strbuf_overrun(b))
return 1;
if(meshmb_activity_next(iterator)==1)
r->u.meshmb_feeds.phase = LIST_ROWS;
else{
r->u.meshmb_feeds.phase = LIST_END;
return 1;
}
// fallthrough
if (!strbuf_overrun(b))
activity_iterator_open(r);
return 1;
case LIST_ROWS:
case LIST_FIRST:
ROWS:
{
size_t checkpoint = strbuf_len(b);
activity_iterator_open(r);
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
while(r->u.meshmb_feeds.phase == LIST_ROWS){
switch(iterator->msg_reader.type){
case MESSAGE_BLOCK_TYPE_MESSAGE:
if (r->u.meshmb_feeds.rowcount!=0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_sprintf(b, "%zu", r->u.meshmb_feeds.rowcount);
strbuf_puts(b, ",");
strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary);
strbuf_puts(b, ",");
strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary);
strbuf_puts(b, ",");
strbuf_json_string(b, iterator->msg_reader.name);
strbuf_puts(b, ",");
strbuf_sprintf(b, "%d", iterator->ack_timestamp);
strbuf_puts(b, ",");
strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset);
strbuf_puts(b, ",");
strbuf_json_string(b, (const char *)iterator->msg_reader.record);
strbuf_puts(b, "]");
if (strbuf_overrun(b)){
strbuf_trunc(b, checkpoint);
return 1;
}
checkpoint = strbuf_len(b);
r->u.meshmb_feeds.rowcount++;
if (r->u.meshmb_feeds.rowcount!=0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[\"");
activity_token_to_str(b, r);
strbuf_puts(b, "\",");
strbuf_json_hex(b, iterator->msg_reader.bundle_id.binary, sizeof iterator->msg_reader.bundle_id.binary);
strbuf_puts(b, ",");
strbuf_json_hex(b, iterator->msg_reader.author.binary, sizeof iterator->msg_reader.author.binary);
strbuf_puts(b, ",");
strbuf_json_string(b, iterator->msg_reader.name);
strbuf_puts(b, ",");
strbuf_sprintf(b, "%d", iterator->ack_timestamp);
strbuf_puts(b, ",");
strbuf_sprintf(b, "%lu", iterator->msg_reader.record_end_offset);
strbuf_puts(b, ",");
strbuf_json_string(b, (const char *)iterator->msg_reader.record);
strbuf_puts(b, "]");
if (!strbuf_overrun(b)){
r->u.meshmb_feeds.rowcount++;
DEBUGF(httpd, "Wrote record %u (%s)", r->u.meshmb_feeds.rowcount, (const char *)iterator->msg_reader.record);
activity_next(r);
}
return 1;
}
case LIST_END:
{
time_ms_t now;
// during a new-since request, we don't really want to end until the time limit has elapsed
if (r->u.meshmb_feeds.end_time && (now = gettime_ms()) < r->u.meshmb_feeds.end_time) {
// where we started this time, will become where we end on the next pass;
r->u.meshmb_feeds.end_ack_offset = r->u.meshmb_feeds.start_ack_offset;
r->u.meshmb_feeds.end_msg_offset = 0;
r->u.meshmb_feeds.phase = LIST_ROWS;
struct meshmb_activity_iterator *iterator = r->u.meshmb_feeds.iterator;
if (iterator && iterator->ack_reader.read.length > r->u.meshmb_feeds.start_ack_offset){
DEBUGF(httpd, "Seeking back to ack %"PRIu64", msg (0) to resume now", iterator->ack_reader.read.length);
r->u.meshmb_feeds.start_ack_offset = iterator->ack_reader.read.length;
meshmb_activity_seek(iterator, r->u.meshmb_feeds.start_ack_offset, 0);
r->u.meshmb_feeds.current_ack_offset = iterator->ack_reader.record_end_offset;
r->u.meshmb_feeds.current_msg_offset = iterator->msg_reader.record_end_offset;
if (iterator->msg_reader.type != MESSAGE_BLOCK_TYPE_MESSAGE)
activity_next(r);
goto ROWS;
}
if(meshmb_activity_next(iterator)==1)
r->u.meshmb_feeds.phase = LIST_ROWS;
else
r->u.meshmb_feeds.phase = LIST_END;
r->u.meshmb_feeds.start_ack_offset = 0;
r->u.meshmb_feeds.current_ack_offset = 0;
r->u.meshmb_feeds.current_msg_offset = 0;
if (iterator)
meshmb_activity_close(iterator);
r->u.meshmb_feeds.iterator = NULL;
http_request_pause_response(&r->http, r->u.meshmb_feeds.end_time);
return 0;
}
}
// fallthrough
case LIST_END:
strbuf_puts(b, "\n]\n}\n");
if (!strbuf_overrun(b))
r->u.plylist.phase = LIST_DONE;
@ -728,24 +839,17 @@ static int restful_meshmb_activity(httpd_request *r, const char *remainder)
http_request_simple_response(&r->http, 500, "TODO, detailed response");
return 500;
}
struct meshmb_activity_iterator *iterator = meshmb_activity_open(session->feeds);
if (!iterator){
close_session(session);
http_request_simple_response(&r->http, 500, "TODO, detailed response");
return 500;
}
assert(r->finalise_union == NULL);
r->finalise_union = feedlist_finalise;
r->trigger_rhizome_bundle_added = feedlist_on_rhizome_add;
r->u.meshmb_feeds.phase = LIST_HEADER;
r->u.meshmb_feeds.session = session;
r->u.meshmb_feeds.iterator = iterator;
r->u.meshmb_feeds.iterator = NULL;
r->u.meshmb_feeds.generation = meshmb_flush(session->feeds);
bzero(&r->u.meshmb_feeds.bundle_id, sizeof r->u.meshmb_feeds.bundle_id);
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_activity_json_content);
return 1;
}
DECLARE_HANDLER("/restful/meshmb/", restful_meshmb_);
@ -779,7 +883,15 @@ static int restful_meshmb_(httpd_request *r, const char *remainder)
} else if (strcmp(remainder, "/activity.json") == 0) {
handler = restful_meshmb_activity;
remainder = "";
r->ui64 = 0;
r->u.meshmb_feeds.end_ack_offset = 0;
r->u.meshmb_feeds.end_msg_offset = 0;
r->u.meshmb_feeds.end_time = 0;
} else if ( str_startswith(remainder, "/activity/", &end)
&& strn_to_activity_token(end, r, &end)
&& strcmp(end, "activity.json") == 0) {
r->u.meshmb_feeds.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
handler = restful_meshmb_activity;
remainder = "";
} else if ( str_startswith(remainder, "/newsince/", &end)
&& strn_to_position_token(end, &r->ui64, &end)
&& strcmp(end, "messagelist.json") == 0) {

View File

@ -1979,9 +1979,10 @@ void server_rhizome_add_bundle(uint64_t rowid){
break;
if (unpack_manifest_row(m, statement)!=-1){
if (rhizome_manifest_verify(m)){
assert(max_rowid < m->rowid);
max_rowid = m->rowid;
if (max_rowid < m->rowid)
max_rowid = m->rowid;
CALL_TRIGGER(bundle_add, m);
// Note that a trigger might cause a new bundle to be added, and max_rowid to jump
}
}
rhizome_manifest_free(m);

View File

@ -174,6 +174,26 @@ test_MeshMBRestFeeds() {
])"
}
doc_MeshMBRestEmptyActivity="Restful activity with no content"
setup_MeshMBRestEmptyActivity() {
IDENTITY_COUNT=1
setup
}
test_MeshMBRestEmptyActivity() {
executeOk --timeout=10 curl \
-H "Expect:" \
--silent --fail --show-error \
--output activity.json \
--dump-header http.headers \
--basic --user harry:potter \
"http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity.json"
tfw_cat http.headers activity.json
tfw_preserve activity.json
assert [ "$(jq '.rows | length' activity.json)" = 0 ]
transform_list_json activity.json list.json
tfw_preserve list.json
}
doc_MeshMBRestActivity="Restful thread incoming activity"
setup_MeshMBRestActivity() {
IDENTITY_COUNT=5
@ -227,5 +247,69 @@ test_MeshMBRestActivity() {
assertJq list.json "contains([{ __index: 8, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 2\"}])"
}
doc_MeshMBRestNewActivity="Restful newsince incoming activity"
setup_MeshMBRestNewActivity() {
IDENTITY_COUNT=3
setup
executeOk_servald keyring set did $SIDA1 "" "Feed A"
executeOk_servald keyring set did $SIDA2 "" "Feed B"
executeOk_servald keyring set did $SIDA3 "" "Feed C"
executeOk_servald meshmb send $IDA2 "Message 1"
executeOk_servald meshmb send $IDA3 "Message 2"
executeOk_servald meshmb follow $IDA1 $IDA2
executeOk_servald meshmb follow $IDA1 $IDA3
executeOk curl \
-H "Expect:" \
--silent --fail --show-error \
--output activity.json \
--dump-header http.headers \
--basic --user harry:potter \
"http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity.json"
assert [ "$(jq '.rows | length' activity.json)" = 2 ]
transform_list_json activity.json array_of_objects.json
tfw_preserve array_of_objects.json
token=$(jq --raw-output '.[0][".token"]' array_of_objects.json)
assert [ -n "$token" ]
}
test_MeshMBRestNewActivity() {
for i in 1 2 3; do
fork %curl$i curl \
--silent --fail --show-error \
--no-buffer \
--output newsince$i.json \
--basic --user harry:potter \
"http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity/$token/activity.json"
done
wait_until [ -e newsince1.json -a -e newsince2.json -a -e newsince3.json ]
executeOk_servald meshmb send $IDA2 "Message 3"
executeOk_servald meshmb send $IDA3 "Message 4"
executeOk_servald meshmb send $IDA2 "Message 5"
executeOk_servald meshmb send $IDA3 "Message 6"
for i in 1 2 3; do
wait_until grep "Message 3" newsince$i.json
wait_until grep "Message 4" newsince$i.json
wait_until grep "Message 5" newsince$i.json
wait_until grep "Message 6" newsince$i.json
done
fork_terminate_all
fork_wait_all
for i in 1 2 3; do
if [ $(jq . newsince$i | wc -c) -eq 0 ]; then
echo ']}' >>newsince$i.json
assert [ $(jq . newsince$i.json | wc -c) -ne 0 ]
fi
transform_list_json newsince$i.json objects$i.json
tfw_preserve newsince$i.json objects$i.json
done
for i in 1 2 3; do
assert [ "$(jq '.rows | length' newsince$i.json)" = 4 ]
assertJq objects$i.json "contains([{ id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 3\"}])"
assertJq objects$i.json "contains([{ id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 4\"}])"
assertJq objects$i.json "contains([{ id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 5\"}])"
assertJq objects$i.json "contains([{ id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 6\"}])"
done
}
runTests "$@"