Merge branch 'meshmsv2' into development

This commit is contained in:
Jeremy Lakeman 2013-08-05 13:05:44 +09:30
commit 8a1c0a39e3
18 changed files with 1613 additions and 324 deletions

View File

@ -2306,6 +2306,14 @@ struct cli_schema command_line_options[]={
"Get specified configuration variable."},
{app_vomp_console,{"console",NULL}, 0,
"Test phone call life-cycle from the console"},
{app_meshms_conversations,{"meshms","list","conversations" KEYRING_PIN_OPTIONS, "<sid>","[<offset>]","[<count>]",NULL},0,
"List MeshMS threads that include <sid>"},
{app_meshms_list_messages,{"meshms","list","messages" KEYRING_PIN_OPTIONS, "<sender_sid>","<recipient_sid>",NULL},0,
"List MeshMS messages between <sender_sid> and <recipient_sid>"},
{app_meshms_send_message,{"meshms","send","message" KEYRING_PIN_OPTIONS, "<sender_sid>", "<recipient_sid>", "<payload>",NULL},0,
"Send a MeshMS message from <sender_sid> to <recipient_sid>"},
{app_meshms_mark_read,{"meshms","read","messages" KEYRING_PIN_OPTIONS, "<sender_sid>", "[<recipient_sid>]", "[<offset>]",NULL},0,
"Mark incoming messages from this recipient as read."},
{app_rhizome_append_manifest, {"rhizome", "append", "manifest", "<filepath>", "<manifestpath>", NULL}, 0,
"Append a manifest to the end of the file it belongs to."},
{app_rhizome_hash_file,{"rhizome","hash","file","<filepath>",NULL}, 0,

View File

@ -258,6 +258,7 @@ ATOM(bool_t, rhizome, 0, boolean,, "")
ATOM(bool_t, rhizome_tx, 0, boolean,, "")
ATOM(bool_t, rhizome_rx, 0, boolean,, "")
ATOM(bool_t, rhizome_ads, 0, boolean,, "")
ATOM(bool_t, meshms, 0, boolean,, "")
ATOM(bool_t, manifests, 0, boolean,, "")
ATOM(bool_t, vomp, 0, boolean,, "")
ATOM(bool_t, trace, 0, boolean,, "")

View File

@ -1,3 +1,5 @@
#include "crypto_sign_edwards25519sha512batch.h"
#include "nacl/src/crypto_sign_edwards25519sha512batch_ref/ge.h"
#include "serval.h"
#include "overlay_address.h"
#include "crypto.h"
@ -95,3 +97,21 @@ int crypto_sign_message(struct subscriber *source, unsigned char *content, int b
*content_len+=sig_length;
return ret;
}
int crypto_sign_compute_public_key(const unsigned char *skin, unsigned char *pk)
{
IN();
unsigned char h[64];
ge_p3 A;
crypto_hash_sha512(h,skin,32);
h[0] &= 248;
h[31] &= 63;
h[31] |= 64;
ge_scalarmult_base(&A,h);
ge_p3_tobytes(pk,&A);
RETURN(0);
OUT();
}

View File

@ -13,5 +13,6 @@ int crypto_create_signature(unsigned char *key,
unsigned char *content, int content_len,
unsigned char *signature, int *sig_length);
int crypto_sign_message(struct subscriber *source, unsigned char *content, int buffer_len, int *content_len);
int crypto_sign_compute_public_key(const unsigned char *skin, unsigned char *pk);
#endif

960
meshms.c Normal file
View File

@ -0,0 +1,960 @@
#include "serval.h"
#include "rhizome.h"
#include "log.h"
#include "conf.h"
#include "crypto.h"
#include "strlcpy.h"
#define MESHMS_BLOCK_TYPE_ACK 0x01
#define MESHMS_BLOCK_TYPE_MESSAGE 0x02
#define MESHMS_BLOCK_TYPE_BID_REFERENCE 0x03
// the manifest details for one half of a conversation
struct ply{
char bundle_id[RHIZOME_MANIFEST_ID_STRLEN+1];
uint64_t version;
uint64_t tail;
uint64_t size;
};
struct conversations{
// binary tree
struct conversations *_left;
struct conversations *_right;
// who are we talking to?
sid_t them;
char found_my_ply;
struct ply my_ply;
char found_their_ply;
struct ply their_ply;
// what is the offset of their last message
uint64_t their_last_message;
// what is the last message we marked as read
uint64_t read_offset;
// our cached value for the last known size of their ply
uint64_t their_size;
};
// cursor state for reading one half of a conversation
struct ply_read{
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
uint64_t record_end_offset;
uint16_t record_length;
int buffer_size;
char type;
// raw record data
unsigned char *buffer;
};
static int meshms_conversations_list(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv);
static void free_conversations(struct conversations *conv){
if (!conv)
return;
free_conversations(conv->_left);
free_conversations(conv->_right);
free(conv);
}
static int get_my_conversation_bundle(const sid_t *my_sid, rhizome_manifest *m)
{
/* Find our private key */
int cn=0,in=0,kp=0;
if (!keyring_find_sid(keyring,&cn,&in,&kp,my_sid->binary))
return WHYF("SID was not found in keyring: %s", alloca_tohex_sid(my_sid->binary));
char seed[1024];
snprintf(seed, sizeof(seed),
"incorrection%sconcentrativeness",
alloca_tohex(keyring->contexts[cn]->identities[in]
->keypairs[kp]->private_key, crypto_box_curve25519xsalsa20poly1305_SECRETKEYBYTES));
int ret = rhizome_get_bundle_from_seed(m, seed);
if (ret<0)
return -1;
// always consider the content encrypted, we don't need to rely on the manifest itself.
m->payloadEncryption=1;
if (m->haveSecret==NEW_BUNDLE_ID){
rhizome_fill_manifest(m, NULL, NULL, NULL);
}
return 0;
}
static struct conversations *add_conv(struct conversations **conv, const sid_t *them){
struct conversations **ptr=conv;
while(*ptr){
int cmp = memcmp((*ptr)->them.binary, them, sizeof((*ptr)->them));
if (cmp==0)
break;
if (cmp<0)
ptr = &(*ptr)->_left;
else
ptr = &(*ptr)->_right;
}
if (!*ptr){
*ptr = emalloc_zero(sizeof(struct conversations));
if (*ptr)
memcpy((*ptr)->them.binary, them->binary, sizeof((*ptr)->them));
}
return *ptr;
}
// find matching conversations
// if their_sid_hex == my_sid_hex, return all conversations with any recipient
static int get_database_conversations(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv){
const char *my_sid_hex = alloca_tohex_sid(my_sid->binary);
const char *their_sid_hex = alloca_tohex_sid(their_sid?their_sid->binary:my_sid->binary);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry,
"SELECT id, version, filesize, tail, sender, recipient "
"FROM manifests "
"WHERE service = '"RHIZOME_SERVICE_MESHMS2"' "
"AND (sender=?1 or recipient=?1) "
"AND (sender=?2 or recipient=?2)");
if (!statement)
return -1;
int ret = sqlite3_bind_text(statement, 1, my_sid_hex, -1, SQLITE_STATIC);
if (ret!=SQLITE_OK)
goto end;
ret = sqlite3_bind_text(statement, 2, their_sid_hex, -1, SQLITE_STATIC);
if (ret!=SQLITE_OK)
goto end;
if (config.debug.meshms)
DEBUGF("Looking for conversations for %s, %s", my_sid_hex, their_sid_hex);
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
const char *id = (const char *)sqlite3_column_text(statement, 0);
long long version = sqlite3_column_int64(statement, 1);
long long size = sqlite3_column_int64(statement, 2);
long long tail = sqlite3_column_int64(statement, 3);
const char *sender = (const char *)sqlite3_column_text(statement, 4);
const char *recipient = (const char *)sqlite3_column_text(statement, 5);
const char *them = recipient;
if (strcasecmp(them, my_sid_hex)==0)
them=sender;
sid_t their_sid;
fromhex(their_sid.binary, them, sizeof(their_sid));
if (config.debug.meshms)
DEBUGF("found id %s, sender %s, recipient %s", id, sender, recipient);
struct conversations *ptr = add_conv(conv, &their_sid);
if (!ptr)
goto end;
struct ply *p;
if (them==sender){
ptr->found_their_ply=1;
p=&ptr->their_ply;
}else{
ptr->found_my_ply=1;
p=&ptr->my_ply;
}
strncpy(p->bundle_id, id, RHIZOME_MANIFEST_ID_STRLEN+1);
p->version = version;
p->tail = tail;
p->size = size;
}
end:
if (ret!=SQLITE_OK){
WHYF("Query failed: %s", sqlite3_errmsg(rhizome_db));
free_conversations(*conv);
*conv=NULL;
}
sqlite3_finalize(statement);
return (ret==SQLITE_OK)?0:-1;
}
static struct conversations * find_or_create_conv(const sid_t *my_sid, const sid_t *their_sid){
struct conversations *conv=NULL;
if (meshms_conversations_list(my_sid, their_sid, &conv))
return NULL;
if (!conv){
conv = emalloc_zero(sizeof(struct conversations));
bcopy(their_sid->binary, conv->them.binary, sizeof(sid_t));
}
return conv;
}
static int create_ply(const sid_t *my_sid, struct conversations *conv, rhizome_manifest *m){
m->journalTail = 0;
const char *my_sidhex = alloca_tohex_sid(my_sid->binary);
const char *their_sidhex = alloca_tohex_sid(conv->them.binary);
rhizome_manifest_set(m, "service", RHIZOME_SERVICE_MESHMS2);
rhizome_manifest_set(m, "sender", my_sidhex);
rhizome_manifest_set(m, "recipient", their_sidhex);
rhizome_manifest_set_ll(m, "tail", m->journalTail);
if (rhizome_fill_manifest(m, NULL, my_sid, NULL))
return -1;
rhizome_manifest_get(m, "id", conv->my_ply.bundle_id, sizeof(conv->my_ply.bundle_id));
conv->found_my_ply=1;
return 0;
}
static int append_footer(unsigned char *buffer, char type, int payload_len){
payload_len = (payload_len << 4) | (type&0xF);
write_uint16(buffer, payload_len);
return 2;
}
static int ply_read_open(struct ply_read *ply, const char *id, rhizome_manifest *m){
if (config.debug.meshms)
DEBUGF("Opening ply %s", id);
if (rhizome_retrieve_manifest(id, m))
return -1;
if (rhizome_open_decrypt_read(m, NULL, &ply->read, 0))
return -1;
ply->read.offset = ply->read.length = m->fileLength;
return 0;
}
static int ply_read_close(struct ply_read *ply){
if (ply->buffer){
free(ply->buffer);
ply->buffer=NULL;
}
ply->buffer_size=0;
ply->buff.len=0;
return rhizome_read_close(&ply->read);
}
// read the next record from the ply (backwards)
// returns 1 on EOF, -1 on failure
static int ply_read_next(struct ply_read *ply){
ply->record_end_offset=ply->read.offset;
unsigned char footer[2];
ply->read.offset-=sizeof(footer);
if (ply->read.offset<=0){
if (config.debug.meshms)
DEBUGF("EOF");
return 1;
}
if (rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof(footer)) < sizeof(footer))
return -1;
// (rhizome_read automatically advances the offset by the number of bytes read)
ply->record_length=read_uint16(footer);
ply->type = ply->record_length & 0xF;
ply->record_length = ply->record_length>>4;
if (config.debug.meshms)
DEBUGF("Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length + sizeof(footer) > ply->read.offset){
if (config.debug.meshms)
DEBUGF("EOF");
return 1;
}
ply->read.offset -= ply->record_length + sizeof(footer);
uint64_t record_start = ply->read.offset;
if (ply->buffer_size < ply->record_length){
ply->buffer_size = ply->record_length;
unsigned char *b=realloc(ply->buffer, ply->buffer_size);
if (!b)
return WHY("realloc() failed");
ply->buffer = b;
}
int read = rhizome_read(&ply->read, ply->buffer, ply->record_length);
if (read!=ply->record_length)
return WHYF("Expected %d bytes read, got %d", ply->record_length, read);
ply->read.offset = record_start;
return 0;
}
// keep reading past messages until you find this type.
static int ply_find_next(struct ply_read *ply, char type){
while(1){
int ret = ply_read_next(ply);
if (ret || ply->type==type)
return ret;
}
}
static int append_meshms_buffer(const sid_t *my_sid, struct conversations *conv, unsigned char *buffer, int len){
int ret=-1;
rhizome_manifest *mout = NULL;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
goto end;
if (conv->found_my_ply){
if (rhizome_retrieve_manifest(conv->my_ply.bundle_id, m))
goto end;
if (rhizome_find_bundle_author(m))
goto end;
}else{
if (create_ply(my_sid, conv, m))
goto end;
}
if (rhizome_append_journal_buffer(m, NULL, 0, buffer, len))
goto end;
if (rhizome_manifest_finalise(m, &mout))
goto end;
ret=0;
end:
if (mout && mout!=m)
rhizome_manifest_free(mout);
if (m)
rhizome_manifest_free(m);
return ret;
}
// update if any conversations are unread or need to be acked.
// return -1 for failure, 1 if the conversation index needs to be saved.
static int update_conversation(const sid_t *my_sid, struct conversations *conv){
if (config.debug.meshms)
DEBUG("Checking if conversation needs to be acked");
// Nothing to be done if they have never sent us anything
if (!conv->found_their_ply)
return 0;
rhizome_manifest *m_ours = NULL;
rhizome_manifest *m_theirs = rhizome_new_manifest();
if (!m_theirs)
return -1;
struct ply_read ply;
bzero(&ply, sizeof(ply));
int ret=-1;
if (config.debug.meshms)
DEBUG("Locating their last message");
// find the offset of their last message
if (rhizome_retrieve_manifest(conv->their_ply.bundle_id, m_theirs))
goto end;
if (ply_read_open(&ply, conv->their_ply.bundle_id, m_theirs))
goto end;
ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_MESSAGE);
if (ret!=0){
// no messages indicates that we didn't do anthing
if (ret>0)
ret=0;
goto end;
}
if (conv->their_last_message == ply.record_end_offset){
// nothing has changed since last time
ret=0;
goto end;
}
conv->their_last_message = ply.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found last message @%"PRId64, conv->their_last_message);
ply_read_close(&ply);
// find our previous ack
uint64_t previous_ack = 0;
if (conv->found_my_ply){
if (config.debug.meshms)
DEBUG("Locating our previous ack");
m_ours = rhizome_new_manifest();
if (!m_ours)
goto end;
if (rhizome_retrieve_manifest(conv->my_ply.bundle_id, m_ours))
goto end;
if (ply_read_open(&ply, conv->my_ply.bundle_id, m_ours))
goto end;
ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_ACK);
if (ret<0)
goto end;
if (ret==0){
if (unpack_uint(ply.buffer, ply.record_length, &previous_ack)<0)
previous_ack=0;
}
if (config.debug.meshms)
DEBUGF("Previous ack is %"PRId64, previous_ack);
ply_read_close(&ply);
}else{
if (config.debug.meshms)
DEBUGF("No outgoing ply");
}
if (previous_ack >= conv->their_last_message){
// their last message has already been acked
ret=1;
goto end;
}
// append an ack for their message
if (config.debug.meshms)
DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, conv->their_last_message);
unsigned char buffer[24];
int ofs=0;
ofs+=pack_uint(&buffer[ofs], conv->their_last_message);
if (previous_ack)
ofs+=pack_uint(&buffer[ofs], conv->their_last_message - previous_ack);
ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs);
ret = append_meshms_buffer(my_sid, conv, buffer, ofs);
end:
ply_read_close(&ply);
if (m_ours)
rhizome_manifest_free(m_ours);
if (m_theirs)
rhizome_manifest_free(m_theirs);
// if it's all good, remember the size of their ply at the time we examined it.
if (ret>=0)
conv->their_size = conv->their_ply.size;
return ret;
}
// update conversations, and return 1 if the conversation index should be saved
static int update_conversations(const sid_t *my_sid, struct conversations *conv){
if (!conv)
return 0;
int ret = 0;
if (update_conversations(my_sid, conv->_left))
ret=1;
if (conv->their_size != conv->their_ply.size){
if (update_conversation(my_sid, conv)>0)
ret=1;
}
if (update_conversations(my_sid, conv->_right))
ret=1;
return ret;
}
// read our cached conversation list from our rhizome payload
// if we can't load the existing data correctly, just ignore it.
static int read_known_conversations(rhizome_manifest *m, const sid_t *their_sid, struct conversations **conv){
if (m->haveSecret==NEW_BUNDLE_ID)
return 0;
struct rhizome_read read;
bzero(&read, sizeof(read));
struct rhizome_read_buffer buff;
bzero(&buff, sizeof(buff));
int ret = rhizome_open_decrypt_read(m, NULL, &read, 0);
if (ret<0)
goto end;
unsigned char version=0xFF;
ret=rhizome_read_buffered(&read, &buff, &version, 1);
if (version!=1){
WARN("Expected version 1");
goto end;
}
while (1){
sid_t sid;
ret=rhizome_read_buffered(&read, &buff, sid.binary, sizeof(sid));
if (ret<sizeof(sid))
break;
if (config.debug.meshms)
DEBUGF("Reading existing conversation for %s", alloca_tohex_sid(sid.binary));
if (their_sid && memcmp(sid.binary, their_sid->binary, sizeof(sid)))
continue;
struct conversations *ptr = add_conv(conv, &sid);
if (!ptr)
goto end;
unsigned char details[8*3];
ret = rhizome_read_buffered(&read, &buff, details, sizeof(details));
if (ret<0)
break;
int bytes=ret;
int ofs=0;
ret=unpack_uint(details, bytes, &ptr->their_last_message);
if (ret<0)
break;
ofs+=ret;
ret=unpack_uint(details+ofs,bytes-ofs, &ptr->read_offset);
if (ret<0)
break;
ofs+=ret;
ret=unpack_uint(details+ofs,bytes-ofs, &ptr->their_size);
if (ret<0)
break;
ofs+=ret;
read.offset += ofs - bytes;
}
end:
rhizome_read_close(&read);
return 0;
}
static int write_conversation(struct rhizome_write *write, struct conversations *conv){
int len=0;
if (!conv)
return len;
{
unsigned char buffer[sizeof(conv->them) + (8*3)];
if (write)
bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
if (write){
len+=pack_uint(&buffer[len], conv->their_last_message);
len+=pack_uint(&buffer[len], conv->read_offset);
len+=pack_uint(&buffer[len], conv->their_size);
int ret=rhizome_write_buffer(write, buffer, len);
if (ret<0)
return ret;
}else{
len+=measure_packed_uint(conv->their_last_message);
len+=measure_packed_uint(conv->read_offset);
len+=measure_packed_uint(conv->their_size);
}
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %d",
alloca_tohex_sid(conv->them.binary),
conv->their_last_message,
conv->read_offset,
conv->their_size,
len);
}
// write the two child nodes
int ret=write_conversation(write, conv->_left);
if (ret<0)
return ret;
len+=ret;
ret=write_conversation(write, conv->_right);
if (ret<0)
return ret;
len+=ret;
return len;
}
static int write_known_conversations(rhizome_manifest *m, struct conversations *conv){
rhizome_manifest *mout=NULL;
struct rhizome_write write;
bzero(&write, sizeof(write));
int ret=-1;
// TODO rebalance tree...
// measure the final payload first
int len=write_conversation(NULL, conv);
if (len<0)
goto end;
// then write it
m->version++;
rhizome_manifest_set_ll(m,"version",m->version);
m->fileLength = len+1;
rhizome_manifest_set_ll(m,"filesize",m->fileLength);
if (rhizome_write_open_manifest(&write, m))
goto end;
unsigned char version=1;
if (rhizome_write_buffer(&write, &version, 1)<0)
goto end;
if (write_conversation(&write, conv)<0)
goto end;
if (rhizome_finish_write(&write))
goto end;
strlcpy(m->fileHexHash, write.id, SHA512_DIGEST_STRING_LENGTH);
rhizome_manifest_set(m, "filehash", m->fileHexHash);
if (rhizome_manifest_finalise(m,&mout))
goto end;
ret=0;
end:
if (ret)
rhizome_fail_write(&write);
if (mout && m!=mout)
rhizome_manifest_free(mout);
return ret;
}
// read information about existing conversations from a rhizome payload
static int meshms_conversations_list(const sid_t *my_sid, const sid_t *their_sid, struct conversations **conv){
int ret=-1;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
goto end;
if (get_my_conversation_bundle(my_sid, m))
goto end;
// read conversations payload
if (read_known_conversations(m, their_sid, conv))
goto end;
if (get_database_conversations(my_sid, their_sid, conv))
goto end;
if (update_conversations(my_sid, *conv) && !their_sid){
if (write_known_conversations(m, *conv))
goto end;
}
ret=0;
end:
rhizome_manifest_free(m);
return ret;
}
// recursively traverse the conversation tree in sorted order and output the details of each conversation
static int output_conversations(struct cli_context *context, struct conversations *conv,
int output, int offset, int count){
if (!conv)
return 0;
int traverse_count = output_conversations(context, conv->_left, output, offset, count);
if (count <0 || output + traverse_count < offset + count){
if (output + traverse_count >= offset){
cli_put_long(context, output + traverse_count, ":");
cli_put_hexvalue(context, conv->them.binary, sizeof(conv->them), ":");
cli_put_string(context, conv->read_offset < conv->their_last_message ? "unread":"", ":");
cli_put_long(context, conv->their_last_message, ":");
cli_put_long(context, conv->read_offset, "\n");
}
traverse_count++;
}
traverse_count += output_conversations(context, conv->_right, output + traverse_count, offset, count);
return traverse_count;
}
// output the list of existing conversations for a given local identity
int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context *context){
const char *sidhex, *offset_str, *count_str;
if (cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "offset", &offset_str, NULL, "0")==-1
|| cli_arg(parsed, "count", &count_str, NULL, "-1")==-1)
return -1;
sid_t sid;
fromhex(sid.binary, sidhex, sizeof(sid.binary));
int offset=atoi(offset_str);
int count=atoi(count_str);
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
return -1;
if (rhizome_opendb() == -1)
return -1;
struct conversations *conv=NULL;
if (meshms_conversations_list(&sid, NULL, &conv))
return -1;
const char *names[]={
"_id","recipient","read", "last_message", "read_offset"
};
cli_columns(context, 5, names);
int rows = output_conversations(context, conv, 0, offset, count);
cli_row_count(context, rows);
free_conversations(conv);
return 0;
}
int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *context){
const char *my_sidhex, *their_sidhex, *message;
if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "payload", &message, NULL, "") == -1)
return -1;
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
return -1;
if (rhizome_opendb() == -1)
return -1;
sid_t my_sid, their_sid;
fromhex(my_sid.binary, my_sidhex, sizeof(my_sid.binary));
fromhex(their_sid.binary, their_sidhex, sizeof(their_sid.binary));
struct conversations *conv=find_or_create_conv(&my_sid, &their_sid);
if (!conv)
return -1;
// construct a message payload
int message_len = strlen(message)+1;
// TODO, new format here.
unsigned char buffer[message_len+3];
strcpy((char*)buffer, message); // message
message_len+=append_footer(buffer+message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len);
int ret = append_meshms_buffer(&my_sid, conv, buffer, message_len);
free_conversations(conv);
return ret;
}
int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context *context){
const char *my_sidhex, *their_sidhex;
if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, "") == -1)
return -1;
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
return -1;
if (rhizome_opendb() == -1)
return -1;
sid_t my_sid, their_sid;
fromhex(my_sid.binary, my_sidhex, sizeof(my_sid.binary));
fromhex(their_sid.binary, their_sidhex, sizeof(their_sid.binary));
struct conversations *conv=find_or_create_conv(&my_sid, &their_sid);
if (!conv)
return -1;
int ret=-1;
const char *names[]={
"_id","offset","type","message"
};
cli_columns(context, 4, names);
rhizome_manifest *m_ours=NULL, *m_theirs=NULL;
struct ply_read read_ours, read_theirs;
// if we've never sent a message, (or acked theirs), there is nothing to show
if (!conv->found_my_ply){
ret=0;
cli_row_count(context, 0);
if (config.debug.meshms)
DEBUGF("Did not find my ply");
goto end;
}
// start reading messages from both ply's in reverse order
bzero(&read_ours, sizeof(read_ours));
bzero(&read_theirs, sizeof(read_theirs));
m_ours = rhizome_new_manifest();
if (!m_ours)
goto end;
if (ply_read_open(&read_ours, conv->my_ply.bundle_id, m_ours))
goto end;
uint64_t their_last_ack=0;
uint64_t their_ack_offset=0;
int64_t unread_mark=conv->read_offset;
if (conv->found_their_ply){
rhizome_manifest *m_theirs = rhizome_new_manifest();
if (!m_theirs)
goto end;
if (ply_read_open(&read_theirs, conv->their_ply.bundle_id, m_theirs))
goto end;
// find their last ACK so we know if messages have been received
int r = ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_ACK);
if (r==0){
if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack)<0)
their_last_ack=0;
else
their_ack_offset = read_theirs.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found their last ack @%"PRId64, their_last_ack);
}
}
int id=0;
while(ply_read_next(&read_ours)==0){
if (config.debug.meshms)
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, read_ours.read.offset, read_ours.type, conv->read_offset);
if (their_last_ack && their_last_ack >= read_ours.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, their_ack_offset, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
their_last_ack = 0;
}
switch(read_ours.type){
case MESHMS_BLOCK_TYPE_ACK:
// read their message list, and insert all messages that are included in the ack range
if (conv->found_their_ply){
int ofs=unpack_uint(read_ours.buffer, read_ours.record_length, (uint64_t*)&read_theirs.read.offset);
if (ofs<0)
break;
uint64_t end_range;
int x = unpack_uint(read_ours.buffer+ofs, read_ours.record_length - ofs, &end_range);
if (x<0)
end_range=0;
else
end_range = read_theirs.read.offset - end_range;
// TODO tail
// just incase we don't have the full bundle anymore
if (read_theirs.read.offset > read_theirs.read.length)
read_theirs.read.offset = read_theirs.read.length;
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range);
while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){
if (read_theirs.read.offset < end_range)
break;
if (unread_mark >= (int64_t)read_theirs.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, unread_mark, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
unread_mark = -1;
}
cli_put_long(context, id++, ":");
cli_put_long(context, read_theirs.record_end_offset, ":");
cli_put_string(context, "<", ":");
cli_put_string(context, (char *)read_theirs.buffer, "\n");
}
}
break;
case MESHMS_BLOCK_TYPE_MESSAGE:
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, read_ours.record_end_offset, ":");
cli_put_string(context, ">", ":");
cli_put_string(context, (char *)read_ours.buffer, "\n");
break;
}
}
cli_row_count(context, id);
ret=0;
end:
if (m_ours){
rhizome_manifest_free(m_ours);
ply_read_close(&read_ours);
}
if (m_theirs){
rhizome_manifest_free(m_theirs);
ply_read_close(&read_theirs);
}
free_conversations(conv);
return ret;
}
static int mark_read(struct conversations *conv, const sid_t *their_sid, const char *offset_str){
int ret=0;
if (conv){
int cmp = their_sid?memcmp(conv->them.binary, their_sid->binary, sizeof(sid_t)):0;
if (!their_sid || cmp<0){
ret+=mark_read(conv->_left, their_sid, offset_str);
}
if (!their_sid || cmp==0){
// update read offset
// - never rewind
// - never past their last message
uint64_t offset = conv->their_last_message;
if (offset_str){
uint64_t x = atol(offset_str);
if (x<offset)
offset=x;
}
if (offset > conv->read_offset){
if (config.debug.meshms)
DEBUGF("Moving read marker for %s, from %"PRId64" to %"PRId64,
alloca_tohex_sid(conv->them.binary), conv->read_offset, offset);
conv->read_offset = offset;
ret++;
}
}
if (!their_sid || cmp>0){
ret+=mark_read(conv->_right, their_sid, offset_str);
}
}
return ret;
}
int app_meshms_mark_read(const struct cli_parsed *parsed, struct cli_context *context){
const char *my_sidhex, *their_sidhex, *offset_str;
if (cli_arg(parsed, "sender_sid", &my_sidhex, str_is_subscriber_id, "") == -1
|| cli_arg(parsed, "recipient_sid", &their_sidhex, str_is_subscriber_id, NULL) == -1
|| cli_arg(parsed, "offset", &offset_str, NULL, NULL)==-1)
return -1;
if (create_serval_instance_dir() == -1)
return -1;
if (!(keyring = keyring_open_instance_cli(parsed)))
return -1;
if (rhizome_opendb() == -1)
return -1;
sid_t my_sid, their_sid;
fromhex(my_sid.binary, my_sidhex, sizeof(my_sid.binary));
if (their_sidhex)
fromhex(their_sid.binary, their_sidhex, sizeof(their_sid.binary));
int ret=-1;
struct conversations *conv=NULL;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
goto end;
if (get_my_conversation_bundle(&my_sid, m))
goto end;
// read all conversations, so we can write them again
if (read_known_conversations(m, NULL, &conv))
goto end;
// read the full list of conversations from the database too
if (get_database_conversations(&my_sid, NULL, &conv))
goto end;
// check if any incoming conversations need to be acked or have new messages and update the read offset
int changed = update_conversations(&my_sid, conv);
if (mark_read(conv, their_sidhex?&their_sid:NULL, offset_str))
changed =1;
if (changed){
// save the conversation list
if (write_known_conversations(m, conv))
goto end;
}
ret=0;
end:
if (m)
rhizome_manifest_free(m);
free_conversations(conv);
return ret;
}

View File

@ -276,6 +276,40 @@ int ob_append_ui64(struct overlay_buffer *b, uint64_t v)
return 0;
}
int measure_packed_uint(uint64_t v){
int ret=0;
do{
v>>=7;
ret++;
}while(v);
return ret;
}
int pack_uint(unsigned char *buffer, uint64_t v){
int ret=0;
do{
*buffer++=(v&0x7f) | (v>0x7f?0x80:0);
v>>=7;
ret++;
}while(v);
return ret;
}
int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v){
int i=0;
*v=0;
while(1){
if (i>=buff_size)
return -1;
char byte = buffer[i];
*v |= (byte&0x7f)<<(i*7);
i++;
if (!(byte&0x80))
break;
}
return i;
}
int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v)
{
do{

View File

@ -137,36 +137,22 @@ int rhizome_bundle_import_files(rhizome_manifest *m, const char *manifest_path,
// This feels like a hack...
m->manifest_bytes=m->manifest_all_bytes;
/* Do we already have this manifest or newer? */
int64_t dbVersion = -1;
const char *id=rhizome_manifest_get(m, "id", NULL, 0);
if (sqlite_exec_int64(&dbVersion, "SELECT version FROM MANIFESTS WHERE id='%s';", id) == -1)
return WHY("Select failure");
if (dbVersion>=m->version)
return 2;
int status = rhizome_import_file(m, filepath);
if (status<0)
return status;
status = rhizome_manifest_check_duplicate(m, NULL, 0);
if (status)
return status;
return rhizome_add_manifest(m, 1);
}
/* Import a bundle from a finalised manifest struct. The dataFileName element must give the path
of a readable file containing the payload unless the payload is null (zero length). The logic is
all in rhizome_add_manifest(). This function just wraps that function and manages object buffers
and lifetimes.
*/
int rhizome_bundle_import(rhizome_manifest *m, int ttl)
{
if (config.debug.rhizome)
DEBUGF("(m=%p, ttl=%d)", m, ttl);
int ret = rhizome_manifest_check_duplicate(m, NULL, 0);
if (ret == 0) {
ret = rhizome_add_manifest(m, ttl);
if (ret == -1)
WHY("rhizome_add_manifest() failed");
}
return ret;
}
int rhizome_manifest_check_sanity(rhizome_manifest *m_in)
{
/* Ensure manifest meets basic sanity checks. */
@ -188,7 +174,8 @@ int rhizome_manifest_check_sanity(rhizome_manifest *m_in)
const char *name = rhizome_manifest_get(m_in, "name", NULL, 0);
if (name == NULL)
return WHY("Manifest missing 'name' field");
} else if (strcasecmp(service, RHIZOME_SERVICE_MESHMS) == 0) {
} else if (strcasecmp(service, RHIZOME_SERVICE_MESHMS) == 0
|| strcasecmp(service, RHIZOME_SERVICE_MESHMS2) == 0) {
if (sender == NULL || !sender[0])
return WHY("MeshMS Manifest missing 'sender' field");
if (!str_is_subscriber_id(sender))
@ -226,7 +213,6 @@ int rhizome_manifest_bind_id(rhizome_manifest *m_in)
manifests on receiver nodes works easily. We might implement something that strips the id
variable out of the manifest when sending it, or some other scheme to avoid sending all the
extra bytes. */
rhizome_manifest_set(m_in, "id", alloca_tohex_bid(m_in->cryptoSignPublic));
if (!is_sid_any(m_in->author)) {
/* Set the BK using the provided authorship information.
Serval Security Framework defines BK as being:
@ -255,30 +241,6 @@ int rhizome_manifest_bind_id(rhizome_manifest *m_in)
return 0;
}
/* Check if a manifest is already stored for the same payload with the same details.
This catches the case of "rhizome add file <filename>" on the same file more than once.
(Debounce!) */
int rhizome_manifest_check_duplicate(rhizome_manifest *m_in, rhizome_manifest **m_out, int check_author)
{
if (config.debug.rhizome) DEBUG("Checking for duplicate");
if (m_out) *m_out = NULL;
rhizome_manifest *dupm = NULL;
if (rhizome_find_duplicate(m_in, &dupm, check_author) == -1)
return WHY("Errors encountered searching for duplicate manifest");
if (dupm) {
/* If the caller wants the duplicate manifest, it must be finalised, otherwise discarded. */
if (m_out) {
*m_out = dupm;
}
else
rhizome_manifest_free(dupm);
if (config.debug.rhizome) DEBUG("Found a duplicate");
return 2;
}
if (config.debug.rhizome) DEBUG("No duplicate found");
return 0;
}
int rhizome_add_manifest(rhizome_manifest *m_in,int ttl)
{
if (config.debug.rhizome)

View File

@ -108,6 +108,8 @@ typedef struct rhizome_manifest {
except the creator. */
unsigned char cryptoSignPublic[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES];
unsigned char cryptoSignSecret[crypto_sign_edwards25519sha512batch_SECRETKEYBYTES];
/* Whether we have the secret for this manifest on hand */
int haveSecret;
int var_count;
char *vars[MAX_MANIFEST_VARS];
@ -121,7 +123,13 @@ typedef struct rhizome_manifest {
*/
unsigned char signatureTypes[MAX_MANIFEST_VARS];
int errors; /* if non-zero, then manifest should not be trusted */
// errors only involve the correctness of fields that are mandatory for
// proper operation of the transport and storage layer
int errors;
// a warning indicates that the manifest cannot be perfectly understood by this version of rhizome
// during add, the manifest should not be finalised and imported
// during extract an error should be displayed.
int warnings;
time_ms_t inserttime;
/* Set non-zero after variables have been packed and
@ -149,8 +157,6 @@ typedef struct rhizome_manifest {
unsigned char payloadKey[RHIZOME_CRYPT_KEY_BYTES];
unsigned char payloadNonce[crypto_stream_xsalsa20_NONCEBYTES];
/* Whether we have the secret for this manifest on hand */
int haveSecret;
/* Whether the manifest contains a signature that corresponds to the
manifest id (ie public key) */
int selfSigned;
@ -174,6 +180,7 @@ typedef struct rhizome_manifest {
*/
#define RHIZOME_SERVICE_FILE "file"
#define RHIZOME_SERVICE_MESHMS "MeshMS1"
#define RHIZOME_SERVICE_MESHMS2 "MeshMS2"
extern int64_t rhizome_space;
extern unsigned short rhizome_http_server_port;
@ -210,6 +217,7 @@ struct rhizome_cleanup_report {
int rhizome_cleanup(struct rhizome_cleanup_report *report);
int rhizome_manifest_createid(rhizome_manifest *m);
int rhizome_get_bundle_from_seed(rhizome_manifest *m, const char *seed);
int rhizome_strn_is_manifest_id(const char *text);
int rhizome_str_is_manifest_id(const char *text);
int rhizome_strn_is_bundle_key(const char *text);
@ -258,12 +266,10 @@ int rhizome_manifest_add_group(rhizome_manifest *m,char *groupid);
int rhizome_clean_payload(const char *fileidhex);
int rhizome_store_file(rhizome_manifest *m,const unsigned char *key);
int rhizome_bundle_import_files(rhizome_manifest *m, const char *manifest_path, const char *filepath);
int rhizome_bundle_import(rhizome_manifest *m, int ttl);
int rhizome_fill_manifest(rhizome_manifest *m, const char *filepath, const sid_t *authorSid, rhizome_bk_t *bsk);
int rhizome_manifest_verify(rhizome_manifest *m);
int rhizome_manifest_check_sanity(rhizome_manifest *m_in);
int rhizome_manifest_check_duplicate(rhizome_manifest *m_in,rhizome_manifest **m_out, int check_author);
int rhizome_manifest_bind_id(rhizome_manifest *m_in);
int rhizome_manifest_finalise(rhizome_manifest *m, rhizome_manifest **mout);
@ -320,7 +326,7 @@ int _sqlite_vexec_strbuf_retry(struct __sourceloc, sqlite_retry_state *retry, st
double rhizome_manifest_get_double(rhizome_manifest *m,char *var,double default_value);
int rhizome_manifest_extract_signature(rhizome_manifest *m,int *ofs);
int rhizome_update_file_priority(const char *fileid);
int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int check_author);
int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found);
int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar);
int64_t rhizome_bar_version(const unsigned char *bar);
uint64_t rhizome_bar_bidprefix_ll(unsigned char *bar);
@ -413,6 +419,12 @@ struct rhizome_write{
sqlite3_blob *sql_blob;
};
struct rhizome_read_buffer{
uint64_t offset;
unsigned char data[RHIZOME_CRYPT_PAGE_SIZE];
int len;
};
struct rhizome_read{
char id[SHA512_DIGEST_STRING_LENGTH+1];
@ -685,6 +697,7 @@ int rhizome_exists(const char *fileHash);
int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority);
int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, int data_size);
int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size);
int rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m);
int rhizome_write_file(struct rhizome_write *write, const char *filename);
int rhizome_fail_write(struct rhizome_write *write);
int rhizome_finish_write(struct rhizome_write *write);
@ -701,6 +714,7 @@ int rhizome_crypt_xor_block(unsigned char *buffer, int buffer_size, int64_t stre
const unsigned char *key, const unsigned char *nonce);
int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash);
int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_length);
int rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buffer *buffer, unsigned char *data, int len);
int rhizome_read_close(struct rhizome_read *read);
int rhizome_store_delete(const char *id);
int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizome_read *read_state, int hash);

View File

@ -84,29 +84,22 @@ int rhizome_manifest_verify(rhizome_manifest *m)
else return 0;
}
int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bufferP)
int read_whole_file(const char *filename, unsigned char *buffer, int buffer_size)
{
FILE *f = fopen(filename, "r");
if (f == NULL)
return WHYF("Could not open file %s for reading", filename);
int ret = fread(buffer, 1, buffer_size, f);
if (ret == -1)
ret = WHY_perror("fread");
if (fclose(f) == EOF)
ret = WHY_perror("fclose");
return ret;
}
int rhizome_manifest_parse(rhizome_manifest *m)
{
IN();
if (bufferP>MAX_MANIFEST_BYTES) RETURN(WHY("Buffer too big"));
if (!m) RETURN(WHY("Null manifest"));
if (bufferP) {
m->manifest_bytes=bufferP;
memcpy(m->manifestdata, filename, m->manifest_bytes);
} else {
FILE *f = fopen(filename, "r");
if (f == NULL)
RETURN(WHYF("Could not open manifest file %s for reading.", filename));
m->manifest_bytes = fread(m->manifestdata, 1, MAX_MANIFEST_BYTES, f);
int ret = 0;
if (m->manifest_bytes == -1)
ret = WHY_perror("fread");
if (fclose(f) == EOF)
ret = WHY_perror("fclose");
if (ret == -1)
RETURN(-1);
}
m->manifest_all_bytes=m->manifest_bytes;
m->var_count=0;
m->journalTail=-1;
@ -118,6 +111,7 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
int have_date = 0;
int have_filesize = 0;
int have_filehash = 0;
int ofs = 0;
while (ofs < m->manifest_bytes && m->manifestdata[ofs]) {
char line[1024];
@ -143,9 +137,7 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
p = strchr(line, '=');
if (p == NULL || p == line) {
m->errors++;
WARNF(bufferP ? "Malformed manifest line in buffer %p: %s"
: "Malformed manifest line in file %s: %s",
filename, alloca_toprint(80, line, linelen));
WARNF("Malformed manifest line: %s", alloca_toprint(80, line, linelen));
} else {
*p++ = '\0';
char *var = line;
@ -161,6 +153,9 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
} else {
m->vars[m->var_count] = strdup(var);
m->values[m->var_count] = strdup(value);
// if any of these fields are not well formed, the manifest is invalid and cannot be imported
if (strcasecmp(var, "id") == 0) {
have_id = 1;
if (fromhexstr(m->cryptoSignPublic, value, RHIZOME_MANIFEST_ID_BYTES) == -1) {
@ -182,15 +177,6 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
str_toupper_inplace(m->values[m->var_count]);
strcpy(m->fileHexHash, m->values[m->var_count]);
}
} else if (strcasecmp(var, "BK") == 0) {
if (!rhizome_str_is_bundle_key(value)) {
if (config.debug.rejecteddata)
WARNF("Invalid BK: %s", value);
m->errors++;
} else {
/* Force to upper case to avoid case sensitive comparison problems later. */
str_toupper_inplace(m->values[m->var_count]);
}
} else if (strcasecmp(var, "filesize") == 0) {
have_filesize = 1;
char *ep = value;
@ -202,14 +188,6 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
} else {
m->fileLength = filesize;
}
} else if (strcasecmp(var, "service") == 0) {
have_service = 1;
if ( strcasecmp(value, RHIZOME_SERVICE_FILE) == 0
|| strcasecmp(value, RHIZOME_SERVICE_MESHMS) == 0) {
} else {
INFOF("Unsupported service: %s", value);
// This is not an error... older rhizome nodes must carry newer manifests.
}
} else if (strcasecmp(var, "version") == 0) {
have_version = 1;
char *ep = value;
@ -221,6 +199,40 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
} else {
m->version = version;
}
// since rhizome *MUST* be able to carry future manifest versions
// if any of these fields are not well formed, the manifest can still be imported and exported
// but the bundle should not be added or exported
} else if (strcasecmp(var, "tail") == 0) {
char *ep = value;
long long tail = strtoll(value, &ep, 10);
if (ep == value || *ep || tail < 0) {
if (config.debug.rejecteddata)
WARNF("Invalid tail: %s", value);
m->warnings++;
} else {
m->journalTail = tail;
}
} else if (strcasecmp(var, "BK") == 0) {
if (!rhizome_str_is_bundle_key(value)) {
if (config.debug.rejecteddata)
WARNF("Invalid BK: %s", value);
m->warnings++;
} else {
/* Force to upper case to avoid case sensitive comparison problems later. */
str_toupper_inplace(m->values[m->var_count]);
}
} else if (strcasecmp(var, "service") == 0) {
have_service = 1;
if ( strcasecmp(value, RHIZOME_SERVICE_FILE) == 0
|| strcasecmp(value, RHIZOME_SERVICE_MESHMS) == 0
|| strcasecmp(value, RHIZOME_SERVICE_MESHMS2) == 0) {
} else {
if (config.debug.rejecteddata)
WARNF("Unsupported service: %s", value);
m->warnings++;
}
} else if (strcasecmp(var, "date") == 0) {
have_date = 1;
char *ep = value;
@ -228,14 +240,14 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
if (ep == value || *ep || date < 0) {
if (config.debug.rejecteddata)
WARNF("Invalid date: %s", value);
m->errors++;
m->warnings++;
}
// TODO: store date in manifest struct
} else if (strcasecmp(var, "sender") == 0 || strcasecmp(var, "recipient") == 0) {
if (!str_is_subscriber_id(value)) {
if (config.debug.rejecteddata)
WARNF("Invalid %s: %s", var, value);
m->errors++;
m->warnings++;
} else {
/* Force to upper case to avoid case sensitive comparison problems later. */
str_toupper_inplace(m->values[m->var_count]);
@ -244,30 +256,18 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
if (value[0] == '\0') {
if (config.debug.rejecteddata)
WARN("Empty name");
m->errors++;
m->warnings++;
}
// TODO: complain if service is not MeshMS
} else if (strcasecmp(var, "crypt") == 0) {
if (!(strcmp(value, "0") == 0 || strcmp(value, "1") == 0)) {
if (config.debug.rejecteddata)
WARNF("Invalid crypt: %s", value);
m->errors++;
m->warnings++;
} else {
m->payloadEncryption = atoi(value);
}
} else if (strcasecmp(var, "tail") == 0) {
char *ep = value;
long long tail = strtoll(value, &ep, 10);
if (ep == value || *ep || tail < 0) {
if (config.debug.rejecteddata)
WARNF("Invalid tail: %s", value);
m->errors++;
} else {
m->journalTail = tail;
}
} else {
INFOF("Unsupported field: %s=%s", var, value);
// This is not an error... older rhizome nodes must carry newer manifests.
// An unknown field is not an error... older rhizome nodes must carry newer manifests.
}
m->var_count++;
}
@ -281,11 +281,7 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
int end_of_text=ofs;
m->manifest_bytes = end_of_text;
if (!have_service) {
if (config.debug.rejecteddata)
WARNF("Missing service field");
m->errors++;
}
// verify that all required fields are consistent.
if (!have_id) {
if (config.debug.rejecteddata)
WARNF("Missing manifest id field");
@ -296,11 +292,6 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
WARNF("Missing version field");
m->errors++;
}
if (!have_date) {
if (config.debug.rejecteddata)
WARNF("Missing date field");
m->errors++;
}
if (!have_filesize) {
if (config.debug.rejecteddata)
WARNF("Missing filesize field");
@ -317,9 +308,21 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
m->errors++;
}
// warn if expected fields are missing
if (!have_service) {
if (config.debug.rejecteddata)
WARNF("Missing service field");
m->warnings++;
}
if (!have_date) {
if (config.debug.rejecteddata)
WARNF("Missing date field");
m->warnings++;
}
// TODO Determine group membership here.
if (m->errors) {
if (m->errors || m->warnings) {
if (config.debug.rejecteddata)
dump("manifest body",m->manifestdata,m->manifest_bytes);
}
@ -328,6 +331,24 @@ int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bu
OUT();
}
int rhizome_read_manifest_file(rhizome_manifest *m, const char *filename, int bufferP)
{
if (!m)
return WHY("Null manifest");
if (bufferP>sizeof(m->manifestdata))
return WHY("Buffer too big");
if (bufferP) {
m->manifest_bytes=bufferP;
memcpy(m->manifestdata, filename, m->manifest_bytes);
} else {
m->manifest_bytes = read_whole_file(filename, m->manifestdata, sizeof(m->manifestdata));
if (m->manifest_bytes == -1)
return -1;
}
return rhizome_manifest_parse(m);
}
int rhizome_hash_file(rhizome_manifest *m,const char *filename,char *hash_out)
{
/* Gnarf! NaCl's crypto_hash() function needs the whole file passed in in one
@ -680,11 +701,8 @@ int rhizome_manifest_finalise(rhizome_manifest *m, rhizome_manifest **mout)
// if a manifest was supplied with an ID, don't bother to check for a duplicate.
// we only want to filter out added files with no existing manifest.
if (m->haveSecret==NEW_BUNDLE_ID){
if (rhizome_manifest_check_duplicate(m, mout, 1) == 2) {
/* duplicate found -- verify it so that we can write it out later */
rhizome_manifest_verify(*mout);
if (rhizome_find_duplicate(m, mout)==1)
RETURN(2);
}
}
*mout=m;
@ -759,20 +777,22 @@ int rhizome_fill_manifest(rhizome_manifest *m, const char *filepath, const sid_t
rhizome_manifest_set_ll(m,"version",m->version);
}
const char *id = rhizome_manifest_get(m, "id", NULL, 0);
if (id == NULL) {
if (config.debug.rhizome) DEBUG("creating new bundle");
if (rhizome_manifest_bind_id(m) == -1) {
return WHY("Could not bind manifest to an ID");
if (!m->haveSecret){
const char *id = rhizome_manifest_get(m, "id", NULL, 0);
if (id == NULL) {
if (config.debug.rhizome) DEBUG("creating new bundle");
if (rhizome_manifest_bind_id(m) == -1) {
return WHY("Could not bind manifest to an ID");
}
} else {
if (config.debug.rhizome) DEBUGF("modifying existing bundle bid=%s", id);
// Modifying an existing bundle. Make sure we can find the bundle secret.
if (rhizome_extract_privatekey_required(m, bsk))
return -1;
// TODO assert that new version > old version?
}
} else {
if (config.debug.rhizome) DEBUGF("modifying existing bundle bid=%s", id);
// Modifying an existing bundle. Make sure we can find the bundle secret.
if (rhizome_extract_privatekey_required(m, bsk))
return -1;
// TODO assert that new version > old version?
}
int crypt = rhizome_manifest_get_ll(m,"crypt");

View File

@ -24,6 +24,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "conf.h"
#include "str.h"
#include "rhizome.h"
#include "crypto.h"
#include <stdlib.h>
#include <ctype.h>
@ -38,9 +39,50 @@ unsigned char *rhizome_bundle_shared_secret(rhizome_manifest *m)
int rhizome_manifest_createid(rhizome_manifest *m)
{
m->haveSecret=NEW_BUNDLE_ID;
int r=crypto_sign_edwards25519sha512batch_keypair(m->cryptoSignPublic,m->cryptoSignSecret);
if (!r) return 0;
return WHY("Failed to create keypair for manifest ID.");
if (crypto_sign_edwards25519sha512batch_keypair(m->cryptoSignPublic,m->cryptoSignSecret))
return WHY("Failed to create keypair for manifest ID.");
rhizome_manifest_set(m, "id", alloca_tohex_bid(m->cryptoSignPublic));
return 0;
}
struct signing_key{
unsigned char Private[crypto_sign_edwards25519sha512batch_SECRETKEYBYTES];
unsigned char Public[crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES];
};
/* generate a keypair from a given seed string */
static int generate_keypair(const char *seed, struct signing_key *key)
{
unsigned char hash[crypto_hash_sha512_BYTES];
crypto_hash_sha512(hash, (unsigned char *)seed, strlen(seed));
// The first 256 bits of the hash will be used as the private key of the BID.
bcopy(hash, key->Private, sizeof(key->Private));
if (crypto_sign_compute_public_key(key->Private, key->Public))
return WHY("Could not generate public key");
return 0;
}
/* Generate a bundle id deterministically from the given seed.
* Then either fetch it from the database or initialise a new empty manifest */
int rhizome_get_bundle_from_seed(rhizome_manifest *m, const char *seed)
{
struct signing_key key;
if (generate_keypair(seed, &key))
return -1;
char *id = alloca_tohex_bid(key.Public);
int ret=rhizome_retrieve_manifest(id, m);
if (ret<0)
return -1;
m->haveSecret=(ret==0)?EXISTING_BUNDLE_ID:NEW_BUNDLE_ID;
bcopy(key.Public, m->cryptoSignPublic, sizeof(m->cryptoSignPublic));
bcopy(key.Private, m->cryptoSignSecret, sizeof(m->cryptoSignSecret));
if (ret>0)
rhizome_manifest_set(m, "id", alloca_tohex_bid(m->cryptoSignPublic));
return ret;
}
/* Given a Rhizome Secret (RS) and bundle ID (BID), XOR a bundle key 'bkin' (private or public) with
@ -357,20 +399,9 @@ int rhizome_verify_bundle_privatekey(rhizome_manifest *m,
const unsigned char *pkin)
{
IN();
unsigned char h[64];
unsigned char pk[32];
ge_p3 A;
int i;
crypto_hash_sha512(h,sk,32);
h[0] &= 248;
h[31] &= 63;
h[31] |= 64;
ge_scalarmult_base(&A,h);
ge_p3_tobytes(pk,&A);
crypto_sign_compute_public_key(sk,pk);
for (i = 0;i < 32;++i)
if (pkin[i] != pk[i]) {
if (m&&sk==m->cryptoSignSecret&&pkin==m->cryptoSignPublic)
@ -571,6 +602,10 @@ static void add_nonce(unsigned char *nonce, int64_t value){
*/
int rhizome_crypt_xor_block(unsigned char *buffer, int buffer_size, int64_t stream_offset,
const unsigned char *key, const unsigned char *nonce){
if (stream_offset<0)
return WHY("Invalid stream offset");
int64_t nonce_offset = stream_offset & ~(RHIZOME_CRYPT_PAGE_SIZE -1);
int offset=0;

View File

@ -1232,171 +1232,96 @@ int rhizome_update_file_priority(const char *fileid)
@author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int check_author)
int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found)
{
// TODO, add service, name, sender & recipient to manifests table so we can simply query them.
const char *service = rhizome_manifest_get(m, "service", NULL, 0);
const char *name = NULL;
const char *sender = NULL;
const char *recipient = NULL;
if (service == NULL) {
if (service == NULL)
return WHY("Manifest has no service");
} else if (strcasecmp(service, RHIZOME_SERVICE_FILE) == 0) {
name = rhizome_manifest_get(m, "name", NULL, 0);
if (!name) return WHY("Manifest has no name");
} else if (strcasecmp(service, RHIZOME_SERVICE_MESHMS) == 0) {
sender = rhizome_manifest_get(m, "sender", NULL, 0);
recipient = rhizome_manifest_get(m, "recipient", NULL, 0);
if (!sender) return WHY("Manifest has no sender");
if (!recipient) return WHY("Manifest has no recipient");
} else {
return WHYF("Unsupported service '%s'", service);
}
const char *name = rhizome_manifest_get(m, "name", NULL, 0);
const char *sender = rhizome_manifest_get(m, "sender", NULL, 0);
const char *recipient = rhizome_manifest_get(m, "recipient", NULL, 0);
char sqlcmd[1024];
strbuf b = strbuf_local(sqlcmd, sizeof sqlcmd);
strbuf_puts(b, "SELECT id, manifest, version, author FROM manifests WHERE ");
if (m->fileLength != 0) {
strbuf_puts(b, "filehash = ?");
} else
strbuf_puts(b, "filesize = 0");
strbuf_puts(b, "SELECT id, manifest, author FROM manifests WHERE filesize = ? AND service = ?");
if (m->fileLength != 0)
strbuf_puts(b, " AND filehash = ?");
if (name)
strbuf_puts(b, " AND name = ?");
if (sender)
strbuf_puts(b, " AND sender = ?");
if (recipient)
strbuf_puts(b, " AND recipient = ?");
if (strbuf_overrun(b))
return WHYF("SQL command too long: %s", strbuf_str(b));
int ret = 0;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "%s", strbuf_str(b));
if (!statement)
return -1;
int field = 1;
char filehash[RHIZOME_FILEHASH_STRLEN + 1];
if (m->fileLength != 0) {
strncpy(filehash, m->fileHexHash, sizeof filehash);
str_toupper_inplace(filehash);
if (config.debug.rhizome)
DEBUGF("filehash=\"%s\"", filehash);
sqlite3_bind_text(statement, field++, filehash, -1, SQLITE_STATIC);
}
sqlite3_bind_int(statement, field++, m->fileLength);
sqlite3_bind_text(statement, field++, service, -1, SQLITE_STATIC);
if (m->fileLength != 0)
sqlite3_bind_text(statement, field++, m->fileHexHash, -1, SQLITE_STATIC);
if (name)
sqlite3_bind_text(statement, field++, name, -1, SQLITE_STATIC);
if (sender)
sqlite3_bind_text(statement, field++, sender, -1, SQLITE_STATIC);
if (recipient)
sqlite3_bind_text(statement, field++, recipient, -1, SQLITE_STATIC);
int rows = 0;
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
++rows;
if (config.debug.rhizome) DEBUGF("Row %d", rows);
if (!( sqlite3_column_count(statement) == 4
&& sqlite3_column_type(statement, 0) == SQLITE_TEXT
&& sqlite3_column_type(statement, 1) == SQLITE_BLOB
&& sqlite3_column_type(statement, 2) == SQLITE_INTEGER
&& ( sqlite3_column_type(statement, 3) == SQLITE_TEXT
|| sqlite3_column_type(statement, 3) == SQLITE_NULL
)
)) {
ret = WHY("Incorrect statement columns");
break;
}
const char *q_manifestid = (const char *) sqlite3_column_text(statement, 0);
size_t manifestidsize = sqlite3_column_bytes(statement, 0); // must call after sqlite3_column_text()
unsigned char manifest_id[RHIZOME_MANIFEST_ID_BYTES];
if ( manifestidsize != crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES * 2
|| fromhexstr(manifest_id, q_manifestid, RHIZOME_MANIFEST_ID_BYTES) == -1
) {
ret = WHYF("Malformed manifest.id from query: %s", q_manifestid);
break;
}
const char *manifestblob = (char *) sqlite3_column_blob(statement, 1);
size_t manifestblobsize = sqlite3_column_bytes(statement, 1); // must call after sqlite3_column_blob()
int64_t q_version = sqlite3_column_int64(statement, 2);
const char *q_author = (const char *) sqlite3_column_text(statement, 3);
if (config.debug.rhizome)
DEBUGF("Row %d", rows);
rhizome_manifest *blob_m = rhizome_new_manifest();
if (blob_m == NULL) {
ret = WHY("Out of manifests");
break;
}
const unsigned char *q_manifestid = sqlite3_column_text(statement, 0);
const char *manifestblob = (char *) sqlite3_column_blob(statement, 1);
size_t manifestblobsize = sqlite3_column_bytes(statement, 1); // must call after sqlite3_column_blob()
if (rhizome_read_manifest_file(blob_m, manifestblob, manifestblobsize) == -1) {
WARNF("MANIFESTS row id=%s has invalid manifest blob -- skipped", q_manifestid);
} else if (rhizome_manifest_verify(blob_m)) {
WARNF("MANIFESTS row id=%s fails verification -- skipped", q_manifestid);
} else {
const char *blob_service = rhizome_manifest_get(blob_m, "service", NULL, 0);
const char *blob_id = rhizome_manifest_get(blob_m, "id", NULL, 0);
int64_t blob_version = rhizome_manifest_get_ll(blob_m, "version");
const char *blob_filehash = rhizome_manifest_get(blob_m, "filehash", NULL, 0);
int64_t blob_filesize = rhizome_manifest_get_ll(blob_m, "filesize");
if (config.debug.rhizome)
DEBUGF("Consider manifest.service=%s manifest.id=%s manifest.version=%"PRId64, blob_service, q_manifestid, blob_version);
if (q_author) {
if (config.debug.rhizome)
strbuf_sprintf(b, " .author=%s", q_author);
stowSid(blob_m->author, 0, q_author);
}
/* Perform consistency checks, because we're paranoid. */
int inconsistent = 0;
if (blob_id && strcasecmp(blob_id, q_manifestid)) {
WARNF("MANIFESTS row id=%s has inconsistent blob with id=%s -- skipped", q_manifestid, blob_id);
++inconsistent;
}
if (blob_version != q_version) {
WARNF("MANIFESTS row id=%s has inconsistent blob: manifests.version=%"PRId64", blob.version=%"PRId64" -- skipped",
q_manifestid, q_version, blob_version);
++inconsistent;
}
if (blob_filesize != -1 && blob_filesize != m->fileLength) {
WARNF("MANIFESTS row id=%s has inconsistent blob: known file size %"PRId64", blob.filesize=%"PRId64" -- skipped",
q_manifestid, m->fileLength, blob_filesize);
++inconsistent;
}
if (m->fileLength != 0) {
if (!blob_filehash && strcasecmp(blob_filehash, m->fileHexHash)) {
WARNF("MANIFESTS row id=%s has inconsistent blob: manifests.filehash=%s, blob.filehash=%s -- skipped",
q_manifestid, m->fileHexHash, blob_filehash);
++inconsistent;
}
} else {
if (blob_filehash) {
WARNF("MANIFESTS row id=%s has inconsistent blob: blob.filehash should be absent -- skipped",
q_manifestid);
++inconsistent;
}
}
if (blob_service == NULL) {
WARNF("MANIFESTS row id=%s has blob with no 'service' -- skipped", q_manifestid);
++inconsistent;
}
if (!inconsistent) {
strbuf b = strbuf_alloca(1024);
if (strcasecmp(service, RHIZOME_SERVICE_FILE) == 0) {
const char *blob_name = rhizome_manifest_get(blob_m, "name", NULL, 0);
if (blob_name && !strcmp(blob_name, name)) {
if (config.debug.rhizome)
strbuf_sprintf(b, " name=\"%s\"", blob_name);
}else
++inconsistent;
} else if (strcasecmp(service, RHIZOME_SERVICE_MESHMS) == 0) {
const char *blob_sender = rhizome_manifest_get(blob_m, "sender", NULL, 0);
const char *blob_recipient = rhizome_manifest_get(blob_m, "recipient", NULL, 0);
if (blob_sender && !strcasecmp(blob_sender, sender) && blob_recipient && !strcasecmp(blob_recipient, recipient)) {
if (config.debug.rhizome)
strbuf_sprintf(b, " sender=%s recipient=%s", blob_sender, blob_recipient);
}else
++inconsistent;
}
}
if ((!inconsistent) && check_author) {
// check that we can re-author this manifest
if (rhizome_extract_privatekey(blob_m, NULL))
++inconsistent;
}
if (!inconsistent) {
*found = blob_m;
if (config.debug.rhizome)
DEBUGF("Found duplicate payload: service=%s%s version=%"PRIu64" hexhash=%s",
blob_service, strbuf_str(b), blob_m->version, blob_m->fileHexHash
);
ret = 1;
break;
}
goto next;
}
if (rhizome_manifest_verify(blob_m)) {
WARNF("MANIFESTS row id=%s fails verification -- skipped", q_manifestid);
goto next;
}
const char *q_author = (const char *) sqlite3_column_text(statement, 2);
if (q_author) {
if (config.debug.rhizome)
strbuf_sprintf(b, " .author=%s", q_author);
stowSid(blob_m->author, 0, q_author);
}
// check that we can re-author this manifest
if (rhizome_extract_privatekey(blob_m, NULL)){
goto next;
}
*found = blob_m;
if (config.debug.rhizome)
DEBUGF("Found duplicate payload, %s", q_manifestid);
ret = 1;
break;
next:
if (blob_m)
rhizome_manifest_free(blob_m);
}

View File

@ -396,7 +396,7 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
m->manifest_bytes, m->sig_count,(long long)m->fileLength);
dump("manifest", m->manifestdata, m->manifest_all_bytes);
}
return rhizome_bundle_import(m, m->ttl - 1 /* TTL */);
return rhizome_add_manifest(m, m->ttl - 1 /* TTL */);
}
static int schedule_fetch(struct rhizome_fetch_slot *slot)

View File

@ -163,16 +163,23 @@ static int write_get_lock(struct rhizome_write *write_state){
if (write_state->blob_fd>=0 || write_state->sql_blob)
return 0;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") == -1)
return -1;
while(1){
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data",
write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob);
if (ret==SQLITE_OK)
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_open");
return 0;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return -1;
return WHYF("Giving up");
}
}
@ -202,13 +209,15 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
while(1){
int ret=sqlite3_blob_write(write_state->sql_blob, buffer, data_size, file_offset);
if (ret==SQLITE_OK)
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_write");
break;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_write")==0)
return -1;
return WHY("Giving up");
}
}
@ -221,13 +230,22 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
// close database locks
static int write_release_lock(struct rhizome_write *write_state){
int ret=0;
if (write_state->blob_fd>=0)
return 0;
if (write_state->sql_blob)
sqlite3_blob_close(write_state->sql_blob);
if (write_state->sql_blob){
ret = sqlite3_blob_close(write_state->sql_blob);
if (ret)
WHYF("sqlite3_blob_close() failed: %s",
sqlite3_errmsg(rhizome_db));
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
ret=-1;
}
write_state->sql_blob=NULL;
return 0;
return ret;
}
// Write data buffers in any order, the data will be cached and streamed into the database in file order.
@ -349,7 +367,8 @@ int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsi
last_offset = (*ptr)->offset + (*ptr)->data_size;
ptr = &((*ptr)->_next);
}
write_release_lock(write_state);
if (write_release_lock(write_state))
ret=-1;
return ret;
}
@ -383,7 +402,8 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){
}
}
end:
write_release_lock(write);
if (write_release_lock(write))
ret=-1;
fclose(f);
return ret;
}
@ -448,7 +468,8 @@ int rhizome_finish_write(struct rhizome_write *write){
close(fd);
write->blob_fd=-1;
}
write_release_lock(write);
if (write_release_lock(write))
goto failure;
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&write->sha512_context, hash_out);
@ -588,16 +609,27 @@ static int rhizome_write_derive_key(rhizome_manifest *m, rhizome_bk_t *bsk, stru
return -1;
if (config.debug.rhizome)
DEBUGF("Encrypting payload contents");
DEBUGF("Encrypting payload contents for %s, %"PRId64, alloca_tohex_bid(m->cryptoSignPublic), m->version);
write->crypt=1;
write->tail = m->journalTail;
if (m->journalTail>0)
write->tail = m->journalTail;
bcopy(m->payloadKey, write->key, sizeof(write->key));
bcopy(m->payloadNonce, write->nonce, sizeof(write->nonce));
return 0;
}
int rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m)
{
if (rhizome_open_write(write, NULL, m->fileLength, RHIZOME_PRIORITY_DEFAULT))
return -1;
if (rhizome_write_derive_key(m, NULL, write))
return -1;
return 0;
}
// import a file for a new bundle with an unknown file hash
// update the manifest with the details of the file
int rhizome_add_file(rhizome_manifest *m, const char *filepath)
@ -606,16 +638,10 @@ int rhizome_add_file(rhizome_manifest *m, const char *filepath)
struct rhizome_write write;
bzero(&write, sizeof(write));
if (rhizome_open_write(&write, NULL, m->fileLength, RHIZOME_PRIORITY_DEFAULT))
return -1;
if (rhizome_write_derive_key(m, NULL, &write))
return -1;
if (rhizome_write_file(&write, filepath)){
rhizome_fail_write(&write);
return -1;
}
if (rhizome_write_open_manifest(&write, m))
goto failure;
if (rhizome_write_file(&write, filepath))
goto failure;
if (rhizome_finish_write(&write))
goto failure;
@ -718,11 +744,12 @@ int rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, int buf
if (read_state->hash){
if (buffer && bytes_read>0)
SHA512_Update(&read_state->sha512_context, buffer, bytes_read);
if (read_state->offset + bytes_read>=read_state->length){
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&read_state->sha512_context, hash_out);
if (strcasecmp(read_state->id, hash_out)){
WHYF("Expected hash=%s, got %s", read_state->id, hash_out);
RETURN(WHYF("Expected hash=%s, got %s", read_state->id, hash_out));
}
read_state->hash=0;
}
@ -740,6 +767,59 @@ int rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, int buf
OUT();
}
/* Read len bytes from read->offset into data, using *buffer to cache any reads */
int rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buffer *buffer, unsigned char *data, int len)
{
int bytes_copied=0;
while (len>0){
// make sure we only attempt to read data that actually exists
if (read->length !=-1 && read->offset + len > read->length)
len = read->length - read->offset;
// if we can supply either the beginning or end of the data from cache, do that first.
uint64_t ofs=read->offset - buffer->offset;
if (ofs>=0 && ofs<=buffer->len){
int size=len;
if (size > buffer->len - ofs)
size = buffer->len - ofs;
if (size>0){
// copy into the start of the data buffer
bcopy(buffer->data + ofs, data, size);
data+=size;
len-=size;
read->offset+=size;
bytes_copied+=size;
continue;
}
}
ofs = (read->offset+len) - buffer->offset;
if (ofs>0 && ofs<=buffer->len){
int size=len;
if (size > ofs)
size = ofs;
if (size>0){
// copy into the end of the data buffer
bcopy(buffer->data + ofs - size, data + len - size, size);
len-=size;
bytes_copied+=size;
continue;
}
}
// ok, so we need to read a new buffer to fulfill the request.
// remember the requested read offset so we can put it back
ofs = read->offset;
buffer->offset = read->offset = ofs & ~(RHIZOME_CRYPT_PAGE_SIZE -1);
buffer->len = rhizome_read(read, buffer->data, sizeof(buffer->data));
read->offset = ofs;
if (buffer->len<=0)
return buffer->len;
}
return bytes_copied;
}
int rhizome_read_close(struct rhizome_read *read)
{
if (read->blob_fd >=0){
@ -793,9 +873,10 @@ static int read_derive_key(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizom
return WHY("Unable to decrypt bundle, valid key not found");
}
if (config.debug.rhizome)
DEBUGF("Decrypting file contents");
DEBUGF("Decrypting payload contents for %s, %"PRId64, alloca_tohex_bid(m->cryptoSignPublic), m->version);
read_state->tail = m->journalTail;
if (m->journalTail>0)
read_state->tail = m->journalTail;
bcopy(m->payloadKey, read_state->key, sizeof(read_state->key));
bcopy(m->payloadNonce, read_state->nonce, sizeof(read_state->nonce));
}
@ -901,7 +982,7 @@ int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m,
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
// don't bother to decrypt the existing journal payload
ret = rhizome_open_read(&read_state, m->fileHexHash, 1);
ret = rhizome_open_read(&read_state, m->fileHexHash, advance_by>0?0:1);
if (ret)
goto failure;

View File

@ -670,6 +670,10 @@ int app_nonce_test(const struct cli_parsed *parsed, struct cli_context *context)
int app_rhizome_direct_sync(const struct cli_parsed *parsed, struct cli_context *context);
int app_monitor_cli(const struct cli_parsed *parsed, struct cli_context *context);
int app_vomp_console(const struct cli_parsed *parsed, struct cli_context *context);
int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context *context);
int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *context);
int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context *context);
int app_meshms_mark_read(const struct cli_parsed *parsed, struct cli_context *context);
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
@ -802,6 +806,10 @@ uint64_t read_uint64(unsigned char *o);
uint32_t read_uint32(unsigned char *o);
uint16_t read_uint16(unsigned char *o);
int pack_uint(unsigned char *buffer, uint64_t v);
int measure_packed_uint(uint64_t v);
int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v);
int slip_encode(int format,
unsigned char *src, int src_bytes, unsigned char *dst, int dst_len);
int slip_decode(struct slip_decode_state *state);

View File

@ -17,6 +17,7 @@ SERVAL_SOURCES = \
$(SERVAL_BASE)log.c \
$(SERVAL_BASE)lsif.c \
$(SERVAL_BASE)main.c \
$(SERVAL_BASE)meshms.c \
$(SERVAL_BASE)mdp_client.c \
$(SERVAL_BASE)os.c \
$(SERVAL_BASE)mem.c \

View File

@ -29,6 +29,7 @@ includeTests dnahelper
includeTests dnaprotocol
includeTests rhizomeops
includeTests rhizomeprotocol
includeTests meshms
includeTests directory_service
runTests "$@"

174
tests/meshms Executable file
View File

@ -0,0 +1,174 @@
#!/bin/bash
# Tests for MeshMS Messaging
#
# Copyright 2012 Serval Project, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
source "${0%/*}/../testdefs_rhizome.sh"
teardown() {
stop_all_servald_servers
kill_all_servald_processes
assert_no_servald_processes
}
doc_MessageDelivery="Send messages, ack and read them in a 2 party conversation"
setup_MessageDelivery() {
setup_servald
set_instance +A
create_identities 2
executeOk_servald config \
set debug.meshms on \
set debug.rhizome on \
set log.console.level debug
}
test_MessageDelivery() {
# 1. empty list
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutIs --stdout --line=1 -e '4\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:type:message\n'
assertStdoutLineCount '==' 2
# 2. create a manifest with a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 3
# 3. append a second message and list them both
executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 ":>:How are you\$"
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 4
# 4. list the messages from the receivers point of view (which ACKs them)
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:<:Hi\$"
assertStdoutLineCount '==' 4
# 5. mark the first message as read
executeOk_servald meshms read messages $SIDA2 $SIDA1 5
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutLineCount '==' 5
# 6. mark all messages as read
executeOk_servald meshms read messages $SIDA2
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutLineCount '==' 5
# 7. list messages from the senders point of view after they have been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "^0:3:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:19:>:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:>:Hi\$"
assertStdoutLineCount '==' 5
}
doc_MessageThreading="Messages sent at the same time, thread differently"
setup_MessageThreading() {
setup_servald
foreach_instance +A +B create_single_identity
set_instance +A
executeOk_servald meshms send message $SIDA $SIDB "Hello can you hear me"
executeOk_servald meshms send message $SIDA $SIDB "Still waiting"
set_instance +B
executeOk_servald meshms send message $SIDB $SIDA "Help Im trapped in a test case factory"
executeOk_servald meshms send message $SIDB $SIDA "Never mind"
start_servald_instances +A +B
}
has_unread_messages() {
executeOk_servald meshms list conversations $1
if ! grep ":unread:" $_tfw_tmp/stdout; then
return 1
fi
}
messages_delivered() {
executeOk_servald meshms list messages $1 $2
if ! grep ":ACK:" $_tfw_tmp/stdout; then
return 1
fi
}
test_MessageThreading() {
set_instance +B
wait_until has_unread_messages $SIDB
executeOk_servald meshms list messages $SIDB $SIDA
assertStdoutGrep --stdout --matches=1 "^0:40:<:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:24:<:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:54:>:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:41:>:Help Im trapped in a test case factory\$"
assertStdoutLineCount '==' 6
set_instance +A
wait_until has_unread_messages $SIDA
wait_until messages_delivered $SIDA $SIDB
executeOk_servald meshms list messages $SIDA $SIDB
assertStdoutGrep --stdout --matches=1 "^0:54:<:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:41:<:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:57:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^3:40:>:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^4:24:>:Hello can you hear me\$"
assertStdoutLineCount '==' 7
}
doc_listConversations="List multiple conversations, with different numbers of messages"
setup_listConversations() {
setup_servald
set_instance +A
create_identities 5
executeOk_servald config \
set debug.rhizome on \
set debug.meshms on \
set log.console.level debug
# create 3 threads, with all permutations of incoming and outgoing messages
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message1"
executeOk_servald meshms send message $SIDA3 $SIDA1 "Message2"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message3"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message4"
}
test_listConversations() {
executeOk_servald meshms list conversations $SIDA1
tfw_cat --stdout
assertStdoutIs --stderr --line=1 -e '5\n'
assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset\n'
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list conversations $SIDA1 1
tfw_cat --stderr
assertStdoutLineCount '==' 4
executeOk_servald meshms list conversations $SIDA1 1 1
tfw_cat --stderr
assertStdoutLineCount '==' 3
# mark all incoming messages as read
executeOk_servald meshms read messages $SIDA1
tfw_cat --stderr
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
assertStdoutLineCount '==' 5
}
runTests "$@"

View File

@ -207,4 +207,48 @@ reportfiles() {
sort -t- -k2,2 -k3,3n extrafiles
}
doc_stressmeshms="Stress test messaging with 4 instances"
setup_stressmeshms() {
setup_servald
foreach_instance +A +B +C +D create_single_identity
start_servald_instances +A +B +C +D
}
message_arrived() {
executeOk_servald meshms list messages $1 $2
if ! grep ":<:$3\$" $_tfw_tmp/stdout; then
return 1
fi
}
test_stressmeshms() {
for i in {1..10}
do
for instance in A B C D
do
local sidvar="SID$instance"
set_instance +$instance
assert_servald_server_status running
executeOk_servald meshms list conversations ${!sidvar}
for dest in A B C D
do
[ $instance = $dest ] && continue
local destsid="SID$dest"
executeOk_servald meshms send message ${!sidvar} ${!destsid} "$instance to $dest, Message $i"
done
done
done
for instance in A B C D
do
local sidvar="SID$instance"
set_instance +$instance
for dest in A B C D
do
[ $instance = $dest ] && continue
local destsid="SID$dest"
wait_until message_arrived ${!sidvar} ${!destsid} "$dest to $instance, Message 10"
executeOk_servald meshms list messages ${!sidvar} ${!destsid}
tfw_cat --stdout
done
done
}
runTests "$@"