New messages bump the conversation to the top

This commit is contained in:
Jeremy Lakeman 2014-10-27 11:08:02 +10:30
parent bcd9575b54
commit 1848493468
3 changed files with 114 additions and 104 deletions

183
meshms.c
View File

@ -38,12 +38,10 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
void meshms_free_conversations(struct meshms_conversations *conv) void meshms_free_conversations(struct meshms_conversations *conv)
{ {
if (conv) { while(conv){
if (conv->_parent && conv != conv->_parent->_left && conv != conv->_parent->_right) struct meshms_conversations *n = conv;
WHYF("deformed MeshMS conversation tree"); conv = conv->_next;
meshms_free_conversations(conv->_left); free(n);
meshms_free_conversations(conv->_right);
free(conv);
} }
} }
@ -92,22 +90,19 @@ static enum meshms_status get_my_conversation_bundle(const sid_t *my_sidp, rhizo
static struct meshms_conversations *add_conv(struct meshms_conversations **conv, const sid_t *them) static struct meshms_conversations *add_conv(struct meshms_conversations **conv, const sid_t *them)
{ {
struct meshms_conversations **ptr = conv; struct meshms_conversations **ptr = conv;
struct meshms_conversations *parent = NULL;
while (*ptr) { while (*ptr) {
int cmp = cmp_sid_t(&(*ptr)->them, them); int cmp = cmp_sid_t(&(*ptr)->them, them);
if (cmp == 0) if (cmp == 0)
break; return *ptr;
parent = *ptr; ptr=&(*ptr)->_next;
if (cmp < 0)
ptr = &(*ptr)->_left;
else
ptr = &(*ptr)->_right;
} }
if (*ptr == NULL && (*ptr = emalloc_zero(sizeof(struct meshms_conversations))) != NULL) { struct meshms_conversations *n = emalloc_zero(sizeof(struct meshms_conversations));
(*ptr)->_parent = parent; if (n){
(*ptr)->them = *them; n->them = *them;
n->_next = *conv;
*conv = n;
} }
return *ptr; return n;
} }
// find matching conversations // find matching conversations
@ -511,25 +506,28 @@ end:
} }
// update conversations, and return MESHMS_STATUS_UPDATED if the conversation index should be saved // update conversations, and return MESHMS_STATUS_UPDATED if the conversation index should be saved
static enum meshms_status update_conversations(const sid_t *my_sid, struct meshms_conversations *conv) static enum meshms_status update_conversations(const sid_t *my_sid, struct meshms_conversations **conv)
{ {
enum meshms_status rstatus = MESHMS_STATUS_OK; enum meshms_status rstatus = MESHMS_STATUS_OK;
if (conv) { struct meshms_conversations **ptr = conv;
enum meshms_status status; while (*ptr) {
if (meshms_failed(status = update_conversations(my_sid, conv->_left))) struct meshms_conversations *n = *ptr;
return status; if (n->their_size != n->their_ply.size) {
if (status == MESHMS_STATUS_UPDATED) enum meshms_status status;
rstatus = MESHMS_STATUS_UPDATED; if (meshms_failed(status = update_conversation(my_sid, n)))
if (conv->their_size != conv->their_ply.size) {
if (meshms_failed(status = update_conversation(my_sid, conv)))
return status; return status;
if (status == MESHMS_STATUS_UPDATED) if (status == MESHMS_STATUS_UPDATED){
rstatus = MESHMS_STATUS_UPDATED; rstatus = MESHMS_STATUS_UPDATED;
if (config.debug.meshms)
DEBUGF("Bumping conversation from %s", alloca_tohex_sid_t(n->them));
// bump to head of list
*ptr = n->_next;
n->_next = *conv;
*conv = n;
continue;
}
} }
if (meshms_failed(status = update_conversations(my_sid, conv->_right))) ptr = &(*ptr)->_next;
return status;
if (status == MESHMS_STATUS_UPDATED)
rstatus = MESHMS_STATUS_UPDATED;
} }
return rstatus; return rstatus;
} }
@ -541,6 +539,7 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
if (m->haveSecret==NEW_BUNDLE_ID) if (m->haveSecret==NEW_BUNDLE_ID)
return MESHMS_STATUS_OK; return MESHMS_STATUS_OK;
struct meshms_conversations **ptr = conv;
struct rhizome_read read; struct rhizome_read read;
bzero(&read, sizeof(read)); bzero(&read, sizeof(read));
struct rhizome_read_buffer buff; struct rhizome_read_buffer buff;
@ -563,6 +562,7 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
WARNF("Expected version 1 (got 0x%02x)", version); WARNF("Expected version 1 (got 0x%02x)", version);
goto fault; goto fault;
} }
while (1) { while (1) {
sid_t sid; sid_t sid;
r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary); r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary);
@ -605,12 +605,17 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
if (their_sid && cmp_sid_t(&sid, their_sid) != 0) if (their_sid && cmp_sid_t(&sid, their_sid) != 0)
continue; continue;
struct meshms_conversations *ptr = add_conv(conv, &sid); struct meshms_conversations *n = emalloc_zero(sizeof(struct meshms_conversations));
if (!ptr) if (!n)
goto end; goto end;
ptr->their_last_message = last_message;
ptr->read_offset = read_offset; *ptr = n;
ptr->their_size = their_size; ptr = &n->_next;
n->them = sid;
n->their_last_message = last_message;
n->read_offset = read_offset;
n->their_size = their_size;
} }
fault: fault:
status = MESHMS_STATUS_PROTOCOL_FAULT; status = MESHMS_STATUS_PROTOCOL_FAULT;
@ -622,42 +627,42 @@ end:
static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv) static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv)
{ {
size_t len=0; size_t len=0;
if (!conv) unsigned char buffer[sizeof(conv->them) + (8*3)];
return len; if (write)
{ bcopy(conv->them.binary, buffer, sizeof(conv->them));
unsigned char buffer[sizeof(conv->them) + (8*3)]; len+=sizeof(conv->them);
if (write) if (write){
bcopy(conv->them.binary, buffer, sizeof(conv->them)); len+=pack_uint(&buffer[len], conv->their_last_message);
len+=sizeof(conv->them); len+=pack_uint(&buffer[len], conv->read_offset);
if (write){ len+=pack_uint(&buffer[len], conv->their_size);
len+=pack_uint(&buffer[len], conv->their_last_message); int ret=rhizome_write_buffer(write, buffer, len);
len+=pack_uint(&buffer[len], conv->read_offset); if (ret == -1)
len+=pack_uint(&buffer[len], conv->their_size); return ret;
int ret=rhizome_write_buffer(write, buffer, len); }else{
if (ret == -1) len+=measure_packed_uint(conv->their_last_message);
return ret; len+=measure_packed_uint(conv->read_offset);
}else{ len+=measure_packed_uint(conv->their_size);
len+=measure_packed_uint(conv->their_last_message); }
len+=measure_packed_uint(conv->read_offset); if (config.debug.meshms)
len+=measure_packed_uint(conv->their_size); DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
} alloca_tohex_sid_t(conv->them),
if (config.debug.meshms) conv->their_last_message,
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu", conv->read_offset,
alloca_tohex_sid_t(conv->them), conv->their_size,
conv->their_last_message, len);
conv->read_offset, return len;
conv->their_size, }
len);
static ssize_t write_conversations(struct rhizome_write *write, struct meshms_conversations *conv)
{
ssize_t len=0;
while(conv){
ssize_t this_len = write_conversation(write, conv);
if (this_len==-1)
return this_len;
len+=this_len;
conv = conv->_next;
} }
// write the two child nodes
ssize_t ret = write_conversation(write, conv->_left);
if (ret == -1)
return ret;
len += (size_t) ret;
ret = write_conversation(write, conv->_right);
if (ret == -1)
return ret;
len += (size_t) ret;
return len; return len;
} }
@ -672,7 +677,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
// TODO rebalance tree... // TODO rebalance tree...
// measure the final payload first // measure the final payload first
ssize_t len=write_conversation(NULL, conv); ssize_t len=write_conversations(NULL, conv);
if (len == -1) if (len == -1)
goto end; goto end;
@ -689,7 +694,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
unsigned char version=1; unsigned char version=1;
if (rhizome_write_buffer(&write, &version, 1) == -1) if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end; goto end;
if (write_conversation(&write, conv) == -1) if (write_conversations(&write, conv) == -1)
goto end; goto end;
pstatus = rhizome_finish_write(&write); pstatus = rhizome_finish_write(&write);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW) if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
@ -748,7 +753,7 @@ enum meshms_status meshms_conversations_list(const sid_t *my_sid, const sid_t *t
goto end; goto end;
if (meshms_failed(status = get_database_conversations(my_sid, their_sid, conv))) if (meshms_failed(status = get_database_conversations(my_sid, their_sid, conv)))
goto end; goto end;
if ((status = update_conversations(my_sid, *conv)) == MESHMS_STATUS_UPDATED && their_sid == NULL) if ((status = update_conversations(my_sid, conv)) == MESHMS_STATUS_UPDATED && their_sid == NULL)
status = write_known_conversations(m, *conv); status = write_known_conversations(m, *conv);
end: end:
rhizome_manifest_free(m); rhizome_manifest_free(m);
@ -763,11 +768,7 @@ end:
*/ */
void meshms_conversation_iterator_start(struct meshms_conversation_iterator *it, struct meshms_conversations *conv) void meshms_conversation_iterator_start(struct meshms_conversation_iterator *it, struct meshms_conversations *conv)
{ {
assert(conv->_parent == NULL); // can only iterate over whole tree
it->current = conv; it->current = conv;
// infix traversal; descend to the leftmost leaf and start there
while (it->current->_left)
it->current = it->current->_left;
} }
/* Advance to the next conversation in the tree. /* Advance to the next conversation in the tree.
@ -777,20 +778,7 @@ void meshms_conversation_iterator_start(struct meshms_conversation_iterator *it,
void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *it) void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *it)
{ {
assert(it->current != NULL); // do not call on a finished iterator assert(it->current != NULL); // do not call on a finished iterator
if (it->current->_right) { it->current = it->current->_next;
it->current = it->current->_right;
while (it->current->_left)
it->current = it->current->_left;
}
else {
while (1) {
struct meshms_conversations *conv = it->current;
it->current = it->current->_parent;
if (it->current == NULL || conv == it->current->_left)
break;
assert(conv == it->current->_right);
}
}
} }
enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *me, const sid_t *them) enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *me, const sid_t *them)
@ -1007,7 +995,7 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient,
goto end; goto end;
// check if any incoming conversations need to be acked or have new messages and update the read offset // check if any incoming conversations need to be acked or have new messages and update the read offset
unsigned changed = 0; unsigned changed = 0;
if (meshms_failed(status = update_conversations(sender, conv))) if (meshms_failed(status = update_conversations(sender, &conv)))
goto end; goto end;
if (status == MESHMS_STATUS_UPDATED) if (status == MESHMS_STATUS_UPDATED)
changed = 1; changed = 1;
@ -1220,10 +1208,8 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_
static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset) static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset)
{ {
unsigned ret=0; unsigned ret=0;
if (conv){ while (conv){
int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0; int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0;
if (!their_sid || cmp<0)
ret += mark_read(conv->_left, their_sid, offset);
if (!their_sid || cmp==0){ if (!their_sid || cmp==0){
// update read offset // update read offset
// - never past their last message // - never past their last message
@ -1238,9 +1224,10 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
conv->read_offset = new_offset; conv->read_offset = new_offset;
ret++; ret++;
} }
if (their_sid)
break;
} }
if (!their_sid || cmp>0) conv = conv->_next;
ret += mark_read(conv->_right, their_sid, offset);
} }
return ret; return ret;
} }

View File

@ -58,12 +58,7 @@ struct meshms_ply {
}; };
struct meshms_conversations { struct meshms_conversations {
// binary tree struct meshms_conversations *_next;
struct meshms_conversations *_left;
struct meshms_conversations *_right;
// keeping a pointer to parent node here means the traversal iterator does not need a stack, so
// there is no fixed limit on the tree depth
struct meshms_conversations *_parent;
// who are we talking to? // who are we talking to?
sid_t them; sid_t them;

View File

@ -165,6 +165,34 @@ test_MessageThreading() {
assertStdoutLineCount '==' 7 assertStdoutLineCount '==' 7
} }
doc_reorderList="New incoming messages force the conversation to the top"
setup_reorderList() {
setup_servald
set_instance +A
create_identities 5
setup_logging
}
test_reorderList() {
# new incoming messages should bump to the top
executeOk_servald meshms send message $SIDA4 $SIDA1 "Bump"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 "^0:$SIDA4:unread:"
executeOk_servald meshms send message $SIDA3 $SIDA1 "Bump"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 "^0:$SIDA3:unread:"
assertStdoutGrep --stderr --matches=1 "^1:$SIDA4:unread:"
executeOk_servald meshms send message $SIDA2 $SIDA1 "Bump"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 "^0:$SIDA2:unread:"
assertStdoutGrep --stderr --matches=1 "^1:$SIDA3:unread:"
assertStdoutGrep --stderr --matches=1 "^2:$SIDA4:unread:"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Bump"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 "^0:$SIDA4:unread:"
assertStdoutGrep --stderr --matches=1 "^1:$SIDA2:unread:"
assertStdoutGrep --stderr --matches=1 "^2:$SIDA3:unread:"
}
doc_listConversations="List multiple conversations, with different numbers of messages" doc_listConversations="List multiple conversations, with different numbers of messages"
setup_listConversations() { setup_listConversations() {
setup_servald setup_servald