From 39db4efb6a43ddc02d4a098bcfcf1e401af597ed Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 18 Aug 2014 18:35:18 +0930 Subject: [PATCH] Add peer status to meshms conversations --- meshms.c | 97 +++++++++++++++++++++++++++++++++++----------------- meshms.h | 6 ++++ tests/meshms | 4 +-- 3 files changed, 73 insertions(+), 34 deletions(-) diff --git a/meshms.c b/meshms.c index 539949ac..7b7fe5a4 100644 --- a/meshms.c +++ b/meshms.c @@ -106,6 +106,7 @@ static struct meshms_conversations *add_conv(struct meshms_conversations **conv, if (*ptr == NULL && (*ptr = emalloc_zero(sizeof(struct meshms_conversations))) != NULL) { (*ptr)->_parent = parent; (*ptr)->them = *them; + (*ptr)->peer_status = PEER_REQUESTING; // assume the user hasn't made a decision } return *ptr; } @@ -164,18 +165,24 @@ static enum meshms_status get_database_conversations(const sid_t *my_sid, const struct meshms_conversations *ptr = add_conv(conv, &their_sid); if (!ptr) break; - struct meshms_ply *p; - if (them==sender){ - ptr->found_their_ply=1; - p=&ptr->their_ply; + + // completely ignore the conversation contents of blocked peers + if (ptr->peer_status == PEER_BLOCKED){ + // TODO delete ply? }else{ - ptr->found_my_ply=1; - p=&ptr->my_ply; + struct meshms_ply *p; + if (them==sender){ + ptr->found_their_ply=1; + p=&ptr->their_ply; + }else{ + ptr->found_my_ply=1; + p=&ptr->my_ply; + } + p->bundle_id = bid; + p->version = version; + p->tail = tail; + p->size = size; } - p->bundle_id = bid; - p->version = version; - p->tail = tail; - p->size = size; } sqlite3_finalize(statement); if (r==-1) @@ -416,7 +423,7 @@ static enum meshms_status update_conversation(const sid_t *my_sid, struct meshms if (config.debug.meshms) DEBUG("Checking if conversation needs to be acked"); - // Nothing to be done if they have never sent us anything + // Nothing to be done if they have been blocked or never sent us anything if (!conv->found_their_ply) return MESHMS_STATUS_OK; @@ -554,8 +561,8 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1); if (r == -1) goto end; - if (version != 1) { - WARNF("Expected version 1 (got 0x%02x)", version); + if (version < 1 || version > 2) { + WARNF("Expected version 1-2 (got 0x%02x)", version); goto fault; } while (1) { @@ -565,13 +572,16 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si status = MESHMS_STATUS_OK; goto end; } - if (r != sizeof sid.binary) + if (r != sizeof sid.binary){ + WHYF("Expected %d bytes, got %d", sizeof sid.binary, r); break; + } if (config.debug.meshms) DEBUGF("Reading existing conversation for %s", alloca_tohex_sid_t(sid)); - // unpack the stored details first so we know where the next record is - unsigned char details[12*3]; + // unpack all stored details, even if we need to filter them out + // so we know where to parse the next record + unsigned char details[12*3+1]; r = rhizome_read_buffered(&read, &buff, details, sizeof details); if (r == -1) break; @@ -594,6 +604,14 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si if (unpacked == -1) break; ofs += unpacked; + + uint8_t peer_status = PEER_REQUESTING; + if (version>=2){ + if (bytes <1) + break; + peer_status = details[ofs++]; + } + read.offset += ofs - bytes; // skip uninteresting records @@ -602,10 +620,11 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si struct meshms_conversations *ptr = add_conv(conv, &sid); if (!ptr) - goto end; + break; ptr->their_last_message = last_message; ptr->read_offset = read_offset; ptr->their_size = their_size; + ptr->peer_status = peer_status; } fault: status = MESHMS_STATUS_PROTOCOL_FAULT; @@ -620,7 +639,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con if (!conv) return len; { - unsigned char buffer[sizeof(conv->them) + (8*3)]; + unsigned char buffer[sizeof(conv->them) + (8*3) + 1]; if (write) bcopy(conv->them.binary, buffer, sizeof(conv->them)); len+=sizeof(conv->them); @@ -628,6 +647,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con len+=pack_uint(&buffer[len], conv->their_last_message); len+=pack_uint(&buffer[len], conv->read_offset); len+=pack_uint(&buffer[len], conv->their_size); + buffer[len++]=conv->peer_status; int ret=rhizome_write_buffer(write, buffer, len); if (ret == -1) return ret; @@ -635,6 +655,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con len+=measure_packed_uint(conv->their_last_message); len+=measure_packed_uint(conv->read_offset); len+=measure_packed_uint(conv->their_size); + len+=1; } if (config.debug.meshms) DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu", @@ -681,7 +702,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct // TODO log something? goto end; - unsigned char version=1; + unsigned char version=2; if (rhizome_write_buffer(&write, &version, 1) == -1) goto end; if (write_conversation(&write, conv) == -1) @@ -964,16 +985,21 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie enum meshms_status status; if (!meshms_failed(status = find_or_create_conv(sender, recipient, &conv))) { assert(conv != NULL); - // construct a message payload - // TODO, new format here. - unsigned char buffer[message_len + 4 + 6]; - strncpy((char*)buffer, message, message_len); - // ensure message is NUL terminated - if (message[message_len - 1] != '\0') - buffer[message_len++] = '\0'; - message_len += append_footer(buffer + message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len); - message_len+=append_timestamp(buffer + message_len); - status = append_meshms_buffer(sender, conv, buffer, message_len); + if (conv->peer_status == PEER_BLOCKED){ + WHY("this recipient has been blocked"); + return MESHMS_STATUS_ERROR; + }else{ + // construct a message payload + // TODO, new format here. + unsigned char buffer[message_len + 4 + 6]; + strncpy((char*)buffer, message, message_len); + // ensure message is NUL terminated + if (message[message_len - 1] != '\0') + buffer[message_len++] = '\0'; + message_len += append_footer(buffer + message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len); + message_len+=append_timestamp(buffer + message_len); + status = append_meshms_buffer(sender, conv, buffer, message_len); + } } meshms_free_conversations(conv); return status; @@ -1060,10 +1086,10 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_ return status; } const char *names[]={ - "_id","recipient","read", "last_message", "read_offset" + "_id","status","recipient","read", "last_message", "read_offset" }; - cli_columns(context, 5, names); + cli_columns(context, 6, names); int rows = 0; if (conv) { struct meshms_conversation_iterator it; @@ -1073,6 +1099,12 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_ ) { if (rows >= offset) { cli_put_long(context, rows, ":"); + switch(it.current->peer_status){ + default: + case PEER_REQUESTING: cli_put_string(context, "REQUEST", ":"); break; + case PEER_ALLOWED: cli_put_string(context, "", ":"); break; + case PEER_BLOCKED: cli_put_string(context, "BLOCKED", ":"); break; + } cli_put_hexvalue(context, it.current->them.binary, sizeof(it.current->them), ":"); cli_put_string(context, it.current->read_offset < it.current->their_last_message ? "unread":"", ":"); cli_put_long(context, it.current->their_last_message, ":"); @@ -1219,10 +1251,11 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_ int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0; if (!their_sid || cmp<0) ret += mark_read(conv->_left, their_sid, offset); - if (!their_sid || cmp==0){ + if (conv->peer_status != PEER_BLOCKED && (!their_sid || cmp==0)){ // update read offset // - never past their last message // - never rewind, only advance + // - ignore blocked conversations uint64_t new_offset = offset; if (new_offset > conv->their_last_message) new_offset = conv->their_last_message; diff --git a/meshms.h b/meshms.h index 661064ec..dd56323d 100644 --- a/meshms.h +++ b/meshms.h @@ -80,6 +80,12 @@ struct meshms_conversations { uint64_t read_offset; // our cached value for the last known size of their ply uint64_t their_size; + + enum meshms_peer_status{ + PEER_REQUESTING=0, + PEER_ALLOWED, + PEER_BLOCKED, + } peer_status; }; // cursor state for reading one half of a conversation diff --git a/tests/meshms b/tests/meshms index a07a6028..7b237716 100755 --- a/tests/meshms +++ b/tests/meshms @@ -180,8 +180,8 @@ setup_listConversations() { test_listConversations() { executeOk_servald meshms list conversations $SIDA1 tfw_cat --stdout - assertStdoutIs --stderr --line=1 -e '5\n' - assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset\n' + assertStdoutIs --stderr --line=1 -e '6\n' + assertStdoutIs --stderr --line=2 -e '_id:status:recipient:read:last_message:read_offset\n' assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"