Add peer status to meshms conversations

This commit is contained in:
Jeremy Lakeman 2014-08-18 18:35:18 +09:30
parent a37db8e958
commit 39db4efb6a
3 changed files with 73 additions and 34 deletions

View File

@ -106,6 +106,7 @@ static struct meshms_conversations *add_conv(struct meshms_conversations **conv,
if (*ptr == NULL && (*ptr = emalloc_zero(sizeof(struct meshms_conversations))) != NULL) {
(*ptr)->_parent = parent;
(*ptr)->them = *them;
(*ptr)->peer_status = PEER_REQUESTING; // assume the user hasn't made a decision
}
return *ptr;
}
@ -164,6 +165,11 @@ static enum meshms_status get_database_conversations(const sid_t *my_sid, const
struct meshms_conversations *ptr = add_conv(conv, &their_sid);
if (!ptr)
break;
// completely ignore the conversation contents of blocked peers
if (ptr->peer_status == PEER_BLOCKED){
// TODO delete ply?
}else{
struct meshms_ply *p;
if (them==sender){
ptr->found_their_ply=1;
@ -177,6 +183,7 @@ static enum meshms_status get_database_conversations(const sid_t *my_sid, const
p->tail = tail;
p->size = size;
}
}
sqlite3_finalize(statement);
if (r==-1)
return MESHMS_STATUS_ERROR;
@ -416,7 +423,7 @@ static enum meshms_status update_conversation(const sid_t *my_sid, struct meshms
if (config.debug.meshms)
DEBUG("Checking if conversation needs to be acked");
// Nothing to be done if they have never sent us anything
// Nothing to be done if they have been blocked or never sent us anything
if (!conv->found_their_ply)
return MESHMS_STATUS_OK;
@ -554,8 +561,8 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1);
if (r == -1)
goto end;
if (version != 1) {
WARNF("Expected version 1 (got 0x%02x)", version);
if (version < 1 || version > 2) {
WARNF("Expected version 1-2 (got 0x%02x)", version);
goto fault;
}
while (1) {
@ -565,13 +572,16 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
status = MESHMS_STATUS_OK;
goto end;
}
if (r != sizeof sid.binary)
if (r != sizeof sid.binary){
WHYF("Expected %d bytes, got %d", sizeof sid.binary, r);
break;
}
if (config.debug.meshms)
DEBUGF("Reading existing conversation for %s", alloca_tohex_sid_t(sid));
// unpack the stored details first so we know where the next record is
unsigned char details[12*3];
// unpack all stored details, even if we need to filter them out
// so we know where to parse the next record
unsigned char details[12*3+1];
r = rhizome_read_buffered(&read, &buff, details, sizeof details);
if (r == -1)
break;
@ -594,6 +604,14 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
if (unpacked == -1)
break;
ofs += unpacked;
uint8_t peer_status = PEER_REQUESTING;
if (version>=2){
if (bytes <1)
break;
peer_status = details[ofs++];
}
read.offset += ofs - bytes;
// skip uninteresting records
@ -602,10 +620,11 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
struct meshms_conversations *ptr = add_conv(conv, &sid);
if (!ptr)
goto end;
break;
ptr->their_last_message = last_message;
ptr->read_offset = read_offset;
ptr->their_size = their_size;
ptr->peer_status = peer_status;
}
fault:
status = MESHMS_STATUS_PROTOCOL_FAULT;
@ -620,7 +639,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con
if (!conv)
return len;
{
unsigned char buffer[sizeof(conv->them) + (8*3)];
unsigned char buffer[sizeof(conv->them) + (8*3) + 1];
if (write)
bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
@ -628,6 +647,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con
len+=pack_uint(&buffer[len], conv->their_last_message);
len+=pack_uint(&buffer[len], conv->read_offset);
len+=pack_uint(&buffer[len], conv->their_size);
buffer[len++]=conv->peer_status;
int ret=rhizome_write_buffer(write, buffer, len);
if (ret == -1)
return ret;
@ -635,6 +655,7 @@ static ssize_t write_conversation(struct rhizome_write *write, struct meshms_con
len+=measure_packed_uint(conv->their_last_message);
len+=measure_packed_uint(conv->read_offset);
len+=measure_packed_uint(conv->their_size);
len+=1;
}
if (config.debug.meshms)
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
@ -681,7 +702,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
// TODO log something?
goto end;
unsigned char version=1;
unsigned char version=2;
if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end;
if (write_conversation(&write, conv) == -1)
@ -964,6 +985,10 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
enum meshms_status status;
if (!meshms_failed(status = find_or_create_conv(sender, recipient, &conv))) {
assert(conv != NULL);
if (conv->peer_status == PEER_BLOCKED){
WHY("this recipient has been blocked");
return MESHMS_STATUS_ERROR;
}else{
// construct a message payload
// TODO, new format here.
unsigned char buffer[message_len + 4 + 6];
@ -975,6 +1000,7 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
message_len+=append_timestamp(buffer + message_len);
status = append_meshms_buffer(sender, conv, buffer, message_len);
}
}
meshms_free_conversations(conv);
return status;
}
@ -1060,10 +1086,10 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
return status;
}
const char *names[]={
"_id","recipient","read", "last_message", "read_offset"
"_id","status","recipient","read", "last_message", "read_offset"
};
cli_columns(context, 5, names);
cli_columns(context, 6, names);
int rows = 0;
if (conv) {
struct meshms_conversation_iterator it;
@ -1073,6 +1099,12 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
) {
if (rows >= offset) {
cli_put_long(context, rows, ":");
switch(it.current->peer_status){
default:
case PEER_REQUESTING: cli_put_string(context, "REQUEST", ":"); break;
case PEER_ALLOWED: cli_put_string(context, "", ":"); break;
case PEER_BLOCKED: cli_put_string(context, "BLOCKED", ":"); break;
}
cli_put_hexvalue(context, it.current->them.binary, sizeof(it.current->them), ":");
cli_put_string(context, it.current->read_offset < it.current->their_last_message ? "unread":"", ":");
cli_put_long(context, it.current->their_last_message, ":");
@ -1219,10 +1251,11 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0;
if (!their_sid || cmp<0)
ret += mark_read(conv->_left, their_sid, offset);
if (!their_sid || cmp==0){
if (conv->peer_status != PEER_BLOCKED && (!their_sid || cmp==0)){
// update read offset
// - never past their last message
// - never rewind, only advance
// - ignore blocked conversations
uint64_t new_offset = offset;
if (new_offset > conv->their_last_message)
new_offset = conv->their_last_message;

View File

@ -80,6 +80,12 @@ struct meshms_conversations {
uint64_t read_offset;
// our cached value for the last known size of their ply
uint64_t their_size;
enum meshms_peer_status{
PEER_REQUESTING=0,
PEER_ALLOWED,
PEER_BLOCKED,
} peer_status;
};
// cursor state for reading one half of a conversation

View File

@ -180,8 +180,8 @@ setup_listConversations() {
test_listConversations() {
executeOk_servald meshms list conversations $SIDA1
tfw_cat --stdout
assertStdoutIs --stderr --line=1 -e '5\n'
assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset\n'
assertStdoutIs --stderr --line=1 -e '6\n'
assertStdoutIs --stderr --line=2 -e '_id:status:recipient:read:last_message:read_offset\n'
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"