From a1caa4bf684eb3147a7c5ceaa05d3a9945865864 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Thu, 25 Jul 2013 14:49:06 +0930 Subject: [PATCH] ACK conversations when listed, track conversations in a bundle --- meshms.c | 290 ++++++++++++++++++++++++++++++++++++++++++--------- tests/meshms | 137 +++++++++++++----------- 2 files changed, 315 insertions(+), 112 deletions(-) diff --git a/meshms.c b/meshms.c index 7f7995e2..c267be35 100644 --- a/meshms.c +++ b/meshms.c @@ -2,6 +2,7 @@ #include "rhizome.h" #include "log.h" #include "conf.h" +#include "crypto.h" #define MESHMS_BLOCK_TYPE_ACK 0x01 #define MESHMS_BLOCK_TYPE_MESSAGE 0x02 @@ -12,10 +13,6 @@ struct ply{ uint64_t version; uint64_t tail; uint64_t size; - - uint64_t last_message; - uint64_t last_ack; - uint64_t last_ack_offset; }; struct conversations{ @@ -26,10 +23,18 @@ struct conversations{ struct ply my_ply; char found_their_ply; struct ply their_ply; + + // what is the offset of their last message + uint64_t their_last_message; + // what is the last message we marked as read + uint64_t read_offset; + // our cached value for the last known size of their ply + uint64_t their_size; }; struct ply_read{ struct rhizome_read read; + struct rhizome_read_buffer buff; uint64_t record_end_offset; uint16_t record_length; int buffer_size; @@ -37,6 +42,8 @@ struct ply_read{ unsigned char *buffer; }; +static int meshms_conversations_list(const char *my_sid_hex, const char *their_sid_hex, struct conversations **conv); + static void free_conversations(struct conversations *conv){ if (!conv) return; @@ -45,9 +52,53 @@ static void free_conversations(struct conversations *conv){ free(conv); } +static int get_my_conversation_bundle(const char *my_sid, rhizome_manifest *m) +{ + /* Find our private key */ + int cn=0,in=0,kp=0; + sid_t sender_sid; + if (cf_opt_sid(&sender_sid,my_sid)) + return WHYF("Could not parse SID: %s",my_sid); + if (!keyring_find_sid(keyring,&cn,&in,&kp,sender_sid.binary)) + return WHYF("SID was not found in keyring: %s", my_sid); + + char seed[1024]; + snprintf(seed, sizeof(seed), + "incorrection%sconcentrativeness", + alloca_tohex(keyring->contexts[cn]->identities[in] + ->keypairs[kp]->private_key, crypto_box_curve25519xsalsa20poly1305_SECRETKEYBYTES)); + + int ret = rhizome_get_bundle_from_seed(m, seed); + if (ret<0) + return -1; + + // always consider the content encrypted, we don't need to rely on the manifest itself. + m->payloadEncryption=1; + return 0; +} + +static struct conversations *add_conv(struct conversations **conv, const char *them){ + struct conversations **ptr=conv; + while(*ptr){ + int cmp = strcmp((*ptr)->them, them); + if (cmp==0) + break; + if (cmp<0) + ptr = &(*ptr)->_left; + else + ptr = &(*ptr)->_right; + } + if (!*ptr){ + *ptr = emalloc_zero(sizeof(struct conversations)); + if (*ptr) + strncpy((*ptr)->them, them, SID_STRLEN); + } + return *ptr; +} + // find matching conversations // if their_sid_hex == my_sid_hex, return all conversations with any recipient -static int meshms_conversations_list(const char *my_sid_hex, const char *their_sid_hex, struct conversations **conv){ +static int get_database_conversations(const char *my_sid_hex, const char *their_sid_hex, struct conversations **conv){ sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT id, version, filesize, tail, sender, recipient " @@ -84,33 +135,24 @@ static int meshms_conversations_list(const char *my_sid_hex, const char *their_s if (config.debug.meshms) DEBUGF("found id %s, sender %s, recipient %s", id, sender, recipient); - struct conversations **ptr=conv; - while(*ptr){ - int cmp = strcmp((*ptr)->them, them); - if (cmp==0) - break; - if (cmp<0) - ptr = &(*ptr)->_left; - else - ptr = &(*ptr)->_right; - } - if (!*ptr){ - *ptr = emalloc_zero(sizeof(struct conversations)); - strncpy((*ptr)->them, them, SID_STRLEN); - } + struct conversations *ptr = add_conv(conv, them); + if (!ptr) + goto end; + struct ply *p; if (them==sender){ - (*ptr)->found_their_ply=1; - p=&(*ptr)->their_ply; + ptr->found_their_ply=1; + p=&ptr->their_ply; }else{ - (*ptr)->found_my_ply=1; - p=&(*ptr)->my_ply; + ptr->found_my_ply=1; + p=&ptr->my_ply; } strncpy(p->bundle_id, id, RHIZOME_MANIFEST_ID_STRLEN+1); p->version = version; p->tail = tail; p->size = size; } + end: if (ret!=SQLITE_OK){ WHYF("Query failed: %s", sqlite3_errmsg(rhizome_db)); @@ -174,15 +216,13 @@ static int ply_read_close(struct ply_read *ply){ ply->buffer=NULL; } ply->buffer_size=0; + ply->buff.len=0; return rhizome_read_close(&ply->read); } // read the next record from the ply (backwards) // returns 1 on EOF, -1 on failure static int ply_read_next(struct ply_read *ply){ - // TODO read in RHIZOME_CRYPT_PAGE_SIZE blocks, aligned to boundaries - if (config.debug.meshms) - DEBUGF("Attempting to read next record ending @%"PRId64,ply->read.offset); ply->record_end_offset=ply->read.offset; unsigned char footer[2]; ply->read.offset-=sizeof(footer); @@ -191,7 +231,7 @@ static int ply_read_next(struct ply_read *ply){ DEBUGF("EOF"); return 1; } - if (rhizome_read(&ply->read, footer, sizeof(footer))!=sizeof(footer)) + if (rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof(footer))) return -1; // (rhizome_read automatically advances the offset by the number of bytes read) ply->record_length=read_uint16(footer); @@ -199,7 +239,7 @@ static int ply_read_next(struct ply_read *ply){ ply->record_length = ply->record_length>>4; if (config.debug.meshms) - DEBUGF("Found record %d, length %d", ply->type, ply->record_length); + DEBUGF("Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset); // need to allow for advancing the tail and cutting a message in half. if (ply->record_length + sizeof(footer) > ply->read.offset){ @@ -227,6 +267,7 @@ static int ply_read_next(struct ply_read *ply){ return 0; } +// keep reading past messages until you find this type. static int ply_find_next(struct ply_read *ply, char type){ while(1){ int ret = ply_read_next(ply); @@ -269,6 +310,7 @@ end: } // update if any conversations are unread or need to be acked. +// return -1 for failure, 1 if the conversation index needs to be saved. static int update_conversation(const char *my_sidhex, struct conversations *conv){ if (config.debug.meshms) DEBUG("Checking if conversation needs to be acked"); @@ -297,12 +339,22 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv goto end; ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_MESSAGE); - if (ret!=0) + if (ret!=0){ + // no messages indicates that we didn't do anthing + if (ret>0) + ret=0; goto end; + } - uint64_t last_message_offset = ply.record_end_offset; + if (conv->their_last_message == ply.record_end_offset){ + // nothing has changed since last time + ret=0; + goto end; + } + + conv->their_last_message = ply.record_end_offset; if (config.debug.meshms) - DEBUGF("Found last message @%"PRId64, last_message_offset); + DEBUGF("Found last message @%"PRId64, conv->their_last_message); ply_read_close(&ply); // find our previous ack @@ -337,22 +389,21 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv DEBUGF("No outgoing ply"); } - if (previous_ack >= last_message_offset){ + if (previous_ack >= conv->their_last_message){ // their last message has already been acked - ret=0; + ret=1; goto end; } // append an ack for their message - // TODO shorter format here? if (config.debug.meshms) - DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_message_offset); + DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, conv->their_last_message); unsigned char buffer[24]; int ofs=0; - ofs+=pack_uint(&buffer[ofs], last_message_offset); + ofs+=pack_uint(&buffer[ofs], conv->their_last_message); if (previous_ack) - ofs+=pack_uint(&buffer[ofs], last_message_offset - previous_ack); + ofs+=pack_uint(&buffer[ofs], conv->their_last_message - previous_ack); ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs); ret = append_meshms_buffer(my_sidhex, conv, buffer, ofs); @@ -362,17 +413,161 @@ end: rhizome_manifest_free(m_ours); if (m_theirs) rhizome_manifest_free(m_theirs); + + // if it's all good, remember the size of their ply at the time we examined it. + if (ret>=0) + conv->their_size = conv->their_ply.size; + return ret; } -// check if any conversations have changed +// update conversations, and return 1 if the conversation index should be saved static int update_conversations(const char *my_sidhex, struct conversations *conv){ if (!conv) return 0; - update_conversations(my_sidhex, conv->_left); - update_conversation(my_sidhex, conv); - update_conversations(my_sidhex, conv->_right); - return 0; + int ret = 0; + if (update_conversations(my_sidhex, conv->_left)) + ret=1; + + if (conv->their_size != conv->their_ply.size){ + if (update_conversation(my_sidhex, conv)>0) + ret=1; + } + + if (update_conversations(my_sidhex, conv->_right)) + ret=1; + + return ret; +} + +// read our cached conversation list from our rhizome payload +static int read_known_conversations(rhizome_manifest *m, const char *their_sid_hex, struct conversations **conv){ + if (m->haveSecret!=EXISTING_BUNDLE_ID) + return 0; + + struct rhizome_read read; + bzero(&read, sizeof(read)); + struct rhizome_read_buffer buff; + bzero(&buff, sizeof(buff)); + + int ret = rhizome_open_decrypt_read(m, NULL, &read, 0); + if (ret<0) + goto end; + + while (1){ + char them[SID_STRLEN+1]; + ret=rhizome_read_buffered(&read, &buff, (unsigned char *)them, sizeof(them)); + if (ret<=0) + break; + if (their_sid_hex && strcmp(them, their_sid_hex)) + continue; + struct conversations *ptr = add_conv(conv, them); + if (!ptr) + return -1; + unsigned char details[8*3]; + ret=rhizome_read_buffered(&read, &buff, details, sizeof(details)); + if (ret<=0) + break; + + ptr->their_last_message = read_uint64(details); + ptr->read_offset = read_uint64(details+8); + ptr->their_size = read_uint64(details+16); + } +end: + rhizome_read_close(&read); + return ret; +} + +static int write_conversation(struct rhizome_write *write, struct conversations *conv){ + int len=0; + { + unsigned char buffer[SID_STRLEN + 1 + (8*3)]; + if (write) + bcopy(conv->them, buffer, sizeof(conv->them)); + len+=sizeof(conv->them); + if (write) + write_uint64(&buffer[len], conv->their_last_message); + len+=8; + if (write) + write_uint64(&buffer[len], conv->read_offset); + len+=8; + if (write) + write_uint64(&buffer[len], conv->their_size); + len+=8; + } + // write the two child nodes + int ret=write_conversation(write, conv->_left); + if (ret<0) + return ret; + len+=ret; + ret=write_conversation(write, conv->_right); + if (ret<0) + return ret; + len+=ret; + return len; +} + +static int write_known_conversations(rhizome_manifest *m, struct conversations *conv){ + if (m->haveSecret!=EXISTING_BUNDLE_ID) + return 0; + rhizome_manifest *mout=NULL; + + struct rhizome_write write; + bzero(&write, sizeof(write)); + int ret=-1; + + // TODO rebalance tree... + + // measure the final payload first + int len=write_conversation(NULL, conv); + if (len<0) + goto end; + + // then write it + m->fileLength = len; + if (rhizome_write_open_manifest(&write, m)) + goto end; + if (write_conversation(&write, conv)<0) + goto end; + if (rhizome_finish_write(&write)) + goto end; + if (rhizome_manifest_finalise(m,&mout)) + goto end; + + ret=0; +end: + if (ret) + rhizome_fail_write(&write); + if (mout && m!=mout) + rhizome_manifest_free(mout); + return ret; +} + +// read information about existing conversations from a rhizome payload +static int meshms_conversations_list(const char *my_sid_hex, const char *their_sid_hex, struct conversations **conv){ + int ret=-1; + rhizome_manifest *m = rhizome_new_manifest(); + if (!m) + goto end; + if (get_my_conversation_bundle(my_sid_hex, m)) + goto end; + + // read conversations payload + if (read_known_conversations(m, NULL, conv)) + goto end; + + if (get_database_conversations(my_sid_hex, their_sid_hex?their_sid_hex:my_sid_hex, conv)) + goto end; + + if (update_conversations(my_sid_hex, *conv) && !their_sid_hex){ + if (write_known_conversations(m, *conv)) + goto end; + } + ret=0; + +end: + rhizome_manifest_free(m); + return ret; } // recursively traverse the conversation tree in sorted order and output the details of each conversation @@ -385,7 +580,7 @@ static int output_conversations(struct cli_context *context, struct conversation if (count <0 || output + traverse_count < offset + count){ if (output + traverse_count >= offset){ cli_put_string(context, conv->them, ":"); - cli_put_string(context, "read", ":");// TODO + cli_put_string(context, conv->read_offset < conv->their_last_message ? "unread":"", ":"); cli_put_string(context, "delivered", "\n");// TODO } traverse_count++; @@ -415,12 +610,9 @@ int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context return -1; struct conversations *conv=NULL; - if (meshms_conversations_list(sidhex, sidhex, &conv)) + if (meshms_conversations_list(sidhex, NULL, &conv)) return -1; - //TODO, when we are tracking read state - //update_conversations(my_sidhex, conv); - const char *names[]={ "sid","read","delivered" }; @@ -479,8 +671,6 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context if (!conv) return -1; - update_conversation(my_sidhex, conv); - int ret=-1; const char *names[]={ @@ -558,7 +748,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context cli_put_long(context, id++, ":"); cli_put_long(context, read_theirs.read.offset, ":"); cli_put_string(context, their_sidhex, ":"); - cli_put_string(context, "read", ":"); + cli_put_string(context, read_theirs.record_end_offset <= conv->read_offset?"":"unread", ":"); cli_put_string(context, (char *)read_theirs.buffer, "\n"); } } diff --git a/tests/meshms b/tests/meshms index 44483f11..72ee91d2 100755 --- a/tests/meshms +++ b/tests/meshms @@ -22,45 +22,14 @@ source "${0%/*}/../testframework.sh" source "${0%/*}/../testdefs.sh" source "${0%/*}/../testdefs_rhizome.sh" -doc_listConversations="" -setup_listConversations() { - setup_servald - set_instance +A - create_identities 5 - executeOk_servald config \ - set debug.rhizome on \ - set debug.meshms on \ - set log.console.level debug - #cheating, adding fake message logs to the same servald - echo "Message1" >file1 - echo -e "service=MeshMS1\nsender=$SIDA1\nrecipient=$SIDA2" >file1.manifest - echo "Message2" >file2 - echo -e "service=MeshMS1\nsender=$SIDA3\nrecipient=$SIDA1" >file2.manifest - echo "Message3" >file3 - echo -e "service=MeshMS1\nsender=$SIDA1\nrecipient=$SIDA4" >file3.manifest - echo "Message4" >file4 - echo -e "service=MeshMS1\nsender=$SIDA4\nrecipient=$SIDA1" >file4.manifest - executeOk_servald rhizome add file '' file1 file1.manifest - executeOk_servald rhizome add file '' file2 file2.manifest - executeOk_servald rhizome add file '' file3 file3.manifest - executeOk_servald rhizome add file '' file4 file4.manifest -} -test_listConversations() { - executeOk_servald meshms list conversations $SIDA1 - assertStdoutIs --stderr --line=1 -e '3\n' - assertStdoutIs --stderr --line=2 -e 'sid:read:delivered\n' - assertStdoutGrep --stderr --matches=1 "^$SIDA2:read:delivered\$" - assertStdoutGrep --stderr --matches=1 "^$SIDA3:read:delivered\$" - assertStdoutGrep --stderr --matches=1 "^$SIDA4:read:delivered\$" - assertStdoutLineCount '==' 5 - executeOk_servald meshms list conversations $SIDA1 1 - assertStdoutLineCount '==' 4 - executeOk_servald meshms list conversations $SIDA1 1 1 - assertStdoutLineCount '==' 3 +teardown() { + stop_all_servald_servers + kill_all_servald_processes + assert_no_servald_processes } -doc_AddMessages="Add messages and ack's to a 2 party conversation" -setup_AddMessages() { +doc_MessageDelivery="Send messages and ack them in a 2 party conversation" +setup_MessageDelivery() { setup_servald set_instance +A create_identities 2 @@ -68,7 +37,7 @@ setup_AddMessages() { set debug.meshms on \ set log.console.level debug } -test_AddMessages() { +test_MessageDelivery() { # 1. empty list executeOk_servald meshms list messages $SIDA1 $SIDA2 assertStdoutIs --stdout --line=1 -e '5\n' @@ -88,40 +57,84 @@ test_AddMessages() { assertStdoutLineCount '==' 4 # 4. list the messages from the receivers point of view (which ACKs them) executeOk_servald meshms list messages $SIDA2 $SIDA1 - tfw_cat --stdout + tfw_cat --stdout --stderr assertStdoutGrep --stdout --matches=1 ":How are you\$" assertStdoutGrep --stdout --matches=1 ":Hi\$" assertStdoutLineCount '==' 4 # 5. list messages from the senders point of view after they have been delivered executeOk_servald meshms list messages $SIDA1 $SIDA2 - tfw_cat --stdout + tfw_cat --stdout --stderr assertStdoutGrep --stdout --matches=1 ":delivered:How are you\$" assertStdoutGrep --stdout --matches=1 ":delivered:Hi\$" assertStdoutLineCount '==' 4 - # 6. both parties add more messages without acking - executeOk_servald meshms send message $SIDA2 $SIDA1 "Help Im trapped in a test case factory" - executeOk_servald meshms send message $SIDA2 $SIDA1 "Never mind" - executeOk_servald meshms send message $SIDA1 $SIDA2 "Hello can you hear me" - executeOk_servald meshms send message $SIDA1 $SIDA2 "Still waiting" - # 7. each person see's the final threaded message list in a different order - executeOk_servald meshms list messages $SIDA2 $SIDA1 +} + +doc_MessageThreading="Messages sent at the same time, thread differently" +setup_MessageThreading() { + setup_servald + foreach_instance +A +B create_single_identity + set_instance +A + executeOk_servald meshms send message $SIDA $SIDB "Hello can you hear me" + executeOk_servald meshms send message $SIDA $SIDB "Still waiting" + set_instance +B + executeOk_servald meshms send message $SIDB $SIDA "Help Im trapped in a test case factory" + executeOk_servald meshms send message $SIDB $SIDA "Never mind" + start_servald_instances +A +B +} +test_MessageThreading() { + #TODO wait for bundle to arrive + sleep 3 + + set_instance +B + executeOk_servald meshms list messages $SIDB $SIDA tfw_cat --stdout - assertStdoutGrep --stdout --matches=1 "^0:43:$SIDA1:read:Still waiting\$" - assertStdoutGrep --stdout --matches=1 "^1:19:$SIDA1:read:Hello can you hear me\$" - assertStdoutGrep --stdout --matches=1 "^2:44:$SIDA2::Never mind\$" - assertStdoutGrep --stdout --matches=1 "^3:3:$SIDA2::Help Im trapped in a test case factory\$" - assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:read:How are you\$" - assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:read:Hi\$" - assertStdoutLineCount '==' 8 - executeOk_servald meshms list messages $SIDA1 $SIDA2 + assertStdoutGrep --stdout --matches=1 "^0:24:$SIDA:unread:Still waiting\$" + assertStdoutGrep --stdout --matches=1 "^1:0:$SIDA:unread:Hello can you hear me\$" + assertStdoutGrep --stdout --matches=1 "^2:41:$SIDB::Never mind\$" + assertStdoutGrep --stdout --matches=1 "^3:0:$SIDB::Help Im trapped in a test case factory\$" + assertStdoutLineCount '==' 6 + + #TODO wait for bundle to arrive + sleep 3 + + set_instance +A + executeOk_servald meshms list messages $SIDA $SIDB tfw_cat --stdout - assertStdoutGrep --stdout --matches=1 "^0:44:$SIDA2:read:Never mind\$" - assertStdoutGrep --stdout --matches=1 "^1:3:$SIDA2:read:Help Im trapped in a test case factory\$" - assertStdoutGrep --stdout --matches=1 "^2:43:$SIDA1:delivered:Still waiting\$" - assertStdoutGrep --stdout --matches=1 "^3:19:$SIDA1:delivered:Hello can you hear me\$" - assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:delivered:How are you\$" - assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:delivered:Hi\$" - assertStdoutLineCount '==' 8 + assertStdoutGrep --stdout --matches=1 "^0:41:$SIDB:unread:Never mind\$" + assertStdoutGrep --stdout --matches=1 "^1:0:$SIDB:unread:Help Im trapped in a test case factory\$" + assertStdoutGrep --stdout --matches=1 "^2:24:$SIDA:delivered:Still waiting\$" + assertStdoutGrep --stdout --matches=1 "^3:0:$SIDA:delivered:Hello can you hear me\$" + assertStdoutLineCount '==' 6 +} + +doc_listConversations="List multiple conversations, with different numbers of messages" +setup_listConversations() { + setup_servald + set_instance +A + create_identities 5 + executeOk_servald config \ + set debug.rhizome on \ + set debug.meshms on \ + set log.console.level debug + #cheating, adding fake message logs to the same servald + executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1" + executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2" + executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3" + executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4" +} +test_listConversations() { + executeOk_servald meshms list conversations $SIDA1 + tfw_cat --stdout + assertStdoutIs --stderr --line=1 -e '3\n' + assertStdoutIs --stderr --line=2 -e 'sid:read:delivered\n' + assertStdoutGrep --stderr --matches=1 "^$SIDA2::delivered\$" + assertStdoutGrep --stderr --matches=1 "^$SIDA3:unread:delivered\$" + assertStdoutGrep --stderr --matches=1 "^$SIDA4:unread:delivered\$" + assertStdoutLineCount '==' 5 + executeOk_servald meshms list conversations $SIDA1 1 + assertStdoutLineCount '==' 4 + executeOk_servald meshms list conversations $SIDA1 1 1 + assertStdoutLineCount '==' 3 } runTests "$@"