Refactor MeshMS message-list code to use iterator

This commit is contained in:
Andrew Bettison 2014-01-24 11:41:31 +10:30
parent 8897563d09
commit 879395b121
2 changed files with 218 additions and 143 deletions

298
meshms.c
View File

@ -32,22 +32,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MESHMS_BLOCK_TYPE_MESSAGE 0x02
#define MESHMS_BLOCK_TYPE_BID_REFERENCE 0x03
// cursor state for reading one half of a conversation
struct ply_read {
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
uint64_t record_end_offset;
uint16_t record_length;
size_t buffer_size;
char type;
// raw record data
unsigned char *buffer;
};
void meshms_free_conversations(struct meshms_conversations *conv)
{
if (conv) {
@ -229,7 +213,7 @@ static int append_footer(unsigned char *buffer, char type, int payload_len)
return 2;
}
static int ply_read_open(struct ply_read *ply, const rhizome_bid_t *bid, rhizome_manifest *m)
static int ply_read_open(struct meshms_ply_read *ply, const rhizome_bid_t *bid, rhizome_manifest *m)
{
if (config.debug.meshms)
DEBUGF("Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
@ -245,7 +229,7 @@ static int ply_read_open(struct ply_read *ply, const rhizome_bid_t *bid, rhizome
return 0;
}
static void ply_read_close(struct ply_read *ply)
static void ply_read_close(struct meshms_ply_read *ply)
{
if (ply->buffer){
free(ply->buffer);
@ -258,7 +242,7 @@ static void ply_read_close(struct ply_read *ply)
// read the next record from the ply (backwards)
// returns 1 on EOF, -1 on failure
static int ply_read_next(struct ply_read *ply)
static int ply_read_next(struct meshms_ply_read *ply)
{
ply->record_end_offset = ply->read.offset;
unsigned char footer[2];
@ -311,7 +295,7 @@ static int ply_read_next(struct ply_read *ply)
}
// keep reading past messages until you find this type.
static int ply_find_next(struct ply_read *ply, char type){
static int ply_find_next(struct meshms_ply_read *ply, char type){
while(1){
int ret = ply_read_next(ply);
if (ret || ply->type==type)
@ -394,7 +378,7 @@ static int update_conversation(const sid_t *my_sid, struct meshms_conversations
if (!m_theirs)
return -1;
struct ply_read ply;
struct meshms_ply_read ply;
bzero(&ply, sizeof(ply));
int ret=-1;
@ -740,6 +724,119 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *i
}
}
int meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *sender, const sid_t *recipient)
{
bzero(iter, sizeof *iter);
if ((iter->_conv = find_or_create_conv(sender, recipient)) == NULL)
goto fail;
iter->received_read_offset = iter->_conv->read_offset;
// If sender has never sent a message, (or acked any recipient's), there are no messages in the
// thread.
if (iter->_conv->found_my_ply) {
if ((iter->_manifest_sender = rhizome_new_manifest()) == NULL)
goto fail;
if (ply_read_open(&iter->_read_sender, &iter->_conv->my_ply.bundle_id, iter->_manifest_sender))
goto fail;
if (iter->_conv->found_their_ply) {
if ((iter->_manifest_recipient = rhizome_new_manifest()) == NULL)
goto fail;
if (ply_read_open(&iter->_read_recipient, &iter->_conv->their_ply.bundle_id, iter->_manifest_recipient))
goto fail;
// Find the recipient's ACK so we know which messages have been delivered.
int r = ply_find_next(&iter->_read_recipient, MESHMS_BLOCK_TYPE_ACK);
if (r == 0) {
if (unpack_uint(iter->_read_recipient.buffer, iter->_read_recipient.record_length, &iter->sent_ack_offset) == -1)
iter->sent_ack_offset = 0;
else
iter->recipient_ack_offset = iter->_read_recipient.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found recipient last ack @%"PRId64, iter->sent_ack_offset);
}
}
} else {
if (config.debug.meshms)
DEBUGF("Did not find sender's ply; no messages in thread");
}
iter->_in_ack = 0;
return 0;
fail:
meshms_message_iterator_close(iter);
return -1;
}
void meshms_message_iterator_close(struct meshms_message_iterator *iter)
{
if (iter->_manifest_sender) {
ply_read_close(&iter->_read_sender);
rhizome_manifest_free(iter->_manifest_sender);
iter->_manifest_sender = NULL;
}
if (iter->_manifest_recipient){
ply_read_close(&iter->_read_recipient);
rhizome_manifest_free(iter->_manifest_recipient);
iter->_manifest_recipient = NULL;
}
meshms_free_conversations(iter->_conv);
}
int meshms_message_iterator_next(struct meshms_message_iterator *iter)
{
assert(iter->_conv != NULL);
if (iter->_conv->found_my_ply) {
assert(iter->_manifest_sender != NULL);
if (iter->_conv->found_their_ply)
assert(iter->_manifest_recipient != NULL);
}
int ret = 0;
while (ret == 0) {
if (iter->_in_ack) {
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, iter->_read_recipient.read.offset, iter->_end_range);
ret = ply_find_next(&iter->_read_recipient, MESHMS_BLOCK_TYPE_MESSAGE);
if (ret == 0 && iter->_read_recipient.read.offset >= iter->_end_range) {
iter->offset = iter->_read_recipient.record_end_offset;
iter->text = (const char *)iter->_read_recipient.buffer;
iter->direction = RECEIVED;
iter->read = iter->_read_recipient.record_end_offset <= iter->_conv->read_offset;
return 0;
}
iter->_in_ack = 0;
}
if ((ret = ply_read_next(&iter->_read_sender)) == 0) {
if (config.debug.meshms)
DEBUGF("Offset %"PRId64", type %d, received_read_offset %"PRId64, iter->_read_sender.read.offset, iter->_read_sender.type, iter->received_read_offset);
switch (iter->_read_sender.type) {
case MESHMS_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
if (iter->_conv->found_their_ply) {
int ofs = unpack_uint(iter->_read_sender.buffer, iter->_read_sender.record_length, (uint64_t*)&iter->_read_recipient.read.offset);
if (ofs == -1)
return WHYF("Malformed ACK");
uint64_t end_range;
int x = unpack_uint(iter->_read_sender.buffer + ofs, iter->_read_sender.record_length - ofs, &end_range);
if (x == -1)
iter->_end_range = 0;
else
iter->_end_range = iter->_read_recipient.read.offset - end_range;
// TODO tail
// just in case we don't have the full bundle anymore
if (iter->_read_recipient.read.offset > iter->_read_recipient.read.length)
iter->_read_recipient.read.offset = iter->_read_recipient.read.length;
iter->_in_ack = 1;
}
break;
case MESHMS_BLOCK_TYPE_MESSAGE:
iter->offset = iter->_read_sender.record_end_offset;
iter->text = (const char *)iter->_read_sender.buffer;
iter->direction = SENT;
iter->delivered = iter->sent_ack_offset && iter->_read_sender.record_end_offset <= iter->sent_ack_offset;
return 0;
}
}
}
return ret;
}
// output the list of existing conversations for a given local identity
int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context *context)
{
@ -844,7 +941,6 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1)
return -1;
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
@ -852,7 +948,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (rhizome_opendb() == -1){
keyring_free(keyring);
return -1;
}
}
sid_t my_sid, their_sid;
if (str_to_sid_t(&my_sid, my_sidhex) == -1){
keyring_free(keyring);
@ -861,141 +957,57 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (str_to_sid_t(&their_sid, their_sidhex) == -1){
keyring_free(keyring);
return WHY("invalid recipient SID");
}
struct meshms_conversations *conv=find_or_create_conv(&my_sid, &their_sid);
if (!conv){
}
struct meshms_message_iterator iter;
if (meshms_message_iterator_open(&iter, &my_sid, &their_sid) == -1) {
keyring_free(keyring);
return -1;
}
int ret=-1;
const char *names[]={
"_id","offset","type","message"
};
cli_columns(context, 4, names);
rhizome_manifest *m_ours=NULL, *m_theirs=NULL;
struct ply_read read_ours, read_theirs;
// if we've never sent a message, (or acked theirs), there is nothing to show
if (!conv->found_my_ply){
ret=0;
cli_row_count(context, 0);
if (config.debug.meshms)
DEBUGF("Did not find my ply");
goto end;
}
// start reading messages from both ply's in reverse order
bzero(&read_ours, sizeof(read_ours));
bzero(&read_theirs, sizeof(read_theirs));
m_ours = rhizome_new_manifest();
if (!m_ours)
goto end;
if (ply_read_open(&read_ours, &conv->my_ply.bundle_id, m_ours))
goto end;
uint64_t their_last_ack=0;
uint64_t their_ack_offset=0;
int64_t unread_mark=conv->read_offset;
if (conv->found_their_ply){
m_theirs = rhizome_new_manifest();
if (!m_theirs)
goto end;
if (ply_read_open(&read_theirs, &conv->their_ply.bundle_id, m_theirs))
goto end;
// find their last ACK so we know if messages have been received
int r = ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_ACK);
if (r==0){
if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack) == -1)
their_last_ack=0;
else
their_ack_offset = read_theirs.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found their last ack @%"PRId64, their_last_ack);
}
}
int id=0;
while(ply_read_next(&read_ours)==0){
if (config.debug.meshms)
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, read_ours.read.offset, read_ours.type, conv->read_offset);
if (their_last_ack && their_last_ack >= read_ours.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, their_ack_offset, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
their_last_ack = 0;
}
switch(read_ours.type){
case MESHMS_BLOCK_TYPE_ACK:
// read their message list, and insert all messages that are included in the ack range
if (conv->found_their_ply){
int ofs=unpack_uint(read_ours.buffer, read_ours.record_length, (uint64_t*)&read_theirs.read.offset);
if (ofs == -1)
break;
uint64_t end_range;
int x = unpack_uint(read_ours.buffer+ofs, read_ours.record_length - ofs, &end_range);
if (x == -1)
end_range=0;
else
end_range = read_theirs.read.offset - end_range;
// TODO tail
// just incase we don't have the full bundle anymore
if (read_theirs.read.offset > read_theirs.read.length)
read_theirs.read.offset = read_theirs.read.length;
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range);
while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){
if (read_theirs.read.offset < end_range)
break;
if (unread_mark >= (int64_t)read_theirs.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, unread_mark, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
unread_mark = -1;
}
cli_put_long(context, id++, ":");
cli_put_long(context, read_theirs.record_end_offset, ":");
cli_put_string(context, "<", ":");
cli_put_string(context, (char *)read_theirs.buffer, "\n");
}
bool_t marked_delivered = 0;
bool_t marked_read = 0;
int id = 0;
int ret;
while ((ret = meshms_message_iterator_next(&iter)) == 0) {
switch (iter.direction) {
case SENT:
if (iter.delivered && !marked_delivered){
cli_put_long(context, id++, ":");
cli_put_long(context, iter.recipient_ack_offset, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
marked_delivered = 1;
}
break;
case MESHMS_BLOCK_TYPE_MESSAGE:
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, read_ours.record_end_offset, ":");
cli_put_long(context, iter.offset, ":");
cli_put_string(context, ">", ":");
cli_put_string(context, (char *)read_ours.buffer, "\n");
cli_put_string(context, iter.text, "\n");
break;
case RECEIVED:
if (iter.read && !marked_read) {
cli_put_long(context, id++, ":");
cli_put_long(context, iter.received_read_offset, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
marked_read = 1;
}
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, iter.offset, ":");
cli_put_string(context, "<", ":");
cli_put_string(context, iter.text, "\n");
break;
}
}
cli_row_count(context, id);
ret=0;
end:
if (m_ours){
rhizome_manifest_free(m_ours);
ply_read_close(&read_ours);
if (ret != -1) {
cli_row_count(context, id);
ret = 0;
}
if (m_theirs){
rhizome_manifest_free(m_theirs);
ply_read_close(&read_theirs);
}
meshms_free_conversations(conv);
meshms_message_iterator_close(&iter);
keyring_free(keyring);
return ret;
}

View File

@ -56,6 +56,21 @@ struct meshms_conversations {
uint64_t their_size;
};
// cursor state for reading one half of a conversation
struct meshms_ply_read {
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
uint64_t record_end_offset;
uint16_t record_length;
size_t buffer_size;
char type;
// raw record data
unsigned char *buffer;
};
/* Fetch the list of all MeshMS conversations into a binary tree whose nodes
* are all allocated by malloc(3).
*/
@ -64,6 +79,13 @@ void meshms_free_conversations(struct meshms_conversations *conv);
/* For iterating over a binary tree of all MeshMS conversations, as created by
* meshms_conversations_list().
*
* struct meshms_conversation_iterator it;
* meshms_conversation_iterator_start(&it, conv);
* while (it.current) {
* ...
* meshms_conversation_iterator_advance(&it);
* }
*/
struct meshms_conversation_iterator {
struct meshms_conversations *current;
@ -71,4 +93,45 @@ struct meshms_conversation_iterator {
void meshms_conversation_iterator_start(struct meshms_conversation_iterator *, struct meshms_conversations *);
void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *);
/* For iterating through the messages in a single MeshMS conversation; both
* plys threaded (interleaved) in the order as seen by the sender.
*
* struct meshms_message_iterator it;
* if (meshms_message_iterator_open(&it, &sender_sid, &recip_sid) == -1)
* return -1;
* int ret;
* while ((ret = meshms_message_iterator_next(&it)) == 0) {
* ...
* }
* meshms_message_iterator_close(&it);
* if (ret == -1)
* return -1;
* ...
*/
struct meshms_message_iterator {
// Public fields that remain fixed for the life of the iterator.
uint64_t recipient_ack_offset; // offset in recipient ply of most recent ack
uint64_t sent_ack_offset; // offset in sender ply of most recent message acked by recipient
uint64_t received_read_offset; // offset in recipient ply of most recent message read by (displayed to) sender
// Public fields that change per message.
uint64_t offset;
const char *text; // NUL terminated text of message
enum { SENT, RECEIVED } direction;
union {
bool_t delivered; // for SENT
bool_t read; // for RECEIVED
};
// Private implementation -- could change, so don't use them.
struct meshms_conversations *_conv;
rhizome_manifest *_manifest_sender;
rhizome_manifest *_manifest_recipient;
struct meshms_ply_read _read_sender;
struct meshms_ply_read _read_recipient;
uint64_t _end_range;
bool_t _in_ack;
};
int meshms_message_iterator_open(struct meshms_message_iterator *, const sid_t *sender, const sid_t *recipient);
void meshms_message_iterator_close(struct meshms_message_iterator *);
int meshms_message_iterator_next(struct meshms_message_iterator *);
#endif // __SERVAL_DNA__MESHMS_H