From 04991c1a791f7ea7441a0e0910bffadae83bf870 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 23 Aug 2016 14:46:30 +0930 Subject: [PATCH] Don't ack messages from unknown peers, & allow for blocking --- meshms.c | 124 ++++++++++++++++++++++++++++---------------- meshms.h | 6 ++- meshms_cli.c | 5 +- meshms_restful.c | 2 +- testdefs_meshms.sh | 2 +- tests/meshms | 92 ++++++++++++++++++++++++++++---- tests/meshmsrestful | 12 ++--- 7 files changed, 176 insertions(+), 67 deletions(-) diff --git a/meshms.c b/meshms.c index 1271d312..0d95c172 100644 --- a/meshms.c +++ b/meshms.c @@ -31,6 +31,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "dataformats.h" #include "overlay_buffer.h" +#define FLAG_BLOCKED (1<<0) +#define FLAG_KNOWN (1<<1) + static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset); void meshms_free_conversations(struct meshms_conversations *conv) @@ -310,7 +313,7 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct if (meshms_failed(status)) return status; - if (conv->metadata.my_last_ack >= conv->metadata.their_last_message) + if (conv->metadata.my_last_ack >= conv->metadata.their_last_message || !conv->known) return status; // append an ack for their message @@ -406,7 +409,7 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m int ofs = SID_SIZE; - uint8_t flags = 0; // TODO flags + uint8_t flags = FLAG_KNOWN; struct meshms_metadata metadata; bzero(&metadata, sizeof metadata); int unpacked; @@ -428,37 +431,39 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m flags = buffer[ofs++]; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1) - break; - ofs += unpacked; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1) - break; - ofs += unpacked; + if (!(flags & FLAG_BLOCKED)){ + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1) + break; + ofs += unpacked; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1) + break; + ofs += unpacked; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) - break; - ofs += unpacked; - metadata.their_last_message = metadata.their_size - delta; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_message = metadata.their_size - delta; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) - break; - ofs += unpacked; - metadata.read_offset = metadata.their_size - delta; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.read_offset = metadata.their_size - delta; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) - break; - ofs += unpacked; - metadata.their_last_ack_offset = metadata.their_size - delta; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_ack_offset = metadata.their_size - delta; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) - break; - ofs += unpacked; - metadata.my_last_ack = metadata.their_size - delta; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.my_last_ack = metadata.their_size - delta; - if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) - break; - ofs += unpacked; - metadata.their_last_ack = metadata.my_size - delta; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_ack = metadata.my_size - delta; + } } read.offset += ofs - bytes; @@ -471,10 +476,13 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m ptr = &n->_next; n->them = *sid; + n->blocked = (flags & FLAG_BLOCKED) ? 1 : 0; + n->known = (flags & FLAG_KNOWN) ? 1 : 0; n->metadata = metadata; - DEBUGF(meshms, "Unpacked existing conversation for %s (their_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")", + DEBUGF(meshms, "Unpacked existing conversation for %s (%stheir_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")", alloca_tohex_sid_t(*sid), + (n->blocked ? "BLOCKED, " : ""), metadata.their_size, metadata.my_size, metadata.their_last_message, @@ -490,28 +498,36 @@ end: static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv) { + if (!conv->known) + return 0; + size_t len=0; unsigned char buffer[sizeof(conv->them) + (12*3) + 1]; bcopy(conv->them.binary, buffer, sizeof(conv->them)); len+=sizeof(conv->them); - buffer[len++] = 0; // TODO reserved for flags + buffer[len++] = + (conv->blocked ? FLAG_BLOCKED : 0) | + (conv->known ? FLAG_KNOWN : 0) + ; - assert(conv->metadata.their_size >= conv->metadata.their_last_message); - assert(conv->metadata.their_size >= conv->metadata.read_offset); - assert(conv->metadata.their_size >= conv->metadata.my_last_ack); - assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset); - assert(conv->metadata.my_size >= conv->metadata.their_last_ack); + if (!conv->blocked) { + assert(conv->metadata.their_size >= conv->metadata.their_last_message); + assert(conv->metadata.their_size >= conv->metadata.read_offset); + assert(conv->metadata.their_size >= conv->metadata.my_last_ack); + assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset); + assert(conv->metadata.my_size >= conv->metadata.their_last_ack); - // assume that most ack & read offsets are going to be near the ply length - // so store them as delta's. - len+=pack_uint(&buffer[len], conv->metadata.their_size); - len+=pack_uint(&buffer[len], conv->metadata.my_size); - len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message); - len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset); - len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset); - len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack); - len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack); + // assume that most ack & read offsets are going to be near the ply length + // so store them as delta's. + len+=pack_uint(&buffer[len], conv->metadata.their_size); + len+=pack_uint(&buffer[len], conv->metadata.my_size); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack); + len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack); + } assert(len <= sizeof buffer); @@ -876,7 +892,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * } } -enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len) +enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, uint8_t mark_known, const char *message, size_t message_len) { assert(message_len != 0); if (message_len > MESSAGE_PLY_MAX_LEN) { @@ -912,6 +928,9 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie status = MESHMS_STATUS_UPDATED; } + if (mark_known) + c->known = 1; + enum meshms_status tmp_status = update_stats(c); if (meshms_failed(tmp_status)) return tmp_status; @@ -979,6 +998,19 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient, if (meshms_failed(status = meshms_open_list(id, m, &conv))) goto end; + if (recipient){ + // implicitly mark this conversation as "known" + struct meshms_conversations *c = conv; + + while (c){ + if (cmp_sid_t(&conv->them, recipient)==0){ + c->known = 1; + break; + } + c = c->_next; + } + } + unsigned changed = 0; // check if any incoming conversations need to be acked or have new messages if (meshms_failed(status = update_conversations(id, &conv))) @@ -1002,7 +1034,7 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_ { unsigned ret=0; while (conv){ - if (!their_sid || cmp_sid_t(&conv->them, their_sid)==0){ + if (conv->known && (!their_sid || cmp_sid_t(&conv->them, their_sid)==0)){ // update read offset // - never past their last message // - never rewind, only advance diff --git a/meshms.h b/meshms.h index d153ca6c..c36b11a3 100644 --- a/meshms.h +++ b/meshms.h @@ -76,6 +76,9 @@ struct meshms_conversations { struct message_ply their_ply; struct meshms_metadata metadata; + + uint8_t blocked:1; + uint8_t known:1; }; /* Fetch the list of all MeshMS conversations into a binary tree whose nodes @@ -160,8 +163,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * * MESHMS_STATUS_UPDATED on success, any other value indicates a failure or * error (which is already logged). */ - -enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len); +enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, uint8_t mark_known, const char *message, size_t message_len); /* Update the read offset for one or more conversations. Returns * MESHMS_STATUS_UPDATED on success, any other value indicates a failure or diff --git a/meshms_cli.c b/meshms_cli.c index 707a09c0..5aa90172 100644 --- a/meshms_cli.c +++ b/meshms_cli.c @@ -94,7 +94,7 @@ end: DEFINE_CMD(app_meshms_send_message, 0, "Send a MeshMS message from to ", - "meshms","send","message" KEYRING_PIN_OPTIONS, "", "", ""); + "meshms","send","message" KEYRING_PIN_OPTIONS, "[--mark-known]", "", "", ""); static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *UNUSED(context)) { const char *my_sidhex, *their_sidhex, *message; @@ -102,6 +102,7 @@ static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_c || cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1 || cli_arg(parsed, "payload", &message, NULL, "") == -1) return -1; + uint8_t mark_known = (cli_arg(parsed, "--mark-known", NULL, NULL, NULL) == 0); sid_t my_sid, their_sid; if (str_to_sid_t(&my_sid, my_sidhex) == -1) @@ -120,7 +121,7 @@ static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_c } // include terminating NUL - enum meshms_status status = meshms_send_message(&my_sid, &their_sid, message, strlen(message) + 1); + enum meshms_status status = meshms_send_message(&my_sid, &their_sid, mark_known, message, strlen(message) + 1); keyring_free(keyring); keyring = NULL; return meshms_failed(status) ? status : 0; diff --git a/meshms_restful.c b/meshms_restful.c index 222b674c..5af4ca67 100644 --- a/meshms_restful.c +++ b/meshms_restful.c @@ -673,7 +673,7 @@ static int restful_meshms_sendmessage_end(struct http_request *hr) assert(r->u.sendmsg.message.length > 0); assert(r->u.sendmsg.message.length <= MESSAGE_PLY_MAX_LEN); enum meshms_status status; - if (meshms_failed(status = meshms_send_message(&r->sid1, &r->sid2, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length))) + if (meshms_failed(status = meshms_send_message(&r->sid1, &r->sid2, 1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length))) return http_request_meshms_response(r, 0, NULL, status); return http_request_meshms_response(r, 201, "Message sent", status); } diff --git a/testdefs_meshms.sh b/testdefs_meshms.sh index a4b806b8..4febce78 100644 --- a/testdefs_meshms.sh +++ b/testdefs_meshms.sh @@ -57,7 +57,7 @@ meshms_send_message() { --form "message=$text;type=text/plain;charset=utf-8" \ "http://$addr_localhost:$MESHMS_RESTFUL_PORT/restful/meshms/$sid_sender/$sid_recipient/sendmessage" else - executeOk_servald meshms send message $sid_sender $sid_recipient "$text" + executeOk_servald meshms send message --mark-known $sid_sender $sid_recipient "$text" fi } diff --git a/tests/meshms b/tests/meshms index dd337af4..a9a50973 100755 --- a/tests/meshms +++ b/tests/meshms @@ -91,6 +91,80 @@ test_SendAndList() { assertStdoutLineCount '==' 5 } +doc_UnexpectedMessages="Unsolicited messages are not ACK'd when listing" +setup_UnexpectedMessages() { + setup_common 2 +} +test_UnexpectedMessages() { + # create a single message + executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" + # list it from the receiver + executeOk_servald meshms list messages $SIDA2 $SIDA1 + tfw_cat --stdout + # timestamp is unknown, because the message has not been acked. + assertStdoutGrep --stdout --matches=1 "^0:5:-1:<:Hi\$" + assertStdoutLineCount '==' 3 + # list again from the sender, has not been delivered + executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:5:$rexp_age:>:Hi\$" + assertStdoutLineCount '==' 3 +} + +doc_SendingAcks="Sending a message acknowledges delivery" +setup_SendingAcks() { + setup_common 2 +} +test_SendingAcks() { + # create a single message + executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" + # send a reply + executeOk_servald meshms send message $SIDA2 $SIDA1 "Hello" + # list from the sender, our message should be marked as delivered + executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:11:-1:<:Hello\$" + assertStdoutGrep --stdout --matches=1 "^1:3:$rexp_age:ACK:delivered\$" + assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$" + assertStdoutLineCount '==' 5 +} + +doc_MarkRead="Marking messages as read acknowledges delivery" +setup_MarkRead() { + setup_common 2 +} +test_MarkRead() { + # create a single message + executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" + # list it,k confirm it is unread + executeOk_servald meshms list conversations $SIDA2 + assertStdoutGrep --stdout --matches=1 ":$SIDA1:unread:5:0\$" + # mark it as read + executeOk_servald meshms read messages $SIDA2 $SIDA1 + # check that the read marker is listed + executeOk_servald meshms list messages $SIDA2 $SIDA1 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:5:$rexp_age:MARK:read\$" + assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$" + assertStdoutLineCount '==' 4 + # list it again, confirm that it is read + executeOk_servald meshms list conversations $SIDA2 + assertStdoutGrep --stdout --matches=1 ":$SIDA1::5:5\$" + # list from the sender, should be delivered + executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$" + assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:>:Hi\$" + assertStdoutLineCount '==' 4 + # send another message + executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you" + # list from the receiver should remember the mark offset + executeOk_servald meshms list messages $SIDA2 $SIDA1 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$" + assertStdoutLineCount '==' 5 +} + check_meshms_bundles() { # Dump the MeshMS bundles to the log and check consistency # The only "file" bundle should be the conversation list @@ -131,10 +205,10 @@ setup_MessageThreading() { foreach_instance +A +B create_single_identity foreach_instance +A +B setup_logging set_instance +A - executeOk_servald meshms send message $SIDA $SIDB "Hello can you hear me" + executeOk_servald meshms send message --mark-known $SIDA $SIDB "Hello can you hear me" executeOk_servald meshms send message $SIDA $SIDB "Still waiting" set_instance +B - executeOk_servald meshms send message $SIDB $SIDA "Help Im trapped in a test case factory" + executeOk_servald meshms send message --mark-known $SIDB $SIDA "Help Im trapped in a test case factory" executeOk_servald meshms send message $SIDB $SIDA "Never mind" start_servald_instances +A +B } @@ -166,9 +240,9 @@ setup_reorderList() { create_identities 5 setup_logging # ensure conversations are known - executeOk_servald meshms send message $SIDA1 $SIDA2 "Start" - executeOk_servald meshms send message $SIDA1 $SIDA3 "Start" - executeOk_servald meshms send message $SIDA1 $SIDA4 "Start" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Start" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA3 "Start" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Start" } test_reorderList() { # new incoming messages should bump to the top @@ -198,10 +272,10 @@ setup_listConversations() { create_identities 5 setup_logging # create 3 threads, with all permutations of incoming and outgoing messages - executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1" - executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2" - executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3" - executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1" + executeOk_servald meshms send message --mark-known $SIDA3 $SIDA1 "Message2" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3" + executeOk_servald meshms send message --mark-known $SIDA4 $SIDA1 "Message4" } test_listConversations() { executeOk_servald meshms list conversations --include-message $SIDA1 diff --git a/tests/meshmsrestful b/tests/meshmsrestful index 50ff0295..8e9c4f58 100755 --- a/tests/meshmsrestful +++ b/tests/meshmsrestful @@ -121,10 +121,10 @@ setup_MeshmsListConversations() { IDENTITY_COUNT=5 setup # create 3 threads, with all permutations of incoming and outgoing messages - executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1" - executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2" - executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3" - executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1" + executeOk_servald meshms send message --mark-known $SIDA3 $SIDA1 "Message2" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3" + executeOk_servald meshms send message --mark-known $SIDA4 $SIDA1 "Message4" } test_MeshmsListConversations() { executeOk curl \ @@ -586,9 +586,9 @@ setup_MeshmsReadAllConversations() { IDENTITY_COUNT=5 setup # create 3 threads, with all permutations of incoming and outgoing messages - executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1" executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2" - executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3" + executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3" executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4" executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"