serval-dna/meshms.c
Andrew Bettison 9ebef81a49 Formalise "rhizome add file" exit status
Formalise add-bundle result in "enum rhizome_bundle_status"

Rewrite rhizome_manifest_finalise(), rhizome_find_duplicate() and
rhizome_add_manifest() to return enum rhizome_bundle_status

New function rhizome_manifest_check_stored() that compares a manifest
with its stored counterpart and returns enum rhizome_bundle_status

Remove redundant rhizome_manifest_check_sanity(), consolidating all
manifest validation rules in rhizome_manifest_validate(), which now
checks the 'id' field is present, and that 'sender' and 'recipient' are
both present for MeshMS

Correct manifest finalisation logic: set the 'finalised' flag in
rhizome_manifest_validate(), not in rhizome_manifest_verify() (which
sets 'selfSigned'), and consistently clear 'finalised' flag in all
attribute setter functions

Remove manifest 'ttl' field and all references thereof (leaving unused
space in Rhizome BAR)

Rename some payload functions for clarity
2013-12-21 14:37:18 +10:30

1037 lines
30 KiB
C

/*
Serval DNA MeshMS
Copyright (C) 2013 Serval Project Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <assert.h>
#include <stdlib.h>
#include "serval.h"
#include "rhizome.h"
#include "log.h"
#include "conf.h"
#include "crypto.h"
#include "strlcpy.h"
#include "keyring.h"
#include "dataformats.h"
// Record type codes stored in the footer of each ply record.
// append_footer() keeps only the low four bits of the type, so codes must stay below 0x10.
#define MESHMS_BLOCK_TYPE_ACK 0x01
#define MESHMS_BLOCK_TYPE_MESSAGE 0x02
#define MESHMS_BLOCK_TYPE_BID_REFERENCE 0x03
// the manifest details for one half of a conversation
// (get_database_conversations() fills these from the id, version, tail and
// filesize columns of the manifests table)
struct ply{
// bundle ID of the manifest carrying this ply
rhizome_bid_t bundle_id;
// manifest version as read from the database
uint64_t version;
// manifest tail as read from the database
uint64_t tail;
// payload size (filesize column) as read from the database
uint64_t size;
};
// One node per remote party.  Nodes form an (unbalanced) binary tree keyed by
// cmp_sid_t() on 'them'; see add_conv() for the insertion order.
struct conversations{
// binary tree
struct conversations *_left;
struct conversations *_right;
// who are we talking to?
sid_t them;
// non-zero once our outgoing ply is known (found in the database or made by create_ply())
char found_my_ply;
struct ply my_ply;
// non-zero once a ply sent by 'them' has been found in the database
char found_their_ply;
struct ply their_ply;
// what is the offset of their last message
uint64_t their_last_message;
// what is the last message we marked as read
uint64_t read_offset;
// our cached value for the last known size of their ply
uint64_t their_size;
};
// cursor state for reading one half of a conversation
// (ply_read_next() walks the payload from its end towards its start)
struct ply_read{
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
// payload offset at which the current record (including its footer) ends
uint64_t record_end_offset;
// length of the current record's data, taken from the upper 12 bits of its footer
uint16_t record_length;
// number of bytes currently allocated for 'buffer'
size_t buffer_size;
// record type, taken from the low 4 bits of the footer
char type;
// raw record data
unsigned char *buffer;
};
// forward declaration: find_or_create_conv() calls this before its definition further down
static int meshms_conversations_list(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv);
/* Release every node of a conversation tree.  A NULL tree is accepted. */
static void free_conversations(struct conversations *conv){
  // recurse into the left subtree only; the right subtree is handled by looping
  while (conv != NULL) {
    struct conversations *right = conv->_right;
    free_conversations(conv->_left);
    free(conv);
    conv = right;
  }
}
/* Load, or prepare a brand new, private bundle that holds the conversation
 * index of the local identity 'my_sidp'.
 *
 * The bundle is located through a seed string built from the identity's
 * private key, so the identity must be present in the open keyring.
 *
 * On success 'm' is either an existing manifest (service must be
 * RHIZOME_SERVICE_FILE) or a freshly filled-in one (haveSecret ==
 * NEW_BUNDLE_ID).  In both cases the payload is flagged as encrypted.
 *
 * Returns 0 on success, -1 on failure (after logging).
 */
static int get_my_conversation_bundle(const sid_t *my_sidp, rhizome_manifest *m)
{
  /* Find our private key */
  unsigned cn=0, in=0, kp=0;
  if (!keyring_find_sid(keyring,&cn,&in,&kp,my_sidp))
    return WHYF("SID was not found in keyring: %s", alloca_tohex_sid_t(*my_sidp));

  // NOTE(review): 'seed' holds the hex of the private key and is left on the
  // stack when this function returns -- consider wiping it.
  char seed[1024];
  snprintf(seed, sizeof(seed),
    "incorrection%sconcentrativeness",
    alloca_tohex(keyring->contexts[cn]->identities[in]
    ->keypairs[kp]->private_key, crypto_box_curve25519xsalsa20poly1305_SECRETKEYBYTES));
  if (rhizome_get_bundle_from_seed(m, seed) == -1)
    return -1;

  // always consider the content encrypted, we don't need to rely on the manifest itself.
  rhizome_manifest_set_crypt(m, PAYLOAD_ENCRYPTED);
  assert(m->haveSecret);

  if (m->haveSecret == NEW_BUNDLE_ID) {
    // no stored bundle yet: describe a new, unnamed file bundle authored by us
    rhizome_manifest_set_service(m, RHIZOME_SERVICE_FILE);
    rhizome_manifest_set_name(m, "");
    if (rhizome_fill_manifest(m, NULL, my_sidp) == -1)
      return WHY("Invalid manifest");
    if (config.debug.meshms) {
      char secret[RHIZOME_BUNDLE_KEY_STRLEN + 1];
      rhizome_bytes_to_hex_upper(m->cryptoSignSecret, secret, RHIZOME_BUNDLE_KEY_BYTES);
      // The 'meshms' automated test depends on this message; do not alter.
      DEBUGF("MESHMS CONVERSATION BUNDLE bid=%s secret=%s",
        alloca_tohex_rhizome_bid_t(m->cryptoSignPublic),
        secret
      );
    }
  } else {
    // report the service we actually require, which is FILE, not MeshMS2
    if (strcmp(m->service, RHIZOME_SERVICE_FILE) != 0)
      return WHYF("Invalid manifest, service=%s but should be %s", m->service, RHIZOME_SERVICE_FILE);
  }
  return 0;
}
static struct conversations *add_conv(struct conversations **conv, const sid_t *them)
{
struct conversations **ptr = conv;
while(*ptr){
int cmp = cmp_sid_t(&(*ptr)->them, them);
if (cmp == 0)
break;
if (cmp < 0)
ptr = &(*ptr)->_left;
else
ptr = &(*ptr)->_right;
}
if (!*ptr){
*ptr = emalloc_zero(sizeof(struct conversations));
if (*ptr)
(*ptr)->them = *them;
}
return *ptr;
}
// find matching conversations
// if their_sid is NULL, return all conversations with any recipient
//
// Every MeshMS2 manifest in the database that names my_sid (and their_sid, if
// given) as sender or recipient is merged into the tree at *conv: a manifest
// sent by the other party fills in 'their_ply', any other fills in 'my_ply'.
// Rows that cannot be parsed are logged and skipped.
//
// Returns 0 on success, -1 if the query could not be prepared or a tree node
// could not be allocated.
static int get_database_conversations(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv)
{
  sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
  sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
    "SELECT id, version, filesize, tail, sender, recipient"
    " FROM manifests"
    " WHERE service = ?3"
    " AND (sender=?1 or recipient=?1)"
    " AND (sender=?2 or recipient=?2)",
    SID_T, my_sid,
    SID_T, their_sid ? their_sid : my_sid,
    STATIC_TEXT, RHIZOME_SERVICE_MESHMS2,
    END
  );
  if (!statement)
    return -1;
  if (config.debug.meshms) {
    const char *my_sid_hex = alloca_tohex_sid_t(*my_sid);
    const char *their_sid_hex = alloca_tohex_sid_t(*(their_sid ? their_sid : my_sid));
    DEBUGF("Looking for conversations for %s, %s", my_sid_hex, their_sid_hex);
  }
  int ret = 0;
  while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
    const char *id_hex = (const char *)sqlite3_column_text(statement, 0);
    uint64_t version = sqlite3_column_int64(statement, 1);
    int64_t size = sqlite3_column_int64(statement, 2);
    int64_t tail = sqlite3_column_int64(statement, 3);
    const char *sender = (const char *)sqlite3_column_text(statement, 4);
    const char *recipient = (const char *)sqlite3_column_text(statement, 5);
    // sqlite3_column_text() yields NULL for SQL NULL; never hand that to the parsers below
    if (!id_hex || !sender || !recipient) {
      WHYF("manifest row has a NULL id, sender or recipient -- skipping");
      continue;
    }
    if (config.debug.meshms)
      DEBUGF("found id %s, sender %s, recipient %s", id_hex, sender, recipient);
    rhizome_bid_t bid;
    if (str_to_rhizome_bid_t(&bid, id_hex) == -1) {
      WHYF("invalid Bundle ID hex: %s -- skipping", alloca_str_toprint(id_hex));
      continue;
    }
    // the other party is the recipient, unless the recipient is us
    const char *them = recipient;
    sid_t other_sid;
    if (str_to_sid_t(&other_sid, them) == -1) {
      WHYF("invalid SID hex: %s -- skipping", alloca_str_toprint(them));
      continue;
    }
    if (cmp_sid_t(&other_sid, my_sid) == 0) {
      them = sender;
      if (str_to_sid_t(&other_sid, them) == -1) {
        WHYF("invalid SID hex: %s -- skipping", alloca_str_toprint(them));
        continue;
      }
    }
    struct conversations *ptr = add_conv(conv, &other_sid);
    if (!ptr) {
      // out of memory: stop, and make sure the caller hears about it
      ret = -1;
      break;
    }
    struct ply *p;
    if (them==sender){
      ptr->found_their_ply=1;
      p=&ptr->their_ply;
    }else{
      ptr->found_my_ply=1;
      p=&ptr->my_ply;
    }
    p->bundle_id = bid;
    p->version = version;
    p->tail = tail;
    p->size = size;
  }
  sqlite3_finalize(statement);
  return ret;
}
/* Return the conversation between my_sid and their_sid, creating an empty
 * in-memory one if nothing is known about it yet.
 * Returns NULL on failure (listing failed or out of memory).  The caller owns
 * the result and releases it with free_conversations().
 */
static struct conversations * find_or_create_conv(const sid_t *my_sid, const sid_t *their_sid)
{
  struct conversations *conv=NULL;
  if (meshms_conversations_list(my_sid, their_sid, &conv)) {
    // the list may have been partly built before the failure
    free_conversations(conv);
    return NULL;
  }
  if (!conv){
    conv = emalloc_zero(sizeof(struct conversations));
    // emalloc_zero() can fail; only touch the node if we actually got one
    if (conv)
      conv->them = *their_sid;
  }
  return conv;
}
/* Fill in manifest 'm' as a new, empty MeshMS2 ply from my_sid to conv->them
 * and record its bundle ID in conv->my_ply.
 * Returns 0 on success, -1 if the manifest could not be filled in.
 */
static int create_ply(const sid_t *my_sid, struct conversations *conv, rhizome_manifest *m)
{
  if (config.debug.meshms)
    DEBUGF("Creating ply for my_sid=%s them=%s",
      alloca_tohex_sid_t(*my_sid),
      alloca_tohex_sid_t(conv->them));
  rhizome_manifest_set_service(m, RHIZOME_SERVICE_MESHMS2);
  rhizome_manifest_set_sender(m, my_sid);
  rhizome_manifest_set_recipient(m, &conv->them);
  // a new ply starts out as an empty journal
  rhizome_manifest_set_filesize(m, 0);
  rhizome_manifest_set_tail(m, 0);
  if (rhizome_fill_manifest(m, NULL, my_sid))
    return -1;
  assert(m->haveSecret);
  assert(m->payloadEncryption == PAYLOAD_ENCRYPTED);
  conv->my_ply.bundle_id = m->cryptoSignPublic;
  conv->found_my_ply = 1;
  return 0;
}
/* Write the two byte record footer at 'buffer': the payload length shifted
 * left by four bits, combined with the low four bits of the record type.
 * Returns the number of bytes written (always 2).
 */
static int append_footer(unsigned char *buffer, char type, int payload_len)
{
  const int type_bits = type & 0xF;
  const int footer = (payload_len << 4) | type_bits;
  write_uint16(buffer, footer);
  return 2;
}
/* Load the manifest of bundle 'bid' into 'm' and open its payload for
 * reading, leaving the cursor at the end of the payload so that records can
 * be read backwards.
 * Returns 0 on success, -1 if the manifest could not be retrieved, otherwise
 * the non-zero result of rhizome_open_decrypt_read() (1 is logged as
 * "payload not found").
 */
static int ply_read_open(struct ply_read *ply, const rhizome_bid_t *bid, rhizome_manifest *m)
{
  if (config.debug.meshms)
    DEBUGF("Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
  if (rhizome_retrieve_manifest(bid, m) != 0)
    return -1;
  const int status = rhizome_open_decrypt_read(m, &ply->read);
  if (status != 0) {
    if (status == 1)
      WARNF("Payload was not found for manifest %s, %"PRIu64, alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version);
    return status;
  }
  assert(m->filesize != RHIZOME_SIZE_UNSET);
  // start at the very end of the payload
  ply->read.length = m->filesize;
  ply->read.offset = ply->read.length;
  return 0;
}
/* Release the record buffer of 'ply', reset its buffering state and close the
 * underlying payload read.  Returns the result of rhizome_read_close().
 */
static int ply_read_close(struct ply_read *ply)
{
  // free(NULL) is a no-op, so no guard is needed
  free(ply->buffer);
  ply->buffer = NULL;
  ply->buffer_size = 0;
  ply->buff.len = 0;
  return rhizome_read_close(&ply->read);
}
// read the next record from the ply (backwards)
// returns 0 when a record was read, 1 on EOF, -1 on failure
//
// On success ply->type, ply->record_length and ply->record_end_offset describe
// the record, ply->buffer holds its data, and the read offset is left at the
// start of the record so that the next call yields the preceding record.
static int ply_read_next(struct ply_read *ply)
{
  ply->record_end_offset = ply->read.offset;
  unsigned char footer[2];
  if (ply->read.offset <= sizeof footer) {
    if (config.debug.meshms)
      DEBUGF("EOF");
    return 1;
  }
  ply->read.offset -= sizeof footer;
  ssize_t read;
  read = rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof footer);
  if (read == -1)
    return WHYF("rhizome_read_buffered() failed");
  if ((size_t) read != sizeof footer)
    return WHYF("Expected %zu bytes read, got %zu", (size_t) sizeof footer, (size_t) read);
  // (rhizome_read automatically advances the offset by the number of bytes read)
  // footer layout: upper 12 bits are the record length, lower 4 bits the type
  ply->record_length=read_uint16(footer);
  ply->type = ply->record_length & 0xF;
  ply->record_length = ply->record_length>>4;
  if (config.debug.meshms)
    DEBUGF("Found record %d, length %d @%"PRIu64, ply->type, ply->record_length, ply->record_end_offset);
  // need to allow for advancing the tail and cutting a message in half.
  if (ply->record_length + sizeof footer > ply->read.offset){
    if (config.debug.meshms)
      DEBUGF("EOF");
    return 1;
  }
  ply->read.offset -= ply->record_length + sizeof(footer);
  uint64_t record_start = ply->read.offset;
  if (ply->buffer_size < ply->record_length){
    unsigned char *b=realloc(ply->buffer, ply->record_length);
    if (!b)
      return WHY("realloc() failed");
    // only record the new capacity once the allocation has really grown
    ply->buffer = b;
    ply->buffer_size = ply->record_length;
  }
  read = rhizome_read_buffered(&ply->read, &ply->buff, ply->buffer, ply->record_length);
  if (read == -1)
    return WHYF("rhizome_read_buffered() failed");
  if ((size_t) read != ply->record_length)
    return WHYF("Expected %u bytes read, got %zu", ply->record_length, (size_t) read);
  // step back over the data just read, ready for the previous record
  ply->read.offset = record_start;
  return 0;
}
// Step backwards through the ply until a record of the wanted type turns up.
// Returns 0 when such a record is loaded, otherwise whatever non-zero value
// ply_read_next() reported (1 on EOF, -1 on failure).
static int ply_find_next(struct ply_read *ply, char type){
  int result;
  do {
    result = ply_read_next(ply);
  } while (result == 0 && ply->type != type);
  return result;
}
/* Append 'len' bytes from 'buffer' to our outgoing ply for this conversation,
 * creating the ply first if we do not have one yet, then finalise the
 * manifest.  An existing ply is only written to if we hold its secret and its
 * author is authenticated.
 * Returns 0 on success, -1 on failure.
 */
static int append_meshms_buffer(const sid_t *my_sid, struct conversations *conv, unsigned char *buffer, int len)
{
  rhizome_manifest *m = rhizome_new_manifest();
  if (m == NULL)
    return -1;

  rhizome_manifest *mout = NULL;
  int ret = -1;
  int writable = 0;

  if (!conv->found_my_ply) {
    writable = (create_ply(my_sid, conv, m) == 0);
  } else if (rhizome_retrieve_manifest(&conv->my_ply.bundle_id, m) == 0) {
    rhizome_authenticate_author(m);
    writable = (m->haveSecret && m->authorship == AUTHOR_AUTHENTIC);
  }

  if (writable) {
    assert(m->haveSecret);
    assert(m->authorship == AUTHOR_AUTHENTIC);
    if (rhizome_append_journal_buffer(m, 0, buffer, len) == 0
        && rhizome_manifest_finalise(m, &mout, 1) != -1)
      ret = 0;
  }

  // finalise may hand back a different manifest; release it separately
  if (mout != NULL && mout != m)
    rhizome_manifest_free(mout);
  rhizome_manifest_free(m);
  return ret;
}
// Check one conversation for a new incoming message and, if our ply has not
// acknowledged it yet, append an ACK record to our ply.
// Updates conv->their_last_message and, unless a failure occurred, caches the
// current size of their ply in conv->their_size.
// return -1 for failure, 1 if the conversation index needs to be saved.
// NOTE(review): after an ACK is appended the result is that of
// append_meshms_buffer(), i.e. 0 on success, so the caller is not told to save
// the index on that path -- confirm this is intended.
static int update_conversation(const sid_t *my_sid, struct conversations *conv){
if (config.debug.meshms)
DEBUG("Checking if conversation needs to be acked");
// Nothing to be done if they have never sent us anything
if (!conv->found_their_ply)
return 0;
rhizome_manifest *m_ours = NULL;
rhizome_manifest *m_theirs = rhizome_new_manifest();
if (!m_theirs)
return -1;
// a single cursor is used for their ply first, then re-used for ours
struct ply_read ply;
bzero(&ply, sizeof(ply));
int ret=-1;
if (config.debug.meshms)
DEBUG("Locating their last message");
if (ply_read_open(&ply, &conv->their_ply.bundle_id, m_theirs))
goto end;
ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_MESSAGE);
if (ret!=0){
// no messages indicates that we didn't do anything
if (ret>0)
ret=0;
goto end;
}
if (conv->their_last_message == ply.record_end_offset){
// nothing has changed since last time
ret=0;
goto end;
}
conv->their_last_message = ply.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found last message @%"PRId64, conv->their_last_message);
// NOTE(review): ply is closed here and ply_read_close() runs again at 'end';
// this relies on rhizome_read_close() tolerating a repeated close -- confirm.
ply_read_close(&ply);
// find our previous ack
uint64_t previous_ack = 0;
if (conv->found_my_ply){
if (config.debug.meshms)
DEBUG("Locating our previous ack");
m_ours = rhizome_new_manifest();
if (!m_ours)
goto end;
if (ply_read_open(&ply, &conv->my_ply.bundle_id, m_ours))
goto end;
ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_ACK);
if (ret == -1)
goto end;
if (ret==0){
// the first packed integer of an ACK record is the offset being acknowledged
if (unpack_uint(ply.buffer, ply.record_length, &previous_ack) == -1)
previous_ack=0;
}
if (config.debug.meshms)
DEBUGF("Previous ack is %"PRId64, previous_ack);
ply_read_close(&ply);
}else{
if (config.debug.meshms)
DEBUGF("No outgoing ply");
}
if (previous_ack >= conv->their_last_message){
// their last message has already been acked
ret=1;
goto end;
}
// append an ack for their message
if (config.debug.meshms)
DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, conv->their_last_message);
// ACK record: acknowledged offset, then (only when there was a previous ack)
// the distance back to that previous ack, then the 2 byte footer
unsigned char buffer[24];
int ofs=0;
ofs+=pack_uint(&buffer[ofs], conv->their_last_message);
if (previous_ack)
ofs+=pack_uint(&buffer[ofs], conv->their_last_message - previous_ack);
ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs);
ret = append_meshms_buffer(my_sid, conv, buffer, ofs);
end:
ply_read_close(&ply);
if (m_ours)
rhizome_manifest_free(m_ours);
if (m_theirs)
rhizome_manifest_free(m_theirs);
// if it's all good, remember the size of their ply at the time we examined it.
if (ret>=0)
conv->their_size = conv->their_ply.size;
return ret;
}
// update conversations, and return 1 if the conversation index should be saved
static int update_conversations(const sid_t *my_sid, struct conversations *conv){
if (!conv)
return 0;
int ret = 0;
if (update_conversations(my_sid, conv->_left))
ret=1;
if (conv->their_size != conv->their_ply.size){
if (update_conversation(my_sid, conv)>0)
ret=1;
}
if (update_conversations(my_sid, conv->_right))
ret=1;
return ret;
}
// read our cached conversation list from our rhizome payload
//
// The payload is one version byte (must be 1) followed by one record per
// conversation: the binary SID, then three packed integers (their last message
// offset, our read offset, cached size of their ply).
// If their_sid is not NULL only the record for that SID is merged into *conv.
//
// Returns 0 if the list was read (a truncated tail is silently ignored) or the
// bundle is new, -1 if the payload could not be opened or read, has an
// unsupported version, or a tree node could not be allocated.
static int read_known_conversations(rhizome_manifest *m, const sid_t *their_sid, struct conversations **conv)
{
  if (m->haveSecret==NEW_BUNDLE_ID)
    return 0;
  struct rhizome_read read;
  bzero(&read, sizeof(read));
  struct rhizome_read_buffer buff;
  bzero(&buff, sizeof(buff));
  int ret = rhizome_open_decrypt_read(m, &read);
  if (ret == -1)
    goto end;
  unsigned char version=0xFF;
  ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1);
  ret = -1;
  if (r == -1)
    goto end;
  if (version != 1) {
    WARNF("Expected version 1 (got 0x%02x)", version);
    goto end;
  }
  while (1) {
    sid_t sid;
    r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary);
    if (r != sizeof sid.binary)
      break;
    if (config.debug.meshms)
      DEBUGF("Reading existing conversation for %s", alloca_tohex_sid_t(sid));

    // Always decode the packed details, even for records we are about to
    // skip: their length is variable, so decoding is the only way to find
    // where the next record starts.
    unsigned char details[8*3];
    r = rhizome_read_buffered(&read, &buff, details, sizeof details);
    if (r == -1)
      break;
    int bytes = r;
    uint64_t last_message = 0;
    uint64_t read_offset = 0;
    uint64_t their_size = 0;
    int ofs = 0;
    int unpacked = unpack_uint(details, bytes, &last_message);
    if (unpacked == -1)
      break;
    ofs += unpacked;
    unpacked = unpack_uint(details+ofs, bytes-ofs, &read_offset);
    if (unpacked == -1)
      break;
    ofs += unpacked;
    unpacked = unpack_uint(details+ofs, bytes-ofs, &their_size);
    if (unpacked == -1)
      break;
    ofs += unpacked;
    // we read a fixed size chunk; step back over the bytes that belong to the next record
    read.offset += ofs - bytes;

    // skip uninteresting records
    if (their_sid && cmp_sid_t(&sid, their_sid) != 0)
      continue;
    struct conversations *ptr = add_conv(conv, &sid);
    if (!ptr)
      goto end;
    ptr->their_last_message = last_message;
    ptr->read_offset = read_offset;
    ptr->their_size = their_size;
  }
  ret = 0;
end:
  rhizome_read_close(&read);
  return ret;
}
static ssize_t write_conversation(struct rhizome_write *write, struct conversations *conv)
{
size_t len=0;
if (!conv)
return len;
{
unsigned char buffer[sizeof(conv->them) + (8*3)];
if (write)
bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
if (write){
len+=pack_uint(&buffer[len], conv->their_last_message);
len+=pack_uint(&buffer[len], conv->read_offset);
len+=pack_uint(&buffer[len], conv->their_size);
int ret=rhizome_write_buffer(write, buffer, len);
if (ret == -1)
return ret;
}else{
len+=measure_packed_uint(conv->their_last_message);
len+=measure_packed_uint(conv->read_offset);
len+=measure_packed_uint(conv->their_size);
}
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
alloca_tohex_sid_t(conv->them),
conv->their_last_message,
conv->read_offset,
conv->their_size,
len);
}
// write the two child nodes
ssize_t ret = write_conversation(write, conv->_left);
if (ret == -1)
return ret;
len += (size_t) ret;
ret = write_conversation(write, conv->_right);
if (ret == -1)
return ret;
len += (size_t) ret;
return len;
}
/* Store the conversation tree as the new payload of manifest 'm': bump the
 * manifest version, write a version byte (1) followed by the serialised tree,
 * set the payload hash and finalise the manifest.
 * Returns 0 on success, -1 on failure (the pending write is then abandoned).
 */
static int write_known_conversations(rhizome_manifest *m, struct conversations *conv)
{
  rhizome_manifest *mout = NULL;
  struct rhizome_write write;
  bzero(&write, sizeof(write));
  int ret = -1;
  // TODO rebalance tree...
  do {
    // measure the final payload first
    const ssize_t payload_len = write_conversation(NULL, conv);
    if (payload_len == -1)
      break;
    // then write it; the extra byte is the format version
    rhizome_manifest_set_version(m, m->version + 1);
    rhizome_manifest_set_filesize(m, (size_t)payload_len + 1);
    if (rhizome_write_open_manifest(&write, m) == -1)
      break;
    unsigned char format_version = 1;
    if (rhizome_write_buffer(&write, &format_version, 1) == -1)
      break;
    if (write_conversation(&write, conv) == -1)
      break;
    if (rhizome_finish_write(&write) != 0)
      break;
    rhizome_manifest_set_filehash(m, &write.id);
    if (rhizome_manifest_finalise(m, &mout, 1) == -1)
      break;
    ret = 0;
  } while (0);
  if (ret != 0)
    rhizome_fail_write(&write);
  if (mout != NULL && mout != m)
    rhizome_manifest_free(mout);
  return ret;
}
// Build the list of conversations of my_sid in *conv, from the cached index
// payload and from the manifests in the database, then check each one for
// messages that need acknowledging.
// If their_sid is not NULL, only the conversation with that SID is listed.
// The updated index is written back only when listing everything, since with
// a filter *conv does not hold the other conversations.
// Returns 0 on success, -1 on failure; *conv may be partly built on failure.
static int meshms_conversations_list(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv)
{
  rhizome_manifest *m = rhizome_new_manifest();
  // nothing to clean up yet, so do not hand NULL to rhizome_manifest_free()
  if (!m)
    return -1;
  int ret=-1;
  if (get_my_conversation_bundle(my_sid, m))
    goto end;
  // read conversations payload
  if (read_known_conversations(m, their_sid, conv))
    goto end;
  if (get_database_conversations(my_sid, their_sid, conv))
    goto end;
  if (update_conversations(my_sid, *conv) && !their_sid){
    if (write_known_conversations(m, *conv))
      goto end;
  }
  ret=0;
end:
  rhizome_manifest_free(m);
  return ret;
}
// recursively traverse the conversation tree in sorted order and output the details of each conversation
static int output_conversations(struct cli_context *context, struct conversations *conv,
int output, int offset, int count){
if (!conv)
return 0;
int traverse_count = output_conversations(context, conv->_left, output, offset, count);
if (count <0 || output + traverse_count < offset + count){
if (output + traverse_count >= offset){
cli_put_long(context, output + traverse_count, ":");
cli_put_hexvalue(context, conv->them.binary, sizeof(conv->them), ":");
cli_put_string(context, conv->read_offset < conv->their_last_message ? "unread":"", ":");
cli_put_long(context, conv->their_last_message, ":");
cli_put_long(context, conv->read_offset, "\n");
}
traverse_count++;
}
traverse_count += output_conversations(context, conv->_right, output + traverse_count, offset, count);
return traverse_count;
}
// output the list of existing conversations for a given local identity
// CLI arguments: "sid" (local identity), "offset" (first row, default 0) and
// "count" (maximum rows, default -1 meaning no limit).
// Returns 0 on success, -1 on failure.
int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context *context){
  const char *sidhex, *offset_str, *count_str;
  if (cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, "") == -1
    || cli_arg(parsed, "offset", &offset_str, NULL, "0")==-1
    || cli_arg(parsed, "count", &count_str, NULL, "-1")==-1)
    return -1;
  // parse with a checked conversion, as the other MeshMS commands do
  sid_t sid;
  if (str_to_sid_t(&sid, sidhex) == -1)
    return WHY("invalid SID");
  int offset=atoi(offset_str);
  int count=atoi(count_str);
  if (create_serval_instance_dir() == -1)
    return -1;
  if (!(keyring = keyring_open_instance_cli(parsed)))
    return -1;
  if (rhizome_opendb() == -1){
    keyring_free(keyring);
    return -1;
  }
  struct conversations *conv=NULL;
  if (meshms_conversations_list(&sid, NULL, &conv)){
    // the list may have been partly built before the failure
    free_conversations(conv);
    keyring_free(keyring);
    return -1;
  }
  const char *names[]={
    "_id","recipient","read", "last_message", "read_offset"
  };
  cli_columns(context, 5, names);
  int rows = output_conversations(context, conv, 0, offset, count);
  cli_row_count(context, rows);
  free_conversations(conv);
  keyring_free(keyring);
  return 0;
}
/* CLI command: append a text message to the ply from "sender_sid" to
 * "recipient_sid".  The record written is the message text including its
 * terminating NUL, followed by the 2 byte record footer.
 * Returns 0 on success, -1 on failure.
 */
int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
  // append_footer() shifts the length left by four bits into a 16 bit footer,
  // so a record can carry at most 12 bits worth of length
  enum { MAX_RECORD_LENGTH = 0xFFF };
  const char *my_sidhex, *their_sidhex, *message;
  if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
    || cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1
    || cli_arg(parsed, "payload", &message, NULL, "") == -1)
    return -1;
  // reject over-long messages before anything is opened; the length includes the NUL
  if (strlen(message) + 1 > MAX_RECORD_LENGTH)
    return WHY("message too long");
  if (create_serval_instance_dir() == -1)
    return -1;
  if (!(keyring = keyring_open_instance_cli(parsed)))
    return -1;
  if (rhizome_opendb() == -1){
    keyring_free(keyring);
    return -1;
  }
  sid_t my_sid, their_sid;
  if (str_to_sid_t(&my_sid, my_sidhex) == -1){
    keyring_free(keyring);
    return WHY("invalid sender SID");
  }
  if (str_to_sid_t(&their_sid, their_sidhex) == -1){
    keyring_free(keyring);
    return WHY("invalid recipient SID");
  }
  struct conversations *conv = find_or_create_conv(&my_sid, &their_sid);
  if (!conv) {
    keyring_free(keyring);
    return -1;
  }
  // construct a message payload
  int message_len = strlen(message)+1;
  // TODO, new format here.
  unsigned char buffer[message_len+3];
  strcpy((char*)buffer, message); // message
  message_len+=append_footer(buffer+message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len);
  int ret = append_meshms_buffer(&my_sid, conv, buffer, message_len);
  free_conversations(conv);
  keyring_free(keyring);
  return ret;
}
int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context *context)
{
const char *my_sidhex, *their_sidhex;
if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1)
return -1;
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
return -1;
if (rhizome_opendb() == -1){
keyring_free(keyring);
return -1;
}
sid_t my_sid, their_sid;
if (str_to_sid_t(&my_sid, my_sidhex) == -1){
keyring_free(keyring);
return WHY("invalid sender SID");
}
if (str_to_sid_t(&their_sid, their_sidhex) == -1){
keyring_free(keyring);
return WHY("invalid recipient SID");
}
struct conversations *conv=find_or_create_conv(&my_sid, &their_sid);
if (!conv){
keyring_free(keyring);
return -1;
}
int ret=-1;
const char *names[]={
"_id","offset","type","message"
};
cli_columns(context, 4, names);
rhizome_manifest *m_ours=NULL, *m_theirs=NULL;
struct ply_read read_ours, read_theirs;
// if we've never sent a message, (or acked theirs), there is nothing to show
if (!conv->found_my_ply){
ret=0;
cli_row_count(context, 0);
if (config.debug.meshms)
DEBUGF("Did not find my ply");
goto end;
}
// start reading messages from both ply's in reverse order
bzero(&read_ours, sizeof(read_ours));
bzero(&read_theirs, sizeof(read_theirs));
m_ours = rhizome_new_manifest();
if (!m_ours)
goto end;
if (ply_read_open(&read_ours, &conv->my_ply.bundle_id, m_ours))
goto end;
uint64_t their_last_ack=0;
uint64_t their_ack_offset=0;
int64_t unread_mark=conv->read_offset;
if (conv->found_their_ply){
m_theirs = rhizome_new_manifest();
if (!m_theirs)
goto end;
if (ply_read_open(&read_theirs, &conv->their_ply.bundle_id, m_theirs))
goto end;
// find their last ACK so we know if messages have been received
int r = ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_ACK);
if (r==0){
if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack) == -1)
their_last_ack=0;
else
their_ack_offset = read_theirs.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found their last ack @%"PRId64, their_last_ack);
}
}
int id=0;
while(ply_read_next(&read_ours)==0){
if (config.debug.meshms)
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, read_ours.read.offset, read_ours.type, conv->read_offset);
if (their_last_ack && their_last_ack >= read_ours.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, their_ack_offset, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
their_last_ack = 0;
}
switch(read_ours.type){
case MESHMS_BLOCK_TYPE_ACK:
// read their message list, and insert all messages that are included in the ack range
if (conv->found_their_ply){
int ofs=unpack_uint(read_ours.buffer, read_ours.record_length, (uint64_t*)&read_theirs.read.offset);
if (ofs == -1)
break;
uint64_t end_range;
int x = unpack_uint(read_ours.buffer+ofs, read_ours.record_length - ofs, &end_range);
if (x == -1)
end_range=0;
else
end_range = read_theirs.read.offset - end_range;
// TODO tail
// just incase we don't have the full bundle anymore
if (read_theirs.read.offset > read_theirs.read.length)
read_theirs.read.offset = read_theirs.read.length;
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range);
while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){
if (read_theirs.read.offset < end_range)
break;
if (unread_mark >= (int64_t)read_theirs.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, unread_mark, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
unread_mark = -1;
}
cli_put_long(context, id++, ":");
cli_put_long(context, read_theirs.record_end_offset, ":");
cli_put_string(context, "<", ":");
cli_put_string(context, (char *)read_theirs.buffer, "\n");
}
}
break;
case MESHMS_BLOCK_TYPE_MESSAGE:
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, read_ours.record_end_offset, ":");
cli_put_string(context, ">", ":");
cli_put_string(context, (char *)read_ours.buffer, "\n");
break;
}
}
cli_row_count(context, id);
ret=0;
end:
if (m_ours){
rhizome_manifest_free(m_ours);
ply_read_close(&read_ours);
}
if (m_theirs){
rhizome_manifest_free(m_theirs);
ply_read_close(&read_theirs);
}
free_conversations(conv);
keyring_free(keyring);
return ret;
}
/* Advance the read marker of the conversation with their_sid, or of every
 * conversation in the tree when their_sid is NULL.
 *
 * offset_str, if not NULL, is the decimal offset to mark as read; without it
 * everything up to their last message is marked.  The marker never moves
 * backwards and never past their last message.
 * Returns the number of conversations whose marker moved.
 */
static int mark_read(struct conversations *conv, const sid_t *their_sid, const char *offset_str){
  int ret=0;
  if (conv){
    int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0;
    // descend using the same ordering as add_conv()
    if (!their_sid || cmp<0){
      ret+=mark_read(conv->_left, their_sid, offset_str);
    }
    if (!their_sid || cmp==0){
      // update read offset
      // - never rewind
      // - never past their last message
      uint64_t offset = conv->their_last_message;
      if (offset_str){
        // offsets are 64 bit; atol() would truncate them where long is 32 bits
        uint64_t x = strtoull(offset_str, NULL, 10);
        if (x<offset)
          offset=x;
      }
      if (offset > conv->read_offset){
        if (config.debug.meshms)
          DEBUGF("Moving read marker for %s, from %"PRIu64" to %"PRIu64,
            alloca_tohex_sid_t(conv->them), conv->read_offset, offset);
        conv->read_offset = offset;
        ret++;
      }
    }
    if (!their_sid || cmp>0){
      ret+=mark_read(conv->_right, their_sid, offset_str);
    }
  }
  return ret;
}
/* CLI command: mark incoming messages as read for local identity
 * "sender_sid".  With "recipient_sid" only that conversation is touched,
 * otherwise all of them; "offset" optionally limits how far the marker moves.
 * The conversation index is rewritten if anything changed.
 * Returns 0 on success, -1 on failure.
 */
int app_meshms_mark_read(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
  const char *my_sidhex, *their_sidhex, *offset_str;
  if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
    || cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, NULL) == -1
    || cli_arg(parsed, "offset", &offset_str, NULL, NULL)==-1)
    return -1;
  if (create_serval_instance_dir() == -1)
    return -1;
  if (!(keyring = keyring_open_instance_cli(parsed)))
    return -1;
  if (rhizome_opendb() == -1){
    keyring_free(keyring);
    return -1;
  }
  sid_t my_sid, their_sid;
  int ret=-1;
  struct conversations *conv=NULL;
  rhizome_manifest *m = NULL;
  // use checked conversions, as the other MeshMS commands do
  if (str_to_sid_t(&my_sid, my_sidhex) == -1){
    WHY("invalid sender SID");
    goto end;
  }
  if (their_sidhex && str_to_sid_t(&their_sid, their_sidhex) == -1){
    WHY("invalid recipient SID");
    goto end;
  }
  m = rhizome_new_manifest();
  if (!m)
    goto end;
  if (get_my_conversation_bundle(&my_sid, m))
    goto end;
  // read all conversations, so we can write them again
  if (read_known_conversations(m, NULL, &conv))
    goto end;
  // read the full list of conversations from the database too
  if (get_database_conversations(&my_sid, NULL, &conv))
    goto end;
  // check if any incoming conversations need to be acked or have new messages and update the read offset
  int changed = update_conversations(&my_sid, conv);
  if (mark_read(conv, their_sidhex?&their_sid:NULL, offset_str))
    changed =1;
  if (changed){
    // save the conversation list
    if (write_known_conversations(m, conv))
      goto end;
  }
  ret=0;
end:
  if (m)
    rhizome_manifest_free(m);
  free_conversations(conv);
  keyring_free(keyring);
  return ret;
}