cli interface for reading threaded feed activity

This commit is contained in:
Jeremy Lakeman 2017-02-21 13:36:38 +10:30
parent b7b77e81ee
commit 1973df13b3
9 changed files with 306 additions and 112 deletions

120
meshmb.c
View File

@ -33,6 +33,83 @@ struct meshmb_feeds{
uint8_t generation;
};
struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds){
struct meshmb_activity_iterator *ret = emalloc_zero(sizeof(struct meshmb_activity_iterator));
if (!ret)
return NULL;
ret->feeds = feeds;
if (message_ply_read_open(&ret->ack_reader, NULL, &feeds->ack_bundle_keypair)==-1){
free(ret);
ret = NULL;
}
return ret;
}
int meshmb_activity_next(struct meshmb_activity_iterator *i){
while(1){
// can we read another message?
if (message_ply_is_open(&i->msg_reader)
&& i->msg_reader.read.offset > i->ack_start){
DEBUGF(meshmb, "Reading next incoming record from %u",
i->msg_reader.read.offset);
if (message_ply_read_prev(&i->msg_reader)!=-1
&& i->msg_reader.read.offset >= i->ack_start)
return 1;
}
// read the next ack
if (message_ply_read_prev(&i->ack_reader)==-1)
return 0;
switch (i->ack_reader.type) {
case MESSAGE_BLOCK_TYPE_TIME:
message_ply_parse_timestamp(&i->ack_reader, &i->ack_timestamp);
continue;
case MESSAGE_BLOCK_TYPE_ACK:{
uint64_t ack_end;
rhizome_bid_t *bid;
if (message_ply_parse_ack(&i->ack_reader,
&ack_end,
&i->ack_start,
&bid
) == -1)
return -1;
DEBUGF(meshmb, "Found ack for %s, %u to %u",
alloca_tohex_rhizome_bid_t(*bid), i->ack_start, ack_end);
if (bcmp(&i->msg_ply, bid, sizeof(*bid))==0){
// shortcut for consecutive acks for the same incoming feed
DEBUGF(meshmb, "Ply still open @%u",
i->msg_reader.read.offset);
} else {
message_ply_read_close(&i->msg_reader);
if (message_ply_read_open(&i->msg_reader, bid, NULL)==-1){
bzero(&i->msg_ply, sizeof i->msg_ply);
continue;
}
i->msg_ply = *bid;
}
i->msg_reader.read.offset = ack_end;
} break;
default:
continue;
}
}
}
void meshmb_activity_close(struct meshmb_activity_iterator *i){
message_ply_read_close(&i->ack_reader);
message_ply_read_close(&i->msg_reader);
free(i);
}
// only remember this many bytes of ply names & last messages
#define MAX_NAME_LEN (256) // ??
#define MAX_MSG_LEN (256) // ??
@ -41,26 +118,39 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){
if (!feeds->ack_manifest)
return 0;
int ret=-1;
int ret;
DEBUGF(meshmb, "Completing private ply for ack thread");
enum rhizome_payload_status status = rhizome_finish_write(&feeds->ack_writer);
status = rhizome_finish_store(&feeds->ack_writer, feeds->ack_manifest, status);
if (status == RHIZOME_PAYLOAD_STATUS_NEW){
rhizome_manifest *mout=NULL;
struct rhizome_bundle_result result = rhizome_manifest_finalise(feeds->ack_manifest, &mout, 0);
if (mout && mout!=feeds->ack_manifest)
rhizome_manifest_free(mout);
if (result.status == RHIZOME_BUNDLE_STATUS_NEW)
ret = 0;
rhizome_bundle_result_free(&result);
{
struct overlay_buffer *b = ob_new();
message_ply_append_timestamp(b);
ret = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
ob_free(b);
}
if (ret==0){
ret =-1;
enum rhizome_payload_status status = rhizome_finish_write(&feeds->ack_writer);
status = rhizome_finish_store(&feeds->ack_writer, feeds->ack_manifest, status);
if (status == RHIZOME_PAYLOAD_STATUS_NEW){
rhizome_manifest *mout=NULL;
struct rhizome_bundle_result result = rhizome_manifest_finalise(feeds->ack_manifest, &mout, 0);
if (mout && mout!=feeds->ack_manifest)
rhizome_manifest_free(mout);
if (result.status == RHIZOME_BUNDLE_STATUS_NEW)
ret = 0;
rhizome_bundle_result_free(&result);
}
}
if (ret!=0)
rhizome_fail_write(&feeds->ack_writer);
bzero(&feeds->ack_writer, sizeof feeds->ack_writer);
rhizome_manifest_free(feeds->ack_manifest);
feeds->ack_manifest = NULL;
return ret;
}
@ -83,7 +173,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
return 0;
if (!message_ply_is_open(reader)
&& message_ply_read_open(reader, &metadata->ply.bundle_id)!=0)
&& message_ply_read_open(reader, &metadata->ply.bundle_id, NULL)!=0)
return -1;
// TODO allow the user to specify an overridden name?
@ -167,7 +257,7 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
{
struct overlay_buffer *b = ob_new();
message_ply_append_ack(b, last_offset, metadata->last_message_offset, &metadata->ply.bundle_id);
message_ply_append_ack(b, metadata->ply.size, metadata->size, &metadata->ply.bundle_id);
int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
DEBUGF(meshmb, "Acked incoming messages");
ob_free(b);

View File

@ -17,6 +17,16 @@ struct meshmb_feed_details{
time_s_t timestamp;
};
// threaded feed iterator state
struct meshmb_activity_iterator{
struct meshmb_feeds *feeds;
struct message_ply_read ack_reader;
time_s_t ack_timestamp;
uint64_t ack_start;
rhizome_bid_t msg_ply;
struct message_ply_read msg_reader;
};
struct rhizome_manifest_field_assignment;
int meshmb_send(const keyring_identity *id, const char *message, size_t message_len,
unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments);
@ -37,6 +47,11 @@ int meshmb_ignore(struct meshmb_feeds *feeds, rhizome_bid_t *bid);
typedef int (*meshmb_callback) (struct meshmb_feed_details *details, void *context);
int meshmb_enum(struct meshmb_feeds *feeds, rhizome_bid_t *restart_from, meshmb_callback callback, void *context);
// enumerate messages, starting with the most recently received
struct meshmb_activity_iterator *meshmb_activity_open(struct meshmb_feeds *feeds);
int meshmb_activity_next(struct meshmb_activity_iterator *i);
void meshmb_activity_close(struct meshmb_activity_iterator *i);
// update metadata of all feeds based on current rhizome contents (optionally call after opening)
int meshmb_update(struct meshmb_feeds *feeds);
// update metadata of a single feed, eg because of a new bundle or when about to read a single ply.

View File

@ -70,7 +70,7 @@ static int app_meshmb_read(const struct cli_parsed *parsed, struct cli_context *
struct message_ply_read read;
bzero(&read, sizeof read);
if (message_ply_read_open(&read, &bid)==-1)
if (message_ply_read_open(&read, &bid, NULL)==-1)
return -1;
int ret=0;
@ -177,58 +177,69 @@ static int app_meshmb_find(const struct cli_parsed *parsed, struct cli_context *
return 0;
}
static struct meshmb_feeds * cli_feeds_open(const struct cli_parsed *parsed){
const char *idhex;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1)
return NULL;
identity_t identity;
if (str_to_identity_t(&identity, idhex) == -1){
WHY("Invalid identity");
return NULL;
}
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return NULL;
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id){
WHY("Invalid identity");
return NULL;
}
struct meshmb_feeds *feeds = NULL;
if (meshmb_open(id, &feeds)==-1)
return NULL;
return feeds;
}
DEFINE_CMD(app_meshmb_follow, 0,
"Start or stop following a broadcast feed",
"meshmb", "follow|ignore" KEYRING_PIN_OPTIONS, "<id>", "<peer>");
static int app_meshmb_follow(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *idhex, *peerhex;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1
||cli_arg(parsed, "peer", &peerhex, str_is_identity, "") == -1)
const char *peerhex;
if (cli_arg(parsed, "peer", &peerhex, str_is_identity, "") == -1)
return -1;
int follow = cli_arg(parsed, "follow", NULL, NULL, NULL) == 0;
identity_t identity;
identity_t peer;
if (str_to_identity_t(&identity, idhex) == -1
||str_to_identity_t(&peer, peerhex) == -1)
if (str_to_identity_t(&peer, peerhex) == -1)
return WHY("Invalid identity");
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return -1;
struct meshmb_feeds *feeds = cli_feeds_open(parsed);
int ret = -1;
struct meshmb_feeds *feeds = NULL;
if (feeds){
if (follow){
ret = meshmb_follow(feeds, &peer);
}else{
ret = meshmb_ignore(feeds, &peer);
}
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id){
WHY("Invalid identity");
goto end;
}
if (ret!=-1){
ret = meshmb_flush(feeds);
if (ret!=-1)
ret=0;
}
if (meshmb_open(id, &feeds)==-1)
goto end;
if (follow){
if (meshmb_follow(feeds, &peer)==-1)
goto end;
}else{
if (meshmb_ignore(feeds, &peer)==-1)
goto end;
}
if (meshmb_flush(feeds)==-1)
goto end;
ret = 0;
end:
if (feeds)
meshmb_close(feeds);
}
if (keyring)
keyring_free(keyring);
keyring = NULL;
@ -258,64 +269,89 @@ DEFINE_CMD(app_meshmb_list, 0,
"meshmb", "list", "following" KEYRING_PIN_OPTIONS, "<id>");
static int app_meshmb_list(const struct cli_parsed *parsed, struct cli_context *context)
{
const char *idhex;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1)
return -1;
identity_t identity;
if (str_to_identity_t(&identity, idhex) == -1)
return WHY("Invalid identity");
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return -1;
struct meshmb_feeds *feeds = cli_feeds_open(parsed);
int ret = -1;
struct meshmb_feeds *feeds = NULL;
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id){
WHY("Invalid identity");
goto end;
if (feeds){
const char *names[]={
"_id",
"id",
"author",
"name",
"age",
"last_message"
};
cli_start_table(context, NELS(names), names);
struct cli_enum_context enum_context = {
.rowcount = 0,
.context = context,
};
meshmb_enum(feeds, NULL, list_callback, &enum_context);
meshmb_close(feeds);
cli_end_table(context, enum_context.rowcount);
ret = 0;
}
if (meshmb_open(id, &feeds)==-1)
goto end;
const char *names[]={
"_id",
"id",
"author",
"name",
"age",
"last_message"
};
cli_start_table(context, NELS(names), names);
struct cli_enum_context enum_context = {
.rowcount = 0,
.context = context,
};
meshmb_enum(feeds, NULL, list_callback, &enum_context);
cli_end_table(context, enum_context.rowcount);
ret = 0;
end:
if (feeds)
meshmb_close(feeds);
keyring_free(keyring);
if (keyring)
keyring_free(keyring);
keyring = NULL;
return 0;
return ret;
}
/*
DEFINE_CMD(app_meshmb_news, 0,
"",
"meshmb", "news" KEYRING_PIN_OPTIONS, "<id>");
static int app_meshmb_news(const struct cli_parsed *parsed, struct cli_context *context)
DEFINE_CMD(app_meshmb_activity, 0,
"List messages from feeds that you are currently following",
"meshmb", "activity" KEYRING_PIN_OPTIONS, "<id>");
static int app_meshmb_activity(const struct cli_parsed *parsed, struct cli_context *context)
{
return 0;
struct meshmb_feeds *feeds = cli_feeds_open(parsed);
int ret = -1;
if (feeds){
meshmb_update(feeds);
meshmb_flush(feeds);
struct meshmb_activity_iterator *iterator = meshmb_activity_open(feeds);
if (iterator){
const char *names[]={
"_id",
"id",
"author",
"name",
"age",
"offset",
"message"
};
cli_start_table(context, NELS(names), names);
unsigned rowcount=0;
time_s_t now = gettime();
while(meshmb_activity_next(iterator)==1){
switch(iterator->msg_reader.type){
case MESSAGE_BLOCK_TYPE_MESSAGE:
cli_put_long(context, rowcount++, ":");
cli_put_string(context, alloca_tohex_rhizome_bid_t(iterator->msg_ply), ":");
cli_put_string(context, alloca_tohex_identity_t(&iterator->msg_reader.author), ":");
cli_put_string(context, iterator->msg_reader.name, ":");
cli_put_long(context, iterator->ack_timestamp ? (long)(now - iterator->ack_timestamp) : (long)-1, ":");
cli_put_long(context, iterator->msg_reader.record_end_offset, ":");
cli_put_string(context, (const char *)iterator->msg_reader.record, "\n");
}
}
cli_end_table(context, rowcount);
meshmb_activity_close(iterator);
ret = 0;
}
meshmb_close(feeds);
}
if (keyring)
keyring_free(keyring);
keyring = NULL;
return ret;
}
*/

View File

@ -143,7 +143,7 @@ static int strn_to_position_token(const char *str, uint64_t *position, const cha
static int next_ply_message(httpd_request *r){
if (!message_ply_is_open(&r->u.plylist.ply_reader)){
if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid)==-1){
if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid, NULL)==-1){
r->u.plylist.eof = 1;
return -1;
}

View File

@ -174,7 +174,7 @@ static enum meshms_status open_ply(struct message_ply *ply, struct message_ply_r
{
if (ply->found
&& !message_ply_is_open(reader)
&& message_ply_read_open(reader, &ply->bundle_id)!=0)
&& message_ply_read_open(reader, &ply->bundle_id, NULL)!=0)
return MESHMS_STATUS_ERROR;
return MESHMS_STATUS_OK;
}

View File

@ -69,7 +69,7 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
if (it.current->metadata.their_last_message && it.current->their_ply.found){
struct message_ply_read reader;
bzero(&reader, sizeof reader);
if (message_ply_read_open(&reader, &it.current->their_ply.bundle_id) == 0){
if (message_ply_read_open(&reader, &it.current->their_ply.bundle_id, NULL) == 0){
reader.read.offset = it.current->metadata.their_last_message;
if (message_ply_read_prev(&reader)==0){
cli_put_string(context, (const char *)reader.record, "\n");

View File

@ -167,16 +167,27 @@ end:
return ret;
}
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid)
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid, const sign_keypair_t *keypair)
{
DEBUGF(meshms, "Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return -1;
int ret=-1;
if (rhizome_retrieve_manifest(bid, m) == RHIZOME_BUNDLE_STATUS_SAME
enum rhizome_bundle_status status;
if (keypair){
DEBUGF2(meshms, meshmb, "Opening ply %s", alloca_tohex_rhizome_bid_t(keypair->public_key));
struct rhizome_bundle_result result;
result = rhizome_private_bundle(m, keypair);
status = result.status;
rhizome_bundle_result_free(&result);
}else{
DEBUGF2(meshms, meshmb, "Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
status = rhizome_retrieve_manifest(bid, m);
}
if (status == RHIZOME_BUNDLE_STATUS_SAME
&& rhizome_open_decrypt_read(m, &ply->read) == RHIZOME_PAYLOAD_STATUS_STORED){
assert(m->filesize != RHIZOME_SIZE_UNSET);
@ -222,7 +233,7 @@ int message_ply_read_prev(struct message_ply_read *ply)
ply->record_end_offset = ply->read.offset;
uint8_t footer[2];
if (ply->read.offset <= sizeof footer) {
DEBUG(meshms, "EOF");
DEBUGF2(meshms, meshmb, "EOF");
return -1;
}
ply->read.offset -= sizeof footer;
@ -237,10 +248,10 @@ int message_ply_read_prev(struct message_ply_read *ply)
ply->type = r & 0xF;
ply->record_length = r >> 4;
}
DEBUGF(meshms, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
DEBUGF2(meshms, meshmb, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length + sizeof footer > ply->read.offset){
DEBUGF(meshms, "EOF");
DEBUGF2(meshms, meshmb, "EOF");
return -1;
}
ply->read.offset -= ply->record_length + sizeof(footer);

View File

@ -43,7 +43,7 @@ struct message_ply_write{
struct rhizome_write write;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid, const sign_keypair_t *keypair);
void message_ply_read_close(struct message_ply_read *ply);
int message_ply_read_prev(struct message_ply_read *ply);
int message_ply_find_prev(struct message_ply_read *ply, char type);

View File

@ -97,4 +97,46 @@ test_meshmbFollow() {
assertStdoutGrep --matches=1 ":$IDA3:$SIDA3:Feed C:[0-9]\+:Message 3\$"
}
doc_meshmbThreading="Thread incoming message feeds"
setup_meshmbThreading() {
setup_identities 5
executeOk_servald keyring set did $SIDA1 "" "Feed A"
executeOk_servald keyring set did $SIDA2 "" "Feed B"
executeOk_servald keyring set did $SIDA3 "" "Feed C"
executeOk_servald keyring set did $SIDA4 "" "Feed D"
executeOk_servald keyring set did $SIDA5 "" "Feed E"
executeOk_servald meshmb send $IDA2 "Message 2"
executeOk_servald meshmb send $IDA3 "Message 3"
}
test_meshmbThreading() {
executeOk_servald meshmb follow $IDA1 $IDA2
executeOk_servald meshmb follow $IDA1 $IDA3
executeOk_servald meshmb activity $IDA1
assertStdoutGrep --matches=1 "0:$IDA3:$SIDA3:Feed C:[0-9]\+:[0-9]\+:Message 3\$"
assertStdoutGrep --matches=1 "1:$IDA2:$SIDA2:Feed B:[0-9]\+:[0-9]\+:Message 2\$"
executeOk_servald meshmb send $IDA4 "Message 4"
executeOk_servald meshmb follow $IDA1 $IDA4
executeOk_servald meshmb send $IDA4 "Message 5"
executeOk_servald meshmb activity $IDA1
assertStdoutGrep --matches=1 "0:$IDA4:$SIDA4:Feed D:[0-9]\+:[0-9]\+:Message 5\$"
assertStdoutGrep --matches=1 "1:$IDA4:$SIDA4:Feed D:[0-9]\+:[0-9]\+:Message 4\$"
executeOk_servald meshmb send $IDA5 "Message 6"
executeOk_servald meshmb follow $IDA1 $IDA5
executeOk_servald meshmb activity $IDA1
assertStdoutGrep --matches=1 "0:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 6\$"
executeOk_servald meshmb send $IDA2 "Message 7"
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA3 "Message 8"
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA4 "Message 9"
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA5 "Message 10"
executeOk_servald meshmb activity $IDA1
assertStdoutGrep --matches=1 "0:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 10\$"
assertStdoutGrep --matches=1 "1:$IDA4:$SIDA4:Feed D:[0-9]\+:[0-9]\+:Message 9\$"
assertStdoutGrep --matches=1 "2:$IDA3:$SIDA3:Feed C:[0-9]\+:[0-9]\+:Message 8\$"
assertStdoutGrep --matches=1 "3:$IDA2:$SIDA2:Feed B:[0-9]\+:[0-9]\+:Message 7\$"
assertStdoutGrep --matches=1 "4:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 6\$"
}
runTests "$@"