Don't ack messages from unknown peers, & allow for blocking

This commit is contained in:
Jeremy Lakeman 2016-08-23 14:46:30 +09:30
parent e13b9c3c94
commit 04991c1a79
7 changed files with 176 additions and 67 deletions

124
meshms.c
View File

@ -31,6 +31,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "dataformats.h"
#include "overlay_buffer.h"
#define FLAG_BLOCKED (1<<0)
#define FLAG_KNOWN (1<<1)
static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset);
void meshms_free_conversations(struct meshms_conversations *conv)
@ -310,7 +313,7 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct
if (meshms_failed(status))
return status;
if (conv->metadata.my_last_ack >= conv->metadata.their_last_message)
if (conv->metadata.my_last_ack >= conv->metadata.their_last_message || !conv->known)
return status;
// append an ack for their message
@ -406,7 +409,7 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m
int ofs = SID_SIZE;
uint8_t flags = 0; // TODO flags
uint8_t flags = FLAG_KNOWN;
struct meshms_metadata metadata;
bzero(&metadata, sizeof metadata);
int unpacked;
@ -428,37 +431,39 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m
flags = buffer[ofs++];
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1)
break;
ofs += unpacked;
if (!(flags & FLAG_BLOCKED)){
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_message = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_message = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.read_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.read_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.my_last_ack = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.my_last_ack = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack = metadata.my_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack = metadata.my_size - delta;
}
}
read.offset += ofs - bytes;
@ -471,10 +476,13 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, struct m
ptr = &n->_next;
n->them = *sid;
n->blocked = (flags & FLAG_BLOCKED) ? 1 : 0;
n->known = (flags & FLAG_KNOWN) ? 1 : 0;
n->metadata = metadata;
DEBUGF(meshms, "Unpacked existing conversation for %s (their_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")",
DEBUGF(meshms, "Unpacked existing conversation for %s (%stheir_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")",
alloca_tohex_sid_t(*sid),
(n->blocked ? "BLOCKED, " : ""),
metadata.their_size,
metadata.my_size,
metadata.their_last_message,
@ -490,28 +498,36 @@ end:
static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv)
{
if (!conv->known)
return 0;
size_t len=0;
unsigned char buffer[sizeof(conv->them) + (12*3) + 1];
bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
buffer[len++] = 0; // TODO reserved for flags
buffer[len++] =
(conv->blocked ? FLAG_BLOCKED : 0) |
(conv->known ? FLAG_KNOWN : 0)
;
assert(conv->metadata.their_size >= conv->metadata.their_last_message);
assert(conv->metadata.their_size >= conv->metadata.read_offset);
assert(conv->metadata.their_size >= conv->metadata.my_last_ack);
assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset);
assert(conv->metadata.my_size >= conv->metadata.their_last_ack);
if (!conv->blocked) {
assert(conv->metadata.their_size >= conv->metadata.their_last_message);
assert(conv->metadata.their_size >= conv->metadata.read_offset);
assert(conv->metadata.their_size >= conv->metadata.my_last_ack);
assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset);
assert(conv->metadata.my_size >= conv->metadata.their_last_ack);
// assume that most ack & read offsets are going to be near the ply length
// so store them as delta's.
len+=pack_uint(&buffer[len], conv->metadata.their_size);
len+=pack_uint(&buffer[len], conv->metadata.my_size);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack);
len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack);
// assume that most ack & read offsets are going to be near the ply length
// so store them as delta's.
len+=pack_uint(&buffer[len], conv->metadata.their_size);
len+=pack_uint(&buffer[len], conv->metadata.my_size);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack);
len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack);
}
assert(len <= sizeof buffer);
@ -876,7 +892,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
}
}
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len)
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, uint8_t mark_known, const char *message, size_t message_len)
{
assert(message_len != 0);
if (message_len > MESSAGE_PLY_MAX_LEN) {
@ -912,6 +928,9 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
status = MESHMS_STATUS_UPDATED;
}
if (mark_known)
c->known = 1;
enum meshms_status tmp_status = update_stats(c);
if (meshms_failed(tmp_status))
return tmp_status;
@ -979,6 +998,19 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient,
if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto end;
if (recipient){
// implicitly mark this conversation as "known"
struct meshms_conversations *c = conv;
while (c){
if (cmp_sid_t(&conv->them, recipient)==0){
c->known = 1;
break;
}
c = c->_next;
}
}
unsigned changed = 0;
// check if any incoming conversations need to be acked or have new messages
if (meshms_failed(status = update_conversations(id, &conv)))
@ -1002,7 +1034,7 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
{
unsigned ret=0;
while (conv){
if (!their_sid || cmp_sid_t(&conv->them, their_sid)==0){
if (conv->known && (!their_sid || cmp_sid_t(&conv->them, their_sid)==0)){
// update read offset
// - never past their last message
// - never rewind, only advance

View File

@ -76,6 +76,9 @@ struct meshms_conversations {
struct message_ply their_ply;
struct meshms_metadata metadata;
uint8_t blocked:1;
uint8_t known:1;
};
/* Fetch the list of all MeshMS conversations into a binary tree whose nodes
@ -160,8 +163,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
* MESHMS_STATUS_UPDATED on success, any other value indicates a failure or
* error (which is already logged).
*/
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len);
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, uint8_t mark_known, const char *message, size_t message_len);
/* Update the read offset for one or more conversations. Returns
* MESHMS_STATUS_UPDATED on success, any other value indicates a failure or

View File

@ -94,7 +94,7 @@ end:
DEFINE_CMD(app_meshms_send_message, 0,
"Send a MeshMS message from <sender_sid> to <recipient_sid>",
"meshms","send","message" KEYRING_PIN_OPTIONS, "<sender_sid>", "<recipient_sid>", "<payload>");
"meshms","send","message" KEYRING_PIN_OPTIONS, "[--mark-known]", "<sender_sid>", "<recipient_sid>", "<payload>");
static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *my_sidhex, *their_sidhex, *message;
@ -102,6 +102,7 @@ static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_c
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "payload", &message, NULL, "") == -1)
return -1;
uint8_t mark_known = (cli_arg(parsed, "--mark-known", NULL, NULL, NULL) == 0);
sid_t my_sid, their_sid;
if (str_to_sid_t(&my_sid, my_sidhex) == -1)
@ -120,7 +121,7 @@ static int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_c
}
// include terminating NUL
enum meshms_status status = meshms_send_message(&my_sid, &their_sid, message, strlen(message) + 1);
enum meshms_status status = meshms_send_message(&my_sid, &their_sid, mark_known, message, strlen(message) + 1);
keyring_free(keyring);
keyring = NULL;
return meshms_failed(status) ? status : 0;

View File

@ -673,7 +673,7 @@ static int restful_meshms_sendmessage_end(struct http_request *hr)
assert(r->u.sendmsg.message.length > 0);
assert(r->u.sendmsg.message.length <= MESSAGE_PLY_MAX_LEN);
enum meshms_status status;
if (meshms_failed(status = meshms_send_message(&r->sid1, &r->sid2, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length)))
if (meshms_failed(status = meshms_send_message(&r->sid1, &r->sid2, 1, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length)))
return http_request_meshms_response(r, 0, NULL, status);
return http_request_meshms_response(r, 201, "Message sent", status);
}

View File

@ -57,7 +57,7 @@ meshms_send_message() {
--form "message=$text;type=text/plain;charset=utf-8" \
"http://$addr_localhost:$MESHMS_RESTFUL_PORT/restful/meshms/$sid_sender/$sid_recipient/sendmessage"
else
executeOk_servald meshms send message $sid_sender $sid_recipient "$text"
executeOk_servald meshms send message --mark-known $sid_sender $sid_recipient "$text"
fi
}

View File

@ -91,6 +91,80 @@ test_SendAndList() {
assertStdoutLineCount '==' 5
}
doc_UnexpectedMessages="Unsolicited messages are not ACK'd when listing"
setup_UnexpectedMessages() {
setup_common 2
}
test_UnexpectedMessages() {
# create a single message
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
# list it from the receiver
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
# timestamp is unknown, because the message has not been acked.
assertStdoutGrep --stdout --matches=1 "^0:5:-1:<:Hi\$"
assertStdoutLineCount '==' 3
# list again from the sender, has not been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:5:$rexp_age:>:Hi\$"
assertStdoutLineCount '==' 3
}
doc_SendingAcks="Sending a message acknowledges delivery"
setup_SendingAcks() {
setup_common 2
}
test_SendingAcks() {
# create a single message
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
# send a reply
executeOk_servald meshms send message $SIDA2 $SIDA1 "Hello"
# list from the sender, our message should be marked as delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:11:-1:<:Hello\$"
assertStdoutGrep --stdout --matches=1 "^1:3:$rexp_age:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$"
assertStdoutLineCount '==' 5
}
doc_MarkRead="Marking messages as read acknowledges delivery"
setup_MarkRead() {
setup_common 2
}
test_MarkRead() {
# create a single message
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
# list it,k confirm it is unread
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stdout --matches=1 ":$SIDA1:unread:5:0\$"
# mark it as read
executeOk_servald meshms read messages $SIDA2 $SIDA1
# check that the read marker is listed
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:5:$rexp_age:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 4
# list it again, confirm that it is read
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stdout --matches=1 ":$SIDA1::5:5\$"
# list from the sender, should be delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:>:Hi\$"
assertStdoutLineCount '==' 4
# send another message
executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
# list from the receiver should remember the mark offset
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$"
assertStdoutLineCount '==' 5
}
check_meshms_bundles() {
# Dump the MeshMS bundles to the log and check consistency
# The only "file" bundle should be the conversation list
@ -131,10 +205,10 @@ setup_MessageThreading() {
foreach_instance +A +B create_single_identity
foreach_instance +A +B setup_logging
set_instance +A
executeOk_servald meshms send message $SIDA $SIDB "Hello can you hear me"
executeOk_servald meshms send message --mark-known $SIDA $SIDB "Hello can you hear me"
executeOk_servald meshms send message $SIDA $SIDB "Still waiting"
set_instance +B
executeOk_servald meshms send message $SIDB $SIDA "Help Im trapped in a test case factory"
executeOk_servald meshms send message --mark-known $SIDB $SIDA "Help Im trapped in a test case factory"
executeOk_servald meshms send message $SIDB $SIDA "Never mind"
start_servald_instances +A +B
}
@ -166,9 +240,9 @@ setup_reorderList() {
create_identities 5
setup_logging
# ensure conversations are known
executeOk_servald meshms send message $SIDA1 $SIDA2 "Start"
executeOk_servald meshms send message $SIDA1 $SIDA3 "Start"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Start"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Start"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA3 "Start"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Start"
}
test_reorderList() {
# new incoming messages should bump to the top
@ -198,10 +272,10 @@ setup_listConversations() {
create_identities 5
setup_logging
# create 3 threads, with all permutations of incoming and outgoing messages
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message --mark-known $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message --mark-known $SIDA4 $SIDA1 "Message4"
}
test_listConversations() {
executeOk_servald meshms list conversations --include-message $SIDA1

View File

@ -121,10 +121,10 @@ setup_MeshmsListConversations() {
IDENTITY_COUNT=5
setup
# create 3 threads, with all permutations of incoming and outgoing messages
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message --mark-known $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message --mark-known $SIDA4 $SIDA1 "Message4"
}
test_MeshmsListConversations() {
executeOk curl \
@ -586,9 +586,9 @@ setup_MeshmsReadAllConversations() {
IDENTITY_COUNT=5
setup
# create 3 threads, with all permutations of incoming and outgoing messages
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message --mark-known $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"