Refactor meshms ply handling into separate source file

This commit is contained in:
Jeremy Lakeman 2016-06-28 13:09:02 +09:30
parent a71c7ce698
commit dbded493c1
12 changed files with 424 additions and 431 deletions

View File

@ -13,6 +13,7 @@ HDRS= fifo.h \
httpd.h \
instance.h \
meshms.h \
message_ply.h \
serval_types.h \
serval.h \
server.h \

519
meshms.c
View File

@ -30,10 +30,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "keyring.h"
#include "dataformats.h"
#include "commandline.h"
#define MESHMS_BLOCK_TYPE_ACK 0x01
#define MESHMS_BLOCK_TYPE_MESSAGE 0x02 // NUL-terminated UTF8 string
#define MESHMS_BLOCK_TYPE_TIME 0x03 // local timestamp record
#include "overlay_buffer.h"
static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset);
@ -60,6 +57,13 @@ static enum meshms_status get_my_conversation_bundle(const keyring_identity *id,
// always consider the content encrypted, we don't need to rely on the manifest itself.
rhizome_manifest_set_crypt(m, PAYLOAD_ENCRYPTED);
assert(m->haveSecret);
// The 'meshms' automated test depends on this message; do not alter.
DEBUGF(meshms, "MESHMS CONVERSATION BUNDLE bid=%s secret=%s",
alloca_tohex_rhizome_bid_t(m->cryptoSignPublic),
alloca_tohex(m->cryptoSignSecret, RHIZOME_BUNDLE_KEY_BYTES)
);
if (m->haveSecret == NEW_BUNDLE_ID) {
rhizome_manifest_set_service(m, RHIZOME_SERVICE_FILE);
rhizome_manifest_set_name(m, "");
@ -90,11 +94,6 @@ static enum meshms_status get_my_conversation_bundle(const keyring_identity *id,
rhizome_bundle_result_free(&result);
return MESHMS_STATUS_SID_LOCKED;
}
// The 'meshms' automated test depends on this message; do not alter.
DEBUGF(meshms, "MESHMS CONVERSATION BUNDLE bid=%s secret=%s",
alloca_tohex_rhizome_bid_t(m->cryptoSignPublic),
alloca_tohex(m->cryptoSignSecret, RHIZOME_BUNDLE_KEY_BYTES)
);
rhizome_bundle_result_free(&result);
} else {
if (strcmp(m->service, RHIZOME_SERVICE_FILE) != 0) {
@ -175,7 +174,7 @@ static enum meshms_status get_database_conversations(const keyring_identity *id,
struct meshms_conversations *ptr = add_conv(conv, &their_sid);
if (!ptr)
break;
struct meshms_ply *p;
struct message_ply *p;
if (them==sender){
p=&ptr->their_ply;
}else{
@ -207,252 +206,6 @@ static enum meshms_status find_or_create_conv(keyring_identity *id, const sid_t
return status;
}
static size_t append_footer(unsigned char *buffer, char type, size_t message_len)
{
assert(message_len <= MESHMS_MESSAGE_MAX_LEN);
message_len = (message_len << 4) | (type&0xF);
write_uint16(buffer, message_len);
return 2;
}
// append a timestamp as a uint32_t with 1s precision
static size_t append_timestamp(uint8_t *buffer)
{
if (!config.rhizome.reliable_clock)
return 0;
write_uint32(buffer, gettime());
size_t ofs=4;
return ofs+append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_TIME, ofs);
}
static enum meshms_status ply_read_open(struct meshms_ply_read *ply, const rhizome_bid_t *bid, rhizome_manifest *m)
{
DEBUGF(meshms, "Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
switch (rhizome_retrieve_manifest(bid, m)) {
case RHIZOME_BUNDLE_STATUS_SAME:
break;
case RHIZOME_BUNDLE_STATUS_NEW: // bundle not found
return MESHMS_STATUS_PROTOCOL_FAULT;
case RHIZOME_BUNDLE_STATUS_BUSY:
// TODO
default:
return MESHMS_STATUS_ERROR;
}
enum rhizome_payload_status pstatus = rhizome_open_decrypt_read(m, &ply->read);
DEBUGF(meshms, "pstatus=%d", pstatus);
if (pstatus == RHIZOME_PAYLOAD_STATUS_NEW) {
WARNF("Payload was not found for manifest %s, %"PRIu64, alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version);
return MESHMS_STATUS_PROTOCOL_FAULT;
}
if (pstatus == RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL)
return MESHMS_STATUS_SID_LOCKED;
if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED && pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY)
return MESHMS_STATUS_ERROR;
assert(m->filesize != RHIZOME_SIZE_UNSET);
ply->read.offset = ply->read.length = m->filesize;
return MESHMS_STATUS_OK;
}
static void ply_read_close(struct meshms_ply_read *ply)
{
if (ply->record){
free(ply->record);
ply->record=NULL;
}
ply->record_size=0;
ply->buff.len=0;
rhizome_read_close(&ply->read);
}
// read the next record from the ply (backwards)
// returns MESHMS_STATUS_UPDATED if the read advances to a new record, MESHMS_STATUS_OK if at the
// end of records
static enum meshms_status ply_read_prev(struct meshms_ply_read *ply)
{
ply->record_end_offset = ply->read.offset;
unsigned char footer[2];
if (ply->read.offset <= sizeof footer) {
DEBUG(meshms, "EOF");
return MESHMS_STATUS_OK;
}
ply->read.offset -= sizeof footer;
ssize_t read = rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof footer);
if (read == -1) {
WHYF("rhizome_read_buffered() failed");
return MESHMS_STATUS_ERROR;
}
if ((size_t) read != sizeof footer) {
WHYF("Expected %zu bytes read, got %zu", (size_t) sizeof footer, (size_t) read);
return MESHMS_STATUS_PROTOCOL_FAULT;
}
// (rhizome_read automatically advances the offset by the number of bytes read)
ply->record_length=read_uint16(footer);
ply->type = ply->record_length & 0xF;
ply->record_length = ply->record_length>>4;
DEBUGF(meshms, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length + sizeof footer > ply->read.offset){
DEBUGF(meshms, "EOF");
return MESHMS_STATUS_OK;
}
ply->read.offset -= ply->record_length + sizeof(footer);
uint64_t record_start = ply->read.offset;
if (ply->record_size < ply->record_length){
ply->record_size = ply->record_length;
unsigned char *b = erealloc(ply->record, ply->record_size);
if (!b)
return MESHMS_STATUS_ERROR;
ply->record = b;
}
read = rhizome_read_buffered(&ply->read, &ply->buff, ply->record, ply->record_length);
if (read == -1) {
return WHYF("rhizome_read_buffered() failed");
return MESHMS_STATUS_ERROR;
}
if ((size_t) read != ply->record_length) {
WHYF("Expected %u bytes read, got %zu", ply->record_length, (size_t) read);
return MESHMS_STATUS_PROTOCOL_FAULT;
}
ply->read.offset = record_start;
return MESHMS_STATUS_UPDATED;
}
// keep reading past messages until you find this type.
static enum meshms_status ply_find_prev(struct meshms_ply_read *ply, char type)
{
enum meshms_status status;
while ((status = ply_read_prev(ply)) == MESHMS_STATUS_UPDATED && ply->type != type)
;
return status;
}
static enum meshms_status append_meshms_buffer(const keyring_identity *id, const sid_t *recipient, struct meshms_ply *ply, unsigned char *buffer, int len)
{
enum meshms_status status = MESHMS_STATUS_ERROR;
rhizome_manifest *mout = NULL;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
goto end;
if (ply->found){
switch (rhizome_retrieve_manifest(&ply->bundle_id, m)) {
case RHIZOME_BUNDLE_STATUS_SAME:
break;
case RHIZOME_BUNDLE_STATUS_NEW: // bundle not found
status = MESHMS_STATUS_PROTOCOL_FAULT;
goto end;
case RHIZOME_BUNDLE_STATUS_BUSY:
// TODO
default:
status = MESHMS_STATUS_ERROR;
goto end;
}
rhizome_authenticate_author(m);
if (!m->haveSecret || m->authorship != AUTHOR_AUTHENTIC) {
status = MESHMS_STATUS_PROTOCOL_FAULT;
goto end;
}
assert(m->author_identity == id);
} else {
DEBUGF(meshms, "Creating ply for sender=%s recipient=%s",
alloca_tohex_sid_t(*id->box_pk),
alloca_tohex_sid_t(*recipient)
);
rhizome_manifest_set_service(m, RHIZOME_SERVICE_MESHMS2);
rhizome_manifest_set_sender(m, id->box_pk);
rhizome_manifest_set_recipient(m, recipient);
rhizome_manifest_set_filesize(m, 0);
rhizome_manifest_set_tail(m, 0);
rhizome_manifest_set_author_identity(m, id);
struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_NEW:
case RHIZOME_BUNDLE_STATUS_SAME:
case RHIZOME_BUNDLE_STATUS_DUPLICATE:
status = MESHMS_STATUS_OK;
break;
case RHIZOME_BUNDLE_STATUS_ERROR:
case RHIZOME_BUNDLE_STATUS_INVALID:
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
WHYF("Error creating ply manifest: %s", alloca_rhizome_bundle_result(result));
status = MESHMS_STATUS_ERROR;
break;
case RHIZOME_BUNDLE_STATUS_BUSY:
// TODO
case RHIZOME_BUNDLE_STATUS_OLD:
case RHIZOME_BUNDLE_STATUS_FAKE:
case RHIZOME_BUNDLE_STATUS_NO_ROOM:
case RHIZOME_BUNDLE_STATUS_MANIFEST_TOO_BIG:
WARNF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
status = MESHMS_STATUS_PROTOCOL_FAULT;
break;
case RHIZOME_BUNDLE_STATUS_READONLY:
INFOF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
status = MESHMS_STATUS_SID_LOCKED;
break;
}
rhizome_bundle_result_free(&result);
if (status!=MESHMS_STATUS_OK)
goto end;
assert(m->haveSecret);
assert(m->payloadEncryption == PAYLOAD_ENCRYPTED);
ply->bundle_id = m->cryptoSignPublic;
ply->found = 1;
}
assert(m->haveSecret);
assert(m->authorship == AUTHOR_AUTHENTIC);
enum rhizome_payload_status pstatus = rhizome_append_journal_buffer(m, 0, buffer, len);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW) {
status = MESHMS_STATUS_ERROR;
goto end;
}
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
// error has already been logged
status = MESHMS_STATUS_ERROR;
break;
case RHIZOME_BUNDLE_STATUS_NEW:
status = MESHMS_STATUS_UPDATED;
break;
case RHIZOME_BUNDLE_STATUS_SAME:
case RHIZOME_BUNDLE_STATUS_DUPLICATE:
case RHIZOME_BUNDLE_STATUS_OLD:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest (version=%"PRIu64") gazumped by Rhizome store (version=%"PRIu64")",
m->version, mout->version);
break;
case RHIZOME_BUNDLE_STATUS_NO_ROOM:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest evicted from store");
break;
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest not consistent with payload");
break;
case RHIZOME_BUNDLE_STATUS_FAKE:
case RHIZOME_BUNDLE_STATUS_READONLY:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest is not signed");
break;
case RHIZOME_BUNDLE_STATUS_INVALID:
case RHIZOME_BUNDLE_STATUS_MANIFEST_TOO_BIG:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest is invalid");
break;
case RHIZOME_BUNDLE_STATUS_BUSY:
status = MESHMS_STATUS_PROTOCOL_FAULT;
WARNF("MeshMS ply manifest not stored due to database locking");
break;
}
rhizome_bundle_result_free(&result);
end:
if (mout && mout!=m)
rhizome_manifest_free(mout);
if (m)
rhizome_manifest_free(m);
return status;
}
// update if any conversations are unread or need to be acked.
// return MESHMS_STATUS_UPDATED if the conversation index needs to be saved.
static enum meshms_status update_conversation(const keyring_identity *id, struct meshms_conversations *conv)
@ -462,82 +215,72 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct
// Nothing to be done if they have never sent us anything
if (!conv->their_ply.found)
return MESHMS_STATUS_OK;
rhizome_manifest *m_ours = NULL;
rhizome_manifest *m_theirs = rhizome_new_manifest();
if (!m_theirs)
return MESHMS_STATUS_ERROR;
struct meshms_ply_read ply;
bzero(&ply, sizeof(ply));
enum meshms_status status = MESHMS_STATUS_ERROR;
DEBUG(meshms, "Locating their last message");
if (meshms_failed(status = ply_read_open(&ply, &conv->their_ply.bundle_id, m_theirs)))
goto end;
if (meshms_failed(status = ply_find_prev(&ply, MESHMS_BLOCK_TYPE_MESSAGE)))
goto end;
if (conv->their_last_message == ply.record_end_offset){
// nothing has changed since last time
status = MESHMS_STATUS_OK;
goto end;
uint64_t last_offset=0;
{
struct message_ply_read ply;
bzero(&ply, sizeof ply);
if (message_ply_read_open(&ply, &conv->their_ply.bundle_id)!=0)
return MESHMS_STATUS_ERROR;
DEBUG(meshms, "Locating their last message");
if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_MESSAGE)==0){
last_offset = ply.record_end_offset;
DEBUGF(meshms, "Found last message @%"PRId64, last_offset);
}
message_ply_read_close(&ply);
}
conv->their_last_message = ply.record_end_offset;
DEBUGF(meshms, "Found last message @%"PRId64, conv->their_last_message);
ply_read_close(&ply);
// Perhaps only an ack has been added
if (last_offset == 0 || conv->their_last_message == last_offset)
return MESHMS_STATUS_OK;
// find our previous ack
uint64_t previous_ack = 0;
if (conv->my_ply.found){
struct message_ply_read ply;
bzero(&ply, sizeof ply);
if (message_ply_read_open(&ply, &conv->my_ply.bundle_id)!=0)
return MESHMS_STATUS_ERROR;
DEBUG(meshms, "Locating our previous ack");
m_ours = rhizome_new_manifest();
if (!m_ours) {
status = MESHMS_STATUS_ERROR;
goto end;
}
if (meshms_failed(status = ply_read_open(&ply, &conv->my_ply.bundle_id, m_ours)))
goto end;
if (meshms_failed(status = ply_find_prev(&ply, MESHMS_BLOCK_TYPE_ACK)))
goto end;
if (status == MESHMS_STATUS_UPDATED) {
if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_ACK)==0){
if (unpack_uint(ply.record, ply.record_length, &previous_ack) == -1)
previous_ack=0;
status = MESHMS_STATUS_OK;
else
DEBUGF(meshms, "Previous ack is %"PRId64, previous_ack);
}
DEBUGF(meshms, "Previous ack is %"PRId64, previous_ack);
ply_read_close(&ply);
message_ply_read_close(&ply);
}else{
DEBUGF(meshms, "No outgoing ply");
status = MESHMS_STATUS_PROTOCOL_FAULT;
}
if (previous_ack >= conv->their_last_message){
// their last message has already been acked
status = MESHMS_STATUS_UPDATED;
goto end;
}
// append an ack for their message
DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, previous_ack, conv->their_last_message);
unsigned char buffer[30];
int ofs=0;
ofs+=pack_uint(&buffer[ofs], conv->their_last_message);
if (previous_ack)
ofs+=pack_uint(&buffer[ofs], conv->their_last_message - previous_ack);
ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs);
ofs+=append_timestamp(buffer+ofs);
status = append_meshms_buffer(id, &conv->them, &conv->my_ply, buffer, ofs);
DEBUGF(meshms, "status=%d", status);
end:
ply_read_close(&ply);
if (m_ours)
rhizome_manifest_free(m_ours);
if (m_theirs)
rhizome_manifest_free(m_theirs);
// if it's all good, remember the size of their ply at the time we examined it.
if (!meshms_failed(status))
// Note that we may have already acked this message, but failed to record it in our conversation list bundle
enum meshms_status status = MESHMS_STATUS_UPDATED;
if (previous_ack < last_offset){
// append an ack for their message
DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_offset);
unsigned char buffer[30];
struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
message_ply_append_ack(b, last_offset, previous_ack);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0)
status = MESHMS_STATUS_ERROR;
ob_free(b);
}
if (!meshms_failed(status)){
// if it's all good, remember the size of their ply at the time we examined it.
conv->their_last_message = last_offset;
conv->their_size = conv->their_ply.size;
}
return status;
}
@ -554,12 +297,13 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc
return status;
if (status == MESHMS_STATUS_UPDATED){
rstatus = MESHMS_STATUS_UPDATED;
DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
// bump to head of list
*ptr = n->_next;
n->_next = *conv;
*conv = n;
continue;
if (n != *conv){
DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
*ptr = n->_next;
n->_next = *conv;
*conv = n;
continue;
}
}
}
ptr = &(*ptr)->_next;
@ -864,24 +608,21 @@ enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *
iter->timestamp = 0;
// If I have never sent a message (or acked any of theirs), there are no messages in the thread.
if (iter->_conv->my_ply.found) {
if ((iter->_my_manifest = rhizome_new_manifest()) == NULL)
int r = message_ply_read_open(&iter->_my_reader, &iter->_conv->my_ply.bundle_id);
if (r != 0)
goto error;
if (meshms_failed(status = ply_read_open(&iter->_my_reader, &iter->_conv->my_ply.bundle_id, iter->_my_manifest)))
goto fail;
if (iter->_conv->their_ply.found) {
if ((iter->_their_manifest = rhizome_new_manifest()) == NULL)
r = message_ply_read_open(&iter->_their_reader, &iter->_conv->their_ply.bundle_id);
if (r != 0)
goto error;
if (meshms_failed(status = ply_read_open(&iter->_their_reader, &iter->_conv->their_ply.bundle_id, iter->_their_manifest)))
goto fail;
// Find their latest ACK so we know which of my messages have been delivered.
if (meshms_failed(status = ply_find_prev(&iter->_their_reader, MESHMS_BLOCK_TYPE_ACK)))
goto fail;
if (status == MESHMS_STATUS_UPDATED) {
if (message_ply_find_prev(&iter->_their_reader, MESSAGE_BLOCK_TYPE_ACK)==0){
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->latest_ack_my_offset) == -1)
iter->latest_ack_my_offset = 0;
else
else{
iter->latest_ack_offset = iter->_their_reader.record_end_offset;
DEBUGF(meshms, "Found their last ack @%"PRId64, iter->latest_ack_my_offset);
DEBUGF(meshms, "Found their last ack @%"PRId64, iter->latest_ack_offset);
}
}
// Re-seek to end of their ply.
iter->_their_reader.read.offset = iter->_their_reader.read.length;
@ -906,16 +647,8 @@ int meshms_message_iterator_is_open(const struct meshms_message_iterator *iter)
void meshms_message_iterator_close(struct meshms_message_iterator *iter)
{
DEBUGF(meshms, "iter=%p", iter);
if (iter->_my_manifest) {
ply_read_close(&iter->_my_reader);
rhizome_manifest_free(iter->_my_manifest);
iter->_my_manifest = NULL;
}
if (iter->_their_manifest){
ply_read_close(&iter->_their_reader);
rhizome_manifest_free(iter->_their_manifest);
iter->_their_manifest = NULL;
}
message_ply_read_close(&iter->_my_reader);
message_ply_read_close(&iter->_their_reader);
meshms_free_conversations(iter->_conv);
iter->_conv = NULL;
}
@ -923,53 +656,48 @@ void meshms_message_iterator_close(struct meshms_message_iterator *iter)
enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *iter)
{
assert(iter->_conv != NULL);
if (iter->_conv->my_ply.found) {
assert(iter->_my_manifest != NULL);
if (iter->_conv->their_ply.found)
assert(iter->_their_manifest != NULL);
}
enum meshms_status status = MESHMS_STATUS_UPDATED;
while (status == MESHMS_STATUS_UPDATED) {
if (iter->_in_ack) {
DEBUGF(meshms, "Reading other log from %"PRId64", to %"PRId64, iter->_their_reader.read.offset, iter->_end_range);
if (meshms_failed(status = ply_read_prev(&iter->_their_reader)))
break;
iter->which_ply = THEIR_PLY;
if (status == MESHMS_STATUS_UPDATED && iter->_their_reader.read.offset >= iter->_end_range) {
switch (iter->_their_reader.type) {
case MESHMS_BLOCK_TYPE_ACK:
iter->type = ACK_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
iter->text = NULL;
iter->text_length = 0;
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
iter->read = 0;
return status;
case MESHMS_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
iter->text = (const char *)iter->_their_reader.record;
iter->text_length = iter->_their_reader.record_length;
if ( iter->_their_reader.record_length != 0
&& iter->_their_reader.record[iter->_their_reader.record_length - 1] == '\0'
) {
iter->read = iter->_their_reader.record_end_offset <= iter->_conv->read_offset;
// eof or other read errors, skip over messages (the tail is allowed to advance)
if (message_ply_read_prev(&iter->_their_reader)==0){
iter->which_ply = THEIR_PLY;
if (iter->_their_reader.read.offset >= iter->_end_range) {
switch (iter->_their_reader.type) {
case MESSAGE_BLOCK_TYPE_ACK:
iter->type = ACK_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
iter->text = NULL;
iter->text_length = 0;
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
iter->read = 0;
return status;
}
WARN("Malformed MeshMS2 ply journal, missing NUL terminator");
return MESHMS_STATUS_PROTOCOL_FAULT;
case MESSAGE_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
iter->text = (const char *)iter->_their_reader.record;
iter->text_length = iter->_their_reader.record_length;
if ( iter->_their_reader.record_length != 0
&& iter->_their_reader.record[iter->_their_reader.record_length - 1] == '\0'
) {
iter->read = iter->_their_reader.record_end_offset <= iter->_conv->read_offset;
return status;
}
WARN("Malformed MeshMS2 ply journal, missing NUL terminator");
return MESHMS_STATUS_PROTOCOL_FAULT;
}
continue;
}
continue;
}
iter->_in_ack = 0;
status = MESHMS_STATUS_UPDATED;
}
else if ((status = ply_read_prev(&iter->_my_reader)) == MESHMS_STATUS_UPDATED) {
}else if (message_ply_read_prev(&iter->_my_reader) == 0) {
DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset);
iter->which_ply = MY_PLY;
switch (iter->_my_reader.type) {
case MESHMS_BLOCK_TYPE_TIME:
case MESSAGE_BLOCK_TYPE_TIME:
if (iter->_my_reader.record_length<4){
WARN("Malformed MeshMS2 ply journal, expected 4 byte timestamp");
return MESHMS_STATUS_PROTOCOL_FAULT;
@ -977,7 +705,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->timestamp = read_uint32(iter->_my_reader.record);
DEBUGF(meshms, "Parsed timestamp %ds old", gettime() - iter->timestamp);
break;
case MESHMS_BLOCK_TYPE_ACK:
case MESSAGE_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
if (iter->_conv->their_ply.found) {
int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset);
@ -998,7 +726,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->_in_ack = 1;
}
break;
case MESHMS_BLOCK_TYPE_MESSAGE:
case MESSAGE_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_SENT;
iter->offset = iter->_my_reader.record_end_offset;
iter->text = (const char *)iter->_my_reader.record;
@ -1006,6 +734,8 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->delivered = iter->latest_ack_my_offset && iter->_my_reader.record_end_offset <= iter->latest_ack_my_offset;
return status;
}
}else{
status = MESHMS_STATUS_OK;
}
}
return status;
@ -1014,13 +744,12 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len)
{
assert(message_len != 0);
if (message_len > MESHMS_MESSAGE_MAX_LEN) {
if (message_len > MESSAGE_PLY_MAX_LEN) {
WHY("message too long");
return MESHMS_STATUS_ERROR;
}
struct meshms_conversations *conv = NULL;
enum meshms_status status = MESHMS_STATUS_ERROR;
unsigned char buffer[message_len + 4 + 6];
keyring_identity *id = keyring_find_identity(keyring, sender);
if (!id)
@ -1030,15 +759,18 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
goto end;
assert(conv != NULL);
// construct a message payload
// TODO, new format here for compressed text?
strncpy((char*)buffer, message, message_len);
// ensure message is NUL terminated
if (message[message_len - 1] != '\0')
buffer[message_len++] = '\0';
message_len += append_footer(buffer + message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len);
message_len += append_timestamp(buffer + message_len);
status = append_meshms_buffer(id, recipient, &conv->my_ply, buffer, message_len);
struct overlay_buffer *b = ob_new();
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &conv->my_ply, b)==0)
status = MESHMS_STATUS_UPDATED;
ob_free(b);
end:
meshms_free_conversations(conv);
@ -1288,8 +1020,7 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
{
unsigned ret=0;
while (conv){
int cmp = their_sid ? cmp_sid_t(&conv->them, their_sid) : 0;
if (!their_sid || cmp==0){
if (!their_sid || cmp_sid_t(&conv->them, their_sid)==0){
// update read offset
// - never past their last message
// - never rewind, only advance

View File

@ -29,8 +29,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#endif
#include "rhizome.h"
#define MESHMS_MESSAGE_MAX_LEN 4095
#include "message_ply.h"
/* The result of a MeshMS operation. Negative indicates failure, zero or
* positive success.
@ -49,23 +48,14 @@ __MESHMS_INLINE int meshms_failed(enum meshms_status status) {
const char *meshms_status_message(enum meshms_status);
// the manifest details for one half of a conversation
struct meshms_ply {
rhizome_bid_t bundle_id;
uint64_t version;
uint64_t tail;
uint64_t size;
uint8_t found;
};
struct meshms_conversations {
struct meshms_conversations *_next;
// who are we talking to?
sid_t them;
struct meshms_ply my_ply;
struct meshms_ply their_ply;
struct message_ply my_ply;
struct message_ply their_ply;
// what is the offset of their last message
uint64_t their_last_message;
@ -75,21 +65,6 @@ struct meshms_conversations {
uint64_t their_size;
};
// cursor state for reading one half of a conversation
struct meshms_ply_read {
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
uint64_t record_end_offset;
uint16_t record_length;
size_t record_size;
char type;
// raw record data
unsigned char *record;
};
/* Fetch the list of all MeshMS conversations into a binary tree whose nodes
* are all allocated by malloc(3).
*/
@ -158,10 +133,8 @@ struct meshms_message_iterator {
// Private implementation -- could change, so don't use them.
sid_t _my_sid;
struct meshms_conversations *_conv;
rhizome_manifest *_my_manifest;
rhizome_manifest *_their_manifest;
struct meshms_ply_read _my_reader;
struct meshms_ply_read _their_reader;
struct message_ply_read _my_reader;
struct message_ply_read _their_reader;
uint64_t _end_range;
bool_t _in_ack;
};

View File

@ -617,7 +617,7 @@ static int send_mime_part_header(struct http_request *hr, const struct mime_part
if (r->u.sendmsg.received_message)
return http_response_form_part(r, 400, "Duplicate", PART_MESSAGE, NULL, 0);
r->u.sendmsg.current_part = PART_MESSAGE;
form_buf_malloc_init(&r->u.sendmsg.message, MESHMS_MESSAGE_MAX_LEN);
form_buf_malloc_init(&r->u.sendmsg.message, MESSAGE_PLY_MAX_LEN);
}
else
return http_response_form_part(r, 415, "Unsupported", h->content_disposition.name, NULL, 0);
@ -648,7 +648,7 @@ static int restful_meshms_sendmessage_end(struct http_request *hr)
if (!r->u.sendmsg.received_message)
return http_response_form_part(r, 400, "Missing", PART_MESSAGE, NULL, 0);
assert(r->u.sendmsg.message.length > 0);
assert(r->u.sendmsg.message.length <= MESHMS_MESSAGE_MAX_LEN);
assert(r->u.sendmsg.message.length <= MESSAGE_PLY_MAX_LEN);
enum meshms_status status;
if (meshms_failed(status = meshms_send_message(&r->sid1, &r->sid2, r->u.sendmsg.message.buffer, r->u.sendmsg.message.length)))
return http_request_meshms_response(r, 0, NULL, status);

223
message_ply.c Normal file
View File

@ -0,0 +1,223 @@
#include "serval.h"
#include "dataformats.h"
#include "rhizome.h"
#include "message_ply.h"
#include "log.h"
#include "debug.h"
#include "conf.h"
#include "overlay_buffer.h"
static int message_ply_load_manifest(const keyring_identity *id, struct message_ply *ply, rhizome_manifest *m)
{
assert(ply->found);
if (rhizome_retrieve_manifest(&ply->bundle_id, m) != RHIZOME_BUNDLE_STATUS_SAME)
return -1;
rhizome_authenticate_author(m);
if (!m->haveSecret || m->authorship != AUTHOR_AUTHENTIC)
return -1;
assert(m->author_identity == id);
return 0;
}
static int message_ply_fill_manifest(const keyring_identity *id, const sid_t *recipient, struct message_ply *ply, rhizome_manifest *m)
{
assert(!ply->found);
rhizome_manifest_set_sender(m, id->box_pk);
rhizome_manifest_set_recipient(m, recipient);
rhizome_manifest_set_filesize(m, 0);
rhizome_manifest_set_tail(m, 0);
rhizome_manifest_set_author_identity(m, id);
int ret=-1;
struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_NEW:
case RHIZOME_BUNDLE_STATUS_SAME:
case RHIZOME_BUNDLE_STATUS_DUPLICATE:
ret = 0;
break;
case RHIZOME_BUNDLE_STATUS_ERROR:
case RHIZOME_BUNDLE_STATUS_INVALID:
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
WHYF("Error creating ply manifest: %s", alloca_rhizome_bundle_result(result));
break;
case RHIZOME_BUNDLE_STATUS_BUSY:
// TODO
case RHIZOME_BUNDLE_STATUS_OLD:
case RHIZOME_BUNDLE_STATUS_FAKE:
case RHIZOME_BUNDLE_STATUS_NO_ROOM:
case RHIZOME_BUNDLE_STATUS_MANIFEST_TOO_BIG:
WARNF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
break;
case RHIZOME_BUNDLE_STATUS_READONLY:
INFOF("Cannot create ply manifest: %s", alloca_rhizome_bundle_result(result));
break;
}
rhizome_bundle_result_free(&result);
if (ret==0){
assert(m->haveSecret);
assert(!recipient || m->payloadEncryption == PAYLOAD_ENCRYPTED);
ply->bundle_id = m->cryptoSignPublic;
ply->found = 1;
}
return ret;
}
int message_ply_append(const keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b)
{
rhizome_manifest *mout = NULL;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return -1;
int ret=-1;
if (ply->found){
if (message_ply_load_manifest(id, ply, m)!=0)
goto end;
} else {
rhizome_manifest_set_service(m, service);
if (message_ply_fill_manifest(id, recipient, ply, m)!=0)
goto end;
}
enum rhizome_payload_status pstatus = rhizome_append_journal_buffer(m, 0, ob_ptr(b), ob_position(b));
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1);
if (result.status != RHIZOME_BUNDLE_STATUS_NEW){
WARNF("Cannot create message ply manifest: %s", alloca_rhizome_bundle_result(result));
rhizome_bundle_result_free(&result);
goto end;
}
rhizome_bundle_result_free(&result);
ret = 0;
end:
if (mout && mout!=m)
rhizome_manifest_free(mout);
if (m)
rhizome_manifest_free(m);
return ret;
}
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid)
{
DEBUGF(meshms, "Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
return -1;
int ret=-1;
if (rhizome_retrieve_manifest(bid, m) == RHIZOME_BUNDLE_STATUS_SAME
&& rhizome_open_decrypt_read(m, &ply->read) == RHIZOME_PAYLOAD_STATUS_STORED){
assert(m->filesize != RHIZOME_SIZE_UNSET);
ply->read.offset = ply->read.length = m->filesize;
ret = 0;
}
rhizome_manifest_free(m);
return ret;
}
void message_ply_read_close(struct message_ply_read *ply)
{
if (ply->record){
free(ply->record);
ply->record=NULL;
}
ply->record_size=0;
ply->buff.len=0;
rhizome_read_close(&ply->read);
}
// read the next record from the ply (backwards)
// returns -1 if there is an error, or if at the end of records
int message_ply_read_prev(struct message_ply_read *ply)
{
ply->record_end_offset = ply->read.offset;
unsigned char footer[2];
if (ply->read.offset <= sizeof footer) {
DEBUG(meshms, "EOF");
return -1;
}
ply->read.offset -= sizeof footer;
ssize_t read = rhizome_read_buffered(&ply->read, &ply->buff, footer, sizeof footer);
if (read == -1)
return WHYF("rhizome_read_buffered() failed");
if ((size_t) read != sizeof footer)
return WHYF("Expected %zu bytes read, got %zu", (size_t) sizeof footer, (size_t) read);
// (rhizome_read automatically advances the offset by the number of bytes read)
{
uint16_t r = read_uint16(footer);
ply->type = r & 0xF;
ply->record_length = r >> 4;
}
DEBUGF(meshms, "Found record %d, length %d @%"PRId64, ply->type, ply->record_length, ply->record_end_offset);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length + sizeof footer > ply->read.offset){
DEBUGF(meshms, "EOF");
return -1;
}
ply->read.offset -= ply->record_length + sizeof(footer);
uint64_t record_start = ply->read.offset;
if (ply->record_size < ply->record_length){
ply->record_size = ply->record_length;
unsigned char *b = erealloc(ply->record, ply->record_size);
if (!b)
return -1;
ply->record = b;
}
read = rhizome_read_buffered(&ply->read, &ply->buff, ply->record, ply->record_length);
if (read == -1)
return WHYF("rhizome_read_buffered() failed");
if ((size_t) read != ply->record_length)
return WHYF("Expected %u bytes read, got %zu", ply->record_length, (size_t) read);
ply->read.offset = record_start;
return 0;
}
// keep reading past messages until you find this type.
int message_ply_find_prev(struct message_ply_read *ply, const char message_type)
{
int ret;
while ((ret = message_ply_read_prev(ply)) == 0 && ply->type != message_type)
;
return ret;
}
static void append_footer(struct overlay_buffer *b, char type)
{
size_t message_len = ob_position(b) - ob_mark(b);
assert(message_len <= MESSAGE_PLY_MAX_LEN);
ob_append_ui16_rv(b, (message_len << 4) | (type&0xF));
}
// append a timestamp as a uint32_t with 1s precision
void message_ply_append_timestamp(struct overlay_buffer *b)
{
if (!config.rhizome.reliable_clock)
return;
ob_checkpoint(b);
ob_append_ui32_rv(b, gettime());
append_footer(b, MESSAGE_BLOCK_TYPE_TIME);
}
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset)
{
ob_checkpoint(b);
ob_append_packed_ui64(b, message_offset);
if (previous_ack_offset)
ob_append_packed_ui64(b, message_offset - previous_ack_offset);
append_footer(b, MESSAGE_BLOCK_TYPE_ACK);
}
void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len)
{
ob_checkpoint(b);
ob_append_strn(b, message, message_len);
append_footer(b, MESSAGE_BLOCK_TYPE_MESSAGE);
}

45
message_ply.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef __SERVAL_DNA__MESSAGE_PLY_H
#define __SERVAL_DNA__MESSAGE_PLY_H
#define MESSAGE_PLY_MAX_LEN 4095
#define MESSAGE_BLOCK_TYPE_ACK 0x01
#define MESSAGE_BLOCK_TYPE_MESSAGE 0x02 // NUL-terminated UTF8 string
#define MESSAGE_BLOCK_TYPE_TIME 0x03 // local timestamp record
// the manifest details for one ply
struct message_ply {
rhizome_bid_t bundle_id;
uint64_t version;
uint64_t tail;
uint64_t size;
uint8_t found;
};
// cursor state for reading one ply
struct message_ply_read {
// rhizome payload
struct rhizome_read read;
// block buffer
struct rhizome_read_buffer buff;
// details of the current record
uint64_t record_end_offset;
uint16_t record_length;
size_t record_size;
char type;
// raw record data
unsigned char *record;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
void message_ply_read_close(struct message_ply_read *ply);
int message_ply_read_prev(struct message_ply_read *ply);
int message_ply_find_prev(struct message_ply_read *ply, char type);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset);
void message_ply_append_timestamp(struct overlay_buffer *b);
void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len);
int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b);
#endif

View File

@ -274,6 +274,12 @@ void _ob_append_str(struct __sourceloc whence, struct overlay_buffer *b, const c
_ob_append_bytes(whence, b, (const uint8_t*)str, strlen(str)+1);
}
void _ob_append_strn(struct __sourceloc whence, struct overlay_buffer *b, const char *str, size_t max_len)
{
_ob_append_bytes(whence, b, (const uint8_t*)str, strnlen(str, max_len));
_ob_append_byte(whence, b, 0);
}
void _ob_append_ui16(struct __sourceloc __whence, struct overlay_buffer *b, uint16_t v)
{
const int bytes = 2;
@ -626,6 +632,11 @@ size_t ob_position(struct overlay_buffer *b)
return b->position;
}
size_t ob_mark(struct overlay_buffer *b)
{
return b->checkpointLength;
}
size_t ob_limit(struct overlay_buffer *b)
{
return b->sizeLimit;

View File

@ -66,6 +66,7 @@ void _ob_append_ui64_rv(struct __sourceloc whence, struct overlay_buffer *b, uin
void _ob_append_packed_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
void _ob_append_packed_ui64(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
void _ob_append_str(struct __sourceloc whence, struct overlay_buffer *b, const char *str);
void _ob_append_strn(struct __sourceloc whence, struct overlay_buffer *b, const char *str, size_t max_len);
#define ob_new() _ob_new(__WHENCE__)
#define ob_static(bytes, size) _ob_static(__WHENCE__, bytes, size)
@ -94,6 +95,7 @@ void _ob_append_str(struct __sourceloc whence, struct overlay_buffer *b, const c
#define ob_append_packed_ui32(b, v) _ob_append_packed_ui32(__WHENCE__, b, v)
#define ob_append_packed_ui64(b, v) _ob_append_packed_ui64(__WHENCE__, b, v)
#define ob_append_str(b, s) _ob_append_str(__WHENCE__, b, s)
#define ob_append_strn(b, s, count) _ob_append_strn(__WHENCE__, b, s, count)
// get one byte, -ve number indicates failure
int ob_peek(struct overlay_buffer *b);
@ -118,6 +120,7 @@ uint64_t ob_get_packed_ui64(struct overlay_buffer *b);
size_t ob_position(struct overlay_buffer *b);
size_t ob_limit(struct overlay_buffer *b);
size_t ob_remaining(struct overlay_buffer *b);
size_t ob_mark(struct overlay_buffer *b);
int _ob_overrun(struct __sourceloc, struct overlay_buffer *b);
// get the raw pointer of the whole buffer
unsigned char* ob_ptr(struct overlay_buffer *b);

View File

@ -1124,7 +1124,7 @@ int rhizome_manifest_remove_field(rhizome_manifest *m, const char *field_label,
*/
int rhizome_manifest_validate(rhizome_manifest *m)
{
return rhizome_manifest_validate_reason(m) == NULL ? 1 : 0;
return (m->finalised || rhizome_manifest_validate_reason(m) == NULL) ? 1 : 0;
}
/* If all essential (transport) fields are present and well formed then sets the m->finalised field

View File

@ -1044,7 +1044,8 @@ enum rhizome_payload_status rhizome_open_read(struct rhizome_read *read, const r
if (sqlite_exec_uint64(&read->length,"SELECT length FROM FILES WHERE id = ?",
RHIZOME_FILEHASH_T, &read->id, END) == -1)
return RHIZOME_PAYLOAD_STATUS_ERROR;
assert(read->length>0);
if (sqlite_exec_uint64(&read->blob_rowid,
"SELECT rowid "
"FROM FILEBLOBS "
@ -1240,6 +1241,10 @@ ssize_t rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buf
void rhizome_read_close(struct rhizome_read *read)
{
if (read->length == 0)
// bzero'd & never opened, or already closed
return;
if (read->blob_fd != -1) {
DEBUGF(rhizome_store, "Closing store fd %d", read->blob_fd);
close(read->blob_fd);
@ -1258,6 +1263,7 @@ void rhizome_read_close(struct rhizome_read *read)
RHIZOME_FILEHASH_T, &read->id,
END);
}
read->length = 0;
}
struct cache_entry{

View File

@ -61,6 +61,7 @@ SERVAL_DAEMON_SOURCES = \
main.c \
radio_link.c \
meshms.c \
message_ply.c \
keyring_restful.c \
meshms_restful.c \
msp_client.c \

View File

@ -75,7 +75,6 @@ test_MessageDelivery() {
tfw_log "CONV_BID=$CONV_BID CONV_SECRET=$CONV_SECRET"
# 5. mark the first message as read
executeOk_servald meshms read messages $SIDA2 $SIDA1 5
tfw_cat --stderr
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"