Ack the senders ply and thread into activity feed

This commit is contained in:
Jeremy Lakeman 2017-05-10 11:01:02 +09:30
parent cc0d6fcdb9
commit 3812587a31
8 changed files with 251 additions and 203 deletions

164
meshmb.c
View File

@ -66,22 +66,31 @@ static int activity_next_ack(struct meshmb_activity_iterator *i){
DEBUGF(meshmb, "Found ack for %s, %"PRIu64" to %"PRIu64,
alloca_tohex(ack.binary, ack.binary_length), ack.start_offset, ack.end_offset);
struct feed_metadata *metadata;
if (tree_find(&i->feeds->root, (void**)&metadata, ack.binary, ack.binary_length, NULL, NULL)==TREE_FOUND){
if (i->metadata == metadata){
// shortcut for consecutive acks for the same incoming feed
DEBUGF(meshmb, "Ply still open @%"PRIu64,
i->msg_reader.read.offset);
}else{
message_ply_read_close(&i->msg_reader);
if (message_ply_read_open(&i->msg_reader, &metadata->bundle_id, NULL)==-1){
i->metadata = NULL;
continue;
}
i->metadata = metadata;
}
const rhizome_bid_t *bundle_id = NULL;
i->metadata = NULL;
if (ack.binary_length == 0){
// ack for our own message ply
bundle_id = &i->feeds->id->sign_keypair->public_key;
}else{
WARNF("Failed to find metadata for %s", alloca_tohex(ack.binary, ack.binary_length));
struct feed_metadata *metadata;
if (tree_find(&i->feeds->root, (void**)&metadata, ack.binary, ack.binary_length, NULL, NULL)==TREE_FOUND){
bundle_id = &metadata->bundle_id;
i->metadata = metadata;
}else{
WARNF("Failed to find metadata for %s", alloca_tohex(ack.binary, ack.binary_length));
continue;
}
}
if (memcmp(&i->msg_reader.bundle_id, bundle_id, sizeof *bundle_id)==0){
// shortcut for consecutive acks for the same incoming feed
DEBUGF(meshmb, "Ply still open @%"PRIu64, i->msg_reader.read.offset);
}else{
message_ply_read_close(&i->msg_reader);
if (message_ply_read_open(&i->msg_reader, bundle_id, NULL)==-1){
i->metadata = NULL;
continue;
}
}
i->ack_start = ack.start_offset;
@ -181,6 +190,51 @@ static int finish_ack_writing(struct meshmb_feeds *feeds){
return ret;
}
static int activity_ack(struct meshmb_feeds *feeds, struct message_ply_ack *ack)
{
// add an ack to our journal to thread new messages
if (!feeds->ack_manifest){
rhizome_manifest *m = rhizome_new_manifest();
DEBUGF(meshmb, "Opening private ply for ack thread");
struct rhizome_bundle_result result = rhizome_private_bundle(m, &feeds->ack_bundle_keypair);
switch(result.status){
case RHIZOME_BUNDLE_STATUS_NEW:
rhizome_manifest_set_tail(m, 0);
rhizome_manifest_set_filesize(m, 0);
case RHIZOME_BUNDLE_STATUS_SAME:
{
enum rhizome_payload_status pstatus = rhizome_write_open_journal(&feeds->ack_writer, m, 0, RHIZOME_SIZE_UNSET);
if (pstatus==RHIZOME_PAYLOAD_STATUS_NEW)
break;
}
// fallthrough
case RHIZOME_BUNDLE_STATUS_BUSY:
rhizome_bundle_result_free(&result);
rhizome_manifest_free(m);
return -1;
default:
// everything else should be impossible.
FATALF("Cannot create manifest: %s", alloca_rhizome_bundle_result(result));
}
rhizome_bundle_result_free(&result);
feeds->ack_manifest = m;
}
struct overlay_buffer *b = ob_new();
message_ply_append_ack(b, ack);
assert(!ob_overrun(b));
DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
ob_position(b), feeds->ack_writer.file_offset);
int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
DEBUGF(meshmb, "Acked incoming messages");
ob_free(b);
return r;
}
static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader)
{
if (!metadata->details.ply.found){
@ -252,59 +306,17 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
DEBUGF(meshmb, "Last message from %"PRIu64" to %"PRIu64, metadata->last_message_offset, last_offset);
if (last_offset > metadata->last_message_offset){
// add an ack to our journal to thread new messages
if (!feeds->ack_manifest){
rhizome_manifest *m = rhizome_new_manifest();
struct message_ply_ack ack;
bzero(&ack, sizeof ack);
DEBUGF(meshmb, "Opening private ply for ack thread");
ack.start_offset = metadata->size;
ack.end_offset = metadata->details.ply.size;
ack.binary = metadata->bundle_id.binary;
ack.binary_length = (metadata->tree_depth >> 3) + 3;
struct rhizome_bundle_result result = rhizome_private_bundle(m, &feeds->ack_bundle_keypair);
switch(result.status){
case RHIZOME_BUNDLE_STATUS_NEW:
rhizome_manifest_set_tail(m, 0);
rhizome_manifest_set_filesize(m, 0);
case RHIZOME_BUNDLE_STATUS_SAME:
{
enum rhizome_payload_status pstatus = rhizome_write_open_journal(&feeds->ack_writer, m, 0, RHIZOME_SIZE_UNSET);
if (pstatus==RHIZOME_PAYLOAD_STATUS_NEW)
break;
}
// fallthrough
case RHIZOME_BUNDLE_STATUS_BUSY:
rhizome_bundle_result_free(&result);
rhizome_manifest_free(m);
return -1;
default:
// everything else should be impossible.
FATALF("Cannot create manifest: %s", alloca_rhizome_bundle_result(result));
}
rhizome_bundle_result_free(&result);
feeds->ack_manifest = m;
}
{
struct overlay_buffer *b = ob_new();
struct message_ply_ack ack;
bzero(&ack, sizeof ack);
ack.start_offset = metadata->size;
ack.end_offset = metadata->details.ply.size;
ack.binary = metadata->bundle_id.binary;
ack.binary_length = (metadata->tree_depth >> 3) + 3;
message_ply_append_ack(b, &ack);
assert(!ob_overrun(b));
DEBUGF2(meshms, meshmb, "Appending %zu bytes @%"PRIu64,
ob_position(b), feeds->ack_writer.file_offset);
int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b));
DEBUGF(meshmb, "Acked incoming messages");
ob_free(b);
if (r == -1)
return -1;
}
int r = activity_ack(feeds, &ack);
if (r)
return r;
}
metadata->last_message_offset = last_offset;
@ -700,22 +712,34 @@ int meshmb_enum(struct meshmb_feeds *feeds, rhizome_bid_t *restart_from, meshmb_
return tree_walk(&feeds->root, restart_from ? restart_from->binary : NULL, sizeof *restart_from, enum_callback, &enum_context);
}
int meshmb_send(const keyring_identity *id, const char *message, size_t message_len,
int meshmb_send(struct meshmb_feeds *feeds, const char *message, size_t message_len,
unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments){
const char *did=NULL, *name=NULL;
struct message_ply ply;
bzero(&ply, sizeof ply);
ply.bundle_id = id->sign_keypair->public_key;
ply.bundle_id = feeds->id->sign_keypair->public_key;
ply.known_bid = 1;
struct overlay_buffer *b = ob_new();
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);
keyring_identity_extract(id, &did, &name);
int ret = message_ply_append(id, RHIZOME_SERVICE_MESHMB, NULL, &ply, b, name, nassignments, assignments);
keyring_identity_extract(feeds->id, &did, &name);
int ret = message_ply_append(feeds->id, RHIZOME_SERVICE_MESHMB, NULL, &ply, b, name, nassignments, assignments);
if (ret==0){
struct message_ply_ack ack;
bzero(&ack, sizeof ack);
ack.start_offset = ply.size - ob_position(b);
ack.end_offset = ply.size;
ack.binary_length = 0;
activity_ack(feeds, &ack);
}
ob_free(b);
return ret;

View File

@ -27,7 +27,7 @@ struct meshmb_activity_iterator{
};
struct rhizome_manifest_field_assignment;
int meshmb_send(const keyring_identity *id, const char *message, size_t message_len,
int meshmb_send(struct meshmb_feeds *feeds, const char *message, size_t message_len,
unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments);
// feed tracking

View File

@ -14,14 +14,42 @@
DEFINE_FEATURE(cli_meshmb);
static struct meshmb_feeds * cli_feeds_open(const struct cli_parsed *parsed){
const char *idhex;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1)
return NULL;
identity_t identity;
if (str_to_identity_t(&identity, idhex) == -1){
WHY("Invalid identity");
return NULL;
}
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return NULL;
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id){
WHY("Invalid identity");
return NULL;
}
struct meshmb_feeds *feeds = NULL;
if (meshmb_open(id, &feeds)==-1)
return NULL;
return feeds;
}
DEFINE_CMD(app_meshmb_send, 0,
"Append a public broadcast message to your feed",
"meshmb", "send" KEYRING_PIN_OPTIONS, "<id>", "<message>", "...");
static int app_meshmb_send(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *idhex, *message;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1
|| cli_arg(parsed, "message", &message, NULL, "") == -1)
const char *message;
if (cli_arg(parsed, "message", &message, NULL, "") == -1)
return -1;
unsigned nfields = (parsed->varargi == -1) ? 0 : parsed->argc - (unsigned)parsed->varargi;
@ -32,20 +60,26 @@ static int app_meshmb_send(const struct cli_parsed *parsed, struct cli_context *
return -1;
}
identity_t identity;
if (str_to_identity_t(&identity, idhex) == -1)
return WHY("Invalid identity");
struct meshmb_feeds *feeds = cli_feeds_open(parsed);
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return -1;
int ret = -1;
if (feeds){
ret = meshmb_send(feeds, message, strlen(message)+1, nfields, fields);
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id)
return WHY("Invalid identity");
if (ret!=-1){
ret = meshmb_flush(feeds);
if (ret!=-1)
ret=0;
}
return meshmb_send(id, message, strlen(message)+1, nfields, fields);
meshmb_close(feeds);
}
if (keyring)
keyring_free(keyring);
keyring = NULL;
return ret;
}
// TODO from offset....?
@ -177,35 +211,6 @@ static int app_meshmb_find(const struct cli_parsed *parsed, struct cli_context *
return 0;
}
static struct meshmb_feeds * cli_feeds_open(const struct cli_parsed *parsed){
const char *idhex;
if (cli_arg(parsed, "id", &idhex, str_is_identity, "") == -1)
return NULL;
identity_t identity;
if (str_to_identity_t(&identity, idhex) == -1){
WHY("Invalid identity");
return NULL;
}
if (create_serval_instance_dir() == -1
|| rhizome_opendb() == -1
|| !(keyring = keyring_open_instance_cli(parsed)))
return NULL;
keyring_identity *id = keyring_find_identity(keyring, &identity);
if (!id){
WHY("Invalid identity");
return NULL;
}
struct meshmb_feeds *feeds = NULL;
if (meshmb_open(id, &feeds)==-1)
return NULL;
return feeds;
}
DEFINE_CMD(app_meshmb_follow, 0,
"Start or stop following a broadcast feed",
"meshmb", "follow|ignore" KEYRING_PIN_OPTIONS, "<id>", "<peer>");

View File

@ -11,6 +11,68 @@
DEFINE_FEATURE(http_rest_meshmb);
// allow multiple requests to re-use the same struct meshmb_feeds *, keeping it up to date as bundles arrive
struct meshmb_session{
struct meshmb_session *next;
struct meshmb_session *prev;
unsigned ref_count;
keyring_identity *id;
struct meshmb_feeds *feeds;
};
static struct meshmb_session *sessions = NULL;
static struct meshmb_session *open_session(const identity_t *identity){
keyring_identity *id = keyring_find_identity(keyring, identity);
if (!id)
return NULL;
struct meshmb_session *session = sessions;
while(session){
if (session->id == id){
session->ref_count++;
return session;
}
session = session->next;
}
struct meshmb_feeds *feeds = NULL;
if (meshmb_open(id, &feeds)==-1)
return NULL;
meshmb_update(feeds);
session = emalloc(sizeof (struct meshmb_session));
if (!session){
meshmb_close(feeds);
return NULL;
}
session->next = sessions;
session->prev = NULL;
if (sessions)
sessions->prev = session;
sessions = session;
session->ref_count = 1;
session->id = id;
session->feeds = feeds;
return session;
}
static void close_session(struct meshmb_session *session){
if (--session->ref_count == 0){
if (session->next)
session->next->prev = session->prev;
if (session->prev)
session->prev->next = session->next;
else
sessions = session->next;
meshmb_close(session->feeds);
free(session);
}
}
static char *PART_MESSAGE = "message";
static int send_part_start(struct http_request *hr)
{
@ -77,16 +139,21 @@ static int send_content_end(struct http_request *hr)
assert(r->u.sendmsg.message.length > 0);
assert(r->u.sendmsg.message.length <= MESSAGE_PLY_MAX_LEN);
assert(keyring != NULL);
keyring_identity *id = keyring_find_identity(keyring, &r->bid);
if (!id){
http_request_simple_response(&r->http, 500, "TODO, detailed errors");
return 500;
struct meshmb_session *session = open_session(&r->bid);
int ret;
if (session
&& meshmb_send(session->feeds, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length, 0, NULL)!=-1
&& meshmb_flush(session->feeds)!=-1){
http_request_simple_response(&r->http, 201, "TODO, detailed response");
ret = 201;
}else{
http_request_simple_response(&r->http, 500, "TODO, detailed response");
ret = 500;
}
if (meshmb_send(id, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length, 0, NULL)==-1){
http_request_simple_response(&r->http, 500, "TODO, detailed errors");
return 500;
}
http_request_simple_response(&r->http, 201, "TODO, detailed response");
if (session)
close_session(session);
return 201;
}
@ -417,68 +484,6 @@ static int restful_meshmb_newsince_find(httpd_request *r, const char *remainder)
*/
// allow multiple requests to re-use the same struct meshmb_feeds *, keeping it up to date as bundles arrive
struct meshmb_session{
struct meshmb_session *next;
struct meshmb_session *prev;
unsigned ref_count;
keyring_identity *id;
struct meshmb_feeds *feeds;
};
static struct meshmb_session *sessions = NULL;
static struct meshmb_session *open_session(const identity_t *identity){
keyring_identity *id = keyring_find_identity(keyring, identity);
if (!id)
return NULL;
struct meshmb_session *session = sessions;
while(session){
if (session->id == id){
session->ref_count++;
return session;
}
session = session->next;
}
struct meshmb_feeds *feeds = NULL;
if (meshmb_open(id, &feeds)==-1)
return NULL;
meshmb_update(feeds);
session = emalloc(sizeof (struct meshmb_session));
if (!session){
meshmb_close(feeds);
return NULL;
}
session->next = sessions;
session->prev = NULL;
if (sessions)
sessions->prev = session;
sessions = session;
session->ref_count = 1;
session->id = id;
session->feeds = feeds;
return session;
}
static void close_session(struct meshmb_session *session){
if (--session->ref_count == 0){
if (session->next)
session->next->prev = session->prev;
if (session->prev)
session->prev->next = session->next;
else
sessions = session->next;
meshmb_close(session->feeds);
free(session);
}
}
static int restful_meshmb_follow(httpd_request *r, const char *remainder)
{
if (*remainder)

View File

@ -122,7 +122,7 @@ int message_ply_write_open(
return 0;
}
int message_ply_write_finish(struct message_ply_write *ply_write)
int message_ply_write_finish(struct message_ply_write *ply_write, struct message_ply *ply)
{
enum rhizome_payload_status status = rhizome_finish_write(&ply_write->write);
status = rhizome_finish_store(&ply_write->write, ply_write->m, status);
@ -135,6 +135,13 @@ int message_ply_write_finish(struct message_ply_write *ply_write)
rhizome_bundle_result_free(&result);
return -1;
}
if (ply){
ply->version = ply_write->m->version;
ply->tail = ply_write->m->tail;
ply->size = ply_write->m->filesize;
}
rhizome_bundle_result_free(&result);
if (mout && mout!=ply_write->m){
rhizome_manifest_free(ply_write->m);
@ -165,7 +172,7 @@ int message_ply_append(const keyring_identity *id, const char *service, const si
ob_position(b), write.write.written_offset);
if (rhizome_write_buffer(&write.write, ob_ptr(b), ob_position(b)) == -1)
goto end;
ret = message_ply_write_finish(&write);
ret = message_ply_write_finish(&write, ply);
end:
message_ply_write_close(&write);
return ret;

View File

@ -72,7 +72,7 @@ int message_ply_write_open(
const struct rhizome_manifest_field_assignment *assignments,
uint64_t advance_by);
int message_ply_write_finish(struct message_ply_write *write);
int message_ply_write_finish(struct message_ply_write *write, struct message_ply *ply);
void message_ply_write_close(struct message_ply_write *write);
void message_ply_append_ack(struct overlay_buffer *b, const struct message_ply_ack *ack);

View File

@ -132,11 +132,15 @@ test_meshmbThreading() {
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA5 "Message 10"
executeOk_servald meshmb activity $IDA1
assertStdoutGrep --matches=1 "0:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 10\$"
assertStdoutGrep --matches=1 "1:$IDA4:$SIDA4:Feed D:[0-9]\+:[0-9]\+:Message 9\$"
assertStdoutGrep --matches=1 "2:$IDA3:$SIDA3:Feed C:[0-9]\+:[0-9]\+:Message 8\$"
assertStdoutGrep --matches=1 "3:$IDA2:$SIDA2:Feed B:[0-9]\+:[0-9]\+:Message 7\$"
assertStdoutGrep --matches=1 "4:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 6\$"
executeOk_servald meshmb send $IDA1 "Message 11"
executeOk_servald meshmb activity $IDA1
tfw_cat --stdout
assertStdoutGrep --matches=1 "0:$IDA1:$SIDA1:Feed A:[0-9]\+:[0-9]\+:Message 11\$"
assertStdoutGrep --matches=1 "1:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 10\$"
assertStdoutGrep --matches=1 "2:$IDA4:$SIDA4:Feed D:[0-9]\+:[0-9]\+:Message 9\$"
assertStdoutGrep --matches=1 "3:$IDA3:$SIDA3:Feed C:[0-9]\+:[0-9]\+:Message 8\$"
assertStdoutGrep --matches=1 "4:$IDA2:$SIDA2:Feed B:[0-9]\+:[0-9]\+:Message 7\$"
assertStdoutGrep --matches=1 "5:$IDA5:$SIDA5:Feed E:[0-9]\+:[0-9]\+:Message 6\$"
}
runTests "$@"

View File

@ -221,6 +221,8 @@ setup_MeshMBRestActivity() {
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA5 "Message 10"
executeOk_servald meshmb activity $IDA1
executeOk_servald meshmb send $IDA1 "Message 11"
executeOk_servald meshmb activity $IDA1
}
test_MeshMBRestActivity() {
executeOk curl \
@ -232,19 +234,20 @@ test_MeshMBRestActivity() {
"http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/activity.json"
tfw_cat http.headers activity.json
tfw_preserve activity.json
assert [ "$(jq '.rows | length' activity.json)" = 9 ]
assert [ "$(jq '.rows | length' activity.json)" = 10 ]
transform_list_json activity.json list.json
tfw_preserve list.json
assertJq list.json "contains([{ __index: 0, id: \"$IDA5\", author: \"$SIDA5\", name: \"Feed E\", message: \"Message 10\"}])"
assertJq list.json "contains([{ __index: 1, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 9\"}])"
assertJq list.json "contains([{ __index: 2, id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 8\"}])"
assertJq list.json "contains([{ __index: 3, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 7\"}])"
assertJq list.json "contains([{ __index: 4, id: \"$IDA5\", author: \"$SIDA5\", name: \"Feed E\", message: \"Message 6\"}])"
assertJq list.json "contains([{ __index: 5, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 5\"}])"
assertJq list.json "contains([{ __index: 6, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 4\"}])"
assertJq list.json "contains([{ __index: 7, id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 3\"}])"
assertJq list.json "contains([{ __index: 8, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 2\"}])"
assertJq list.json "contains([{ __index: 0, id: \"$IDA1\", author: \"$SIDA1\", name: \"Feed A\", message: \"Message 11\"}])"
assertJq list.json "contains([{ __index: 1, id: \"$IDA5\", author: \"$SIDA5\", name: \"Feed E\", message: \"Message 10\"}])"
assertJq list.json "contains([{ __index: 2, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 9\"}])"
assertJq list.json "contains([{ __index: 3, id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 8\"}])"
assertJq list.json "contains([{ __index: 4, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 7\"}])"
assertJq list.json "contains([{ __index: 5, id: \"$IDA5\", author: \"$SIDA5\", name: \"Feed E\", message: \"Message 6\"}])"
assertJq list.json "contains([{ __index: 6, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 5\"}])"
assertJq list.json "contains([{ __index: 7, id: \"$IDA4\", author: \"$SIDA4\", name: \"Feed D\", message: \"Message 4\"}])"
assertJq list.json "contains([{ __index: 8, id: \"$IDA3\", author: \"$SIDA3\", name: \"Feed C\", message: \"Message 3\"}])"
assertJq list.json "contains([{ __index: 9, id: \"$IDA2\", author: \"$SIDA2\", name: \"Feed B\", message: \"Message 2\"}])"
}
doc_MeshMBRestNewActivity="Restful newsince incoming activity"