Reduce redundant reading of message ply's

This commit is contained in:
Jeremy Lakeman 2016-08-17 11:28:36 +09:30
parent ce36e6f2c6
commit e13b9c3c94
12 changed files with 655 additions and 435 deletions

View File

@ -179,12 +179,12 @@ typedef struct httpd_request
struct newsince_position {
enum meshms_which_ply which_ply;
uint64_t offset;
uint64_t their_ack;
}
token,
current,
latest;
time_ms_t end_time;
uint64_t highest_ack_offset;
enum list_phase phase;
size_t rowcount;
struct meshms_message_iterator iter;

692
meshms.c
View File

@ -66,7 +66,7 @@ static enum meshms_status get_my_conversation_bundle(const keyring_identity *id,
if (m->haveSecret == NEW_BUNDLE_ID) {
rhizome_manifest_set_service(m, RHIZOME_SERVICE_FILE);
rhizome_manifest_set_name(m, "");
rhizome_manifest_set_author_identity(m, id);
// setting the author would imply needing a BK, which we don't need since the private key is seeded above.
struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_NEW:
@ -123,26 +123,21 @@ static struct meshms_conversations *add_conv(struct meshms_conversations **conv,
// find matching conversations
// if their_sid == my_sid, return all conversations with any recipient
static enum meshms_status get_database_conversations(const keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv)
static enum meshms_status get_database_conversations(const keyring_identity *id, struct meshms_conversations **conv)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT id, version, filesize, tail, sender, recipient"
" FROM manifests"
" WHERE service = ?3"
" AND (sender=?1 or recipient=?1)"
" AND (sender=?2 or recipient=?2)",
" WHERE service = ?2"
" AND (sender=?1 or recipient=?1)",
SID_T, id->box_pk,
SID_T, their_sid ? their_sid : id->box_pk,
STATIC_TEXT, RHIZOME_SERVICE_MESHMS2,
END
);
if (!statement)
return MESHMS_STATUS_ERROR;
DEBUGF(meshms, "Looking for conversations for %s, %s",
alloca_tohex_sid_t(*id->box_pk),
alloca_tohex_sid_t(*(their_sid ? their_sid : id->box_pk))
);
DEBUGF(meshms, "Looking for conversations for %s", alloca_tohex_sid_t(*id->box_pk));
int r;
while ((r=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) {
const char *id_hex = (const char *)sqlite3_column_text(statement, 0);
@ -151,7 +146,7 @@ static enum meshms_status get_database_conversations(const keyring_identity *id,
int64_t tail = sqlite3_column_int64(statement, 3);
const char *sender = (const char *)sqlite3_column_text(statement, 4);
const char *recipient = (const char *)sqlite3_column_text(statement, 5);
DEBUGF(meshms, "found id %s, sender %s, recipient %s", id_hex, sender, recipient);
DEBUGF(meshms, "found id %s, sender %s, recipient %s, size %"PRId64, id_hex, sender, recipient, size);
rhizome_bid_t bid;
if (str_to_rhizome_bid_t(&bid, id_hex) == -1) {
WHYF("invalid Bundle ID hex: %s -- skipping", alloca_str_toprint(id_hex));
@ -191,94 +186,151 @@ static enum meshms_status get_database_conversations(const keyring_identity *id,
return MESHMS_STATUS_OK;
}
static enum meshms_status find_or_create_conv(keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv)
static enum meshms_status open_ply(struct message_ply *ply, struct message_ply_read *reader)
{
enum meshms_status status;
if (meshms_failed(status = meshms_conversations_list(id, NULL, their_sid, conv)))
return status;
if (*conv == NULL) {
if ((*conv = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations))) == NULL)
return MESHMS_STATUS_ERROR;
(*conv)->them = *their_sid;
status = MESHMS_STATUS_UPDATED;
if (ply->found
&& !message_ply_is_open(reader)
&& message_ply_read_open(reader, &ply->bundle_id)!=0)
return MESHMS_STATUS_ERROR;
return MESHMS_STATUS_OK;
}
static enum meshms_status update_their_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader)
{
DEBUGF(meshms, "Update their stats? (theirsize=%"PRIu64", plysize=%"PRIu64", lastmessage=%"PRIu64", lastackoffset=%"PRIu64", lastack=%"PRIu64")",
metadata->their_size,
ply->size,
metadata->their_last_message,
metadata->their_last_ack_offset,
metadata->their_last_ack
);
if (metadata->their_size != ply->size){
enum meshms_status status;
if (meshms_failed(status = open_ply(ply, reader)))
return status;
uint8_t found_their_msg=0;
uint8_t found_their_ack=0;
while((!found_their_msg || !found_their_ack) && message_ply_read_prev(reader) == 0){
// stop if we've seen these records before
if (reader->record_end_offset <= metadata->their_size)
break;
switch(reader->type){
case MESSAGE_BLOCK_TYPE_MESSAGE:
if (!found_their_msg){
found_their_msg = 1;
metadata->their_last_message = reader->record_end_offset;
DEBUGF(meshms, "Found their last message @%"PRIu64, metadata->their_last_message);
}
break;
case MESSAGE_BLOCK_TYPE_ACK:
if (!found_their_ack){
found_their_ack = 1;
uint64_t value=0;
metadata->their_last_ack_offset = reader->record_end_offset;
if (unpack_uint(reader->record, reader->record_length, &value) != -1){
metadata->their_last_ack = value;
}
DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64,
metadata->their_last_ack_offset, metadata->their_last_ack);
}
break;
}
}
metadata->their_size = ply->size;
message_ply_read_rewind(reader);
return MESHMS_STATUS_UPDATED;
}
return MESHMS_STATUS_OK;
}
static enum meshms_status update_my_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader)
{
DEBUGF(meshms, "Update my stats? (mysize=%"PRIu64", plysize=%"PRIu64", lastack=%"PRIu64")",
metadata->my_size,
ply->size,
metadata->my_last_ack);
if (metadata->my_size != ply->size){
enum meshms_status status;
if (meshms_failed(status = open_ply(ply, reader)))
return status;
if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){
uint64_t my_ack = 0;
if (unpack_uint(reader->record, reader->record_length, &my_ack) != -1){
metadata->my_last_ack = my_ack;
DEBUGF(meshms, "Found my last ack %"PRId64, my_ack);
}
}
metadata->my_size = ply->size;
message_ply_read_rewind(reader);
return MESHMS_STATUS_UPDATED;
}
return MESHMS_STATUS_OK;
}
static enum meshms_status update_stats(struct meshms_conversations *conv)
{
enum meshms_status status = MESHMS_STATUS_OK;
struct message_ply_read reader;
bzero(&reader, sizeof reader);
enum meshms_status tmp_status = update_their_stats(&conv->metadata, &conv->their_ply, &reader);
message_ply_read_close(&reader);
if (meshms_failed(tmp_status))
return tmp_status;
if (tmp_status == MESHMS_STATUS_UPDATED)
status = tmp_status;
// Nothing else to be done if they have never sent us anything
if (!conv->metadata.their_last_message)
return status;
tmp_status = update_my_stats(&conv->metadata, &conv->my_ply, &reader);
message_ply_read_close(&reader);
if (meshms_failed(tmp_status))
return tmp_status;
if (tmp_status == MESHMS_STATUS_UPDATED)
status = tmp_status;
return status;
}
// update if any conversations are unread or need to be acked.
// create an ack if required.
// return MESHMS_STATUS_UPDATED if the conversation index needs to be saved.
static enum meshms_status update_conversation(const keyring_identity *id, struct meshms_conversations *conv)
{
DEBUG(meshms, "Checking if conversation needs to be acked");
// Nothing to be done if they have never sent us anything
if (!conv->their_ply.found)
return MESHMS_STATUS_OK;
uint64_t last_offset=0;
{
struct message_ply_read ply;
bzero(&ply, sizeof ply);
if (message_ply_read_open(&ply, &conv->their_ply.bundle_id)!=0)
return MESHMS_STATUS_ERROR;
enum meshms_status status = update_stats(conv);
if (meshms_failed(status))
return status;
DEBUG(meshms, "Locating their last message");
if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_MESSAGE)==0){
last_offset = ply.record_end_offset;
DEBUGF(meshms, "Found last message @%"PRId64, last_offset);
}
message_ply_read_close(&ply);
}
if (conv->metadata.my_last_ack >= conv->metadata.their_last_message)
return status;
// Perhaps only an ack has been added
if (last_offset == 0 || conv->their_last_message == last_offset)
return MESHMS_STATUS_OK;
// append an ack for their message
DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, conv->metadata.my_last_ack, conv->metadata.their_last_message);
unsigned char buffer[30];
struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
// find our previous ack
uint64_t previous_ack = 0;
if (conv->my_ply.found){
struct message_ply_read ply;
bzero(&ply, sizeof ply);
if (message_ply_read_open(&ply, &conv->my_ply.bundle_id)!=0)
return MESHMS_STATUS_ERROR;
message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
DEBUG(meshms, "Locating our previous ack");
if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_ACK)==0){
if (unpack_uint(ply.record, ply.record_length, &previous_ack) == -1)
previous_ack=0;
else
DEBUGF(meshms, "Previous ack is %"PRId64, previous_ack);
}
message_ply_read_close(&ply);
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0){
status = MESHMS_STATUS_ERROR;
}else{
DEBUGF(meshms, "No outgoing ply");
conv->metadata.my_last_ack = conv->metadata.their_last_message;
conv->metadata.my_size += ob_position(b);
status = MESHMS_STATUS_UPDATED;
}
// Note that we may have already acked this message, but failed to record it in our conversation list bundle
enum meshms_status status = MESHMS_STATUS_UPDATED;
if (previous_ack < last_offset){
// append an ack for their message
DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_offset);
unsigned char buffer[30];
struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
message_ply_append_ack(b, last_offset, previous_ack);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0)
status = MESHMS_STATUS_ERROR;
ob_free(b);
}
if (!meshms_failed(status)){
// if it's all good, remember the size of their ply at the time we examined it.
conv->their_last_message = last_offset;
conv->their_size = conv->their_ply.size;
}
ob_free(b);
return status;
}
@ -290,19 +342,17 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc
struct meshms_conversations **ptr = conv;
while (*ptr) {
struct meshms_conversations *n = *ptr;
if (n->their_size != n->their_ply.size) {
enum meshms_status status;
if (meshms_failed(status = update_conversation(id, n)))
return status;
if (status == MESHMS_STATUS_UPDATED){
rstatus = MESHMS_STATUS_UPDATED;
if (n != *conv){
DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
*ptr = n->_next;
n->_next = *conv;
*conv = n;
continue;
}
enum meshms_status status;
if (meshms_failed(status = update_conversation(id, n)))
return status;
if (status == MESHMS_STATUS_UPDATED){
rstatus = MESHMS_STATUS_UPDATED;
if (n != *conv){
DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
*ptr = n->_next;
n->_next = *conv;
*conv = n;
continue;
}
}
ptr = &(*ptr)->_next;
@ -312,7 +362,7 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc
// read our cached conversation list from our rhizome payload
// if we can't load the existing data correctly, just ignore it.
static enum meshms_status read_known_conversations(rhizome_manifest *m, const sid_t *their_sid, struct meshms_conversations **conv)
static enum meshms_status read_known_conversations(rhizome_manifest *m, struct meshms_conversations **conv)
{
if (m->haveSecret==NEW_BUNDLE_ID)
return MESHMS_STATUS_OK;
@ -332,56 +382,87 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED && pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY)
goto end;
unsigned char version=0xFF;
uint8_t version=0xFF;
ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1);
if (r == -1)
goto end;
if (version != 1) {
WARNF("Expected version 1 (got 0x%02x)", version);
if (version < 1 || version > 2) {
WARNF("Unknown file format version (got 0x%02x)", version);
goto end;
}
while (1) {
sid_t sid;
r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary);
if (r == 0) {
uint8_t buffer[SID_SIZE + 12*3 + 1];
ssize_t bytes = rhizome_read_buffered(&read, &buff, buffer, sizeof buffer);
if (bytes == 0) {
status = MESHMS_STATUS_OK;
goto end;
}
if (r != sizeof sid.binary)
if (bytes < SID_SIZE+1)
break;
DEBUGF(meshms, "Reading existing conversation for %s", alloca_tohex_sid_t(sid));
// unpack the stored details first so we know where the next record is
unsigned char details[12*3];
r = rhizome_read_buffered(&read, &buff, details, sizeof details);
if (r == -1)
break;
int bytes = r;
uint64_t last_message=0;
uint64_t read_offset=0;
uint64_t their_size=0;
int ofs = 0;
int unpacked = unpack_uint(details, bytes, &last_message);
if (unpacked == -1)
break;
ofs += unpacked;
unpacked = unpack_uint(details+ofs, bytes-ofs, &read_offset);
if (unpacked == -1)
break;
ofs += unpacked;
unpacked = unpack_uint(details+ofs, bytes-ofs, &their_size);
if (unpacked == -1)
break;
ofs += unpacked;
const sid_t *sid = (sid_t *)&buffer[0];
int ofs = SID_SIZE;
uint8_t flags = 0; // TODO flags
struct meshms_metadata metadata;
bzero(&metadata, sizeof metadata);
int unpacked;
if (version==1){
// force re-reading ply details
uint64_t ignored=0;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
break;
ofs += unpacked;
}else if(version>=2){
uint64_t delta=0;
flags = buffer[ofs++];
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1)
break;
ofs += unpacked;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_message = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.read_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack_offset = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.my_last_ack = metadata.their_size - delta;
if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
break;
ofs += unpacked;
metadata.their_last_ack = metadata.my_size - delta;
}
read.offset += ofs - bytes;
// skip uninteresting records
if (their_sid && cmp_sid_t(&sid, their_sid) != 0)
continue;
struct meshms_conversations *n = emalloc_zero(sizeof(struct meshms_conversations));
if (!n)
goto end;
@ -389,10 +470,18 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
*ptr = n;
ptr = &n->_next;
n->them = sid;
n->their_last_message = last_message;
n->read_offset = read_offset;
n->their_size = their_size;
n->them = *sid;
n->metadata = metadata;
DEBUGF(meshms, "Unpacked existing conversation for %s (their_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")",
alloca_tohex_sid_t(*sid),
metadata.their_size,
metadata.my_size,
metadata.their_last_message,
metadata.read_offset,
metadata.my_last_ack,
metadata.their_last_ack
);
}
end:
rhizome_read_close(&read);
@ -402,27 +491,41 @@ end:
static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv)
{
size_t len=0;
unsigned char buffer[sizeof(conv->them) + (8*3)];
if (write)
bcopy(conv->them.binary, buffer, sizeof(conv->them));
unsigned char buffer[sizeof(conv->them) + (12*3) + 1];
bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
buffer[len++] = 0; // TODO reserved for flags
assert(conv->metadata.their_size >= conv->metadata.their_last_message);
assert(conv->metadata.their_size >= conv->metadata.read_offset);
assert(conv->metadata.their_size >= conv->metadata.my_last_ack);
assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset);
assert(conv->metadata.my_size >= conv->metadata.their_last_ack);
// assume that most ack & read offsets are going to be near the ply length
// so store them as delta's.
len+=pack_uint(&buffer[len], conv->metadata.their_size);
len+=pack_uint(&buffer[len], conv->metadata.my_size);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset);
len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack);
len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack);
assert(len <= sizeof buffer);
if (write){
len+=pack_uint(&buffer[len], conv->their_last_message);
len+=pack_uint(&buffer[len], conv->read_offset);
len+=pack_uint(&buffer[len], conv->their_size);
int ret=rhizome_write_buffer(write, buffer, len);
if (ret == -1)
return ret;
}else{
len+=measure_packed_uint(conv->their_last_message);
len+=measure_packed_uint(conv->read_offset);
len+=measure_packed_uint(conv->their_size);
}
DEBUGF(meshms, "len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
alloca_tohex_sid_t(conv->them),
conv->their_last_message,
conv->read_offset,
conv->their_size,
conv->metadata.their_last_message,
conv->metadata.read_offset,
conv->metadata.their_size,
len
);
return len;
@ -449,33 +552,34 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
bzero(&write, sizeof(write));
enum meshms_status status = MESHMS_STATUS_ERROR;
// TODO rebalance tree...
// TODO rebalance tree...?
// measure the final payload first
ssize_t len=write_conversations(NULL, conv);
if (len == -1)
goto end;
// then write it
rhizome_manifest_set_version(m, m->version + 1);
rhizome_manifest_set_filesize(m, (size_t)len + 1);
rhizome_manifest_set_filesize(m, RHIZOME_SIZE_UNSET);
rhizome_manifest_set_filehash(m, NULL);
enum rhizome_payload_status pstatus = rhizome_write_open_manifest(&write, m);
if (pstatus!=RHIZOME_PAYLOAD_STATUS_NEW)
// TODO log something?
goto end;
unsigned char version=1;
uint8_t version=2;
if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end;
if (write_conversations(&write, conv) == -1)
goto end;
if (write.file_offset == 1){
// Don't bother if we don't know anyone.
status = MESHMS_STATUS_OK;
goto end;
}
pstatus = rhizome_finish_write(&write);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
rhizome_manifest_set_filehash(m, &write.id);
rhizome_manifest_set_filesize(m, write.file_length);
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
@ -523,13 +627,35 @@ end:
return status;
}
static enum meshms_status meshms_open_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv)
{
enum meshms_status status;
if (meshms_failed(status = get_my_conversation_bundle(id, m)))
goto end;
// read conversations payload
if (meshms_failed(status = read_known_conversations(m, conv)))
goto end;
status = get_database_conversations(id, conv);
end:
return status;
}
static enum meshms_status meshms_save_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv)
{
enum meshms_status status;
if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED)
status = write_known_conversations(m, *conv);
return status;
}
// read information about existing conversations from a rhizome payload
enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv)
enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv)
{
enum meshms_status status = MESHMS_STATUS_ERROR;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
goto end;
rhizome_manifest *m=NULL;
assert(id || my_sid);
if (!my_sid){
@ -542,15 +668,16 @@ enum meshms_status meshms_conversations_list(const keyring_identity *id, const s
}
}
if (meshms_failed(status = get_my_conversation_bundle(id, m)))
m = rhizome_new_manifest();
if (!m)
goto end;
// read conversations payload
if (meshms_failed(status = read_known_conversations(m, their_sid, conv)))
if (meshms_failed(status = meshms_open_list(id, m, conv)))
goto end;
if (meshms_failed(status = get_database_conversations(id, their_sid, conv)))
if (meshms_failed(status = meshms_save_list(id, m, conv)))
goto end;
if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED && their_sid == NULL)
status = write_known_conversations(m, *conv);
end:
rhizome_manifest_free(m);
DEBUGF(meshms, "status=%d", status);
@ -578,12 +705,15 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *i
enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *me, const sid_t *them)
{
bzero(iter, sizeof *iter);
DEBUGF(meshms, "iter=%p me=%s them=%s", iter,
me ? alloca_tohex_sid_t(*me) : "NULL",
them ? alloca_tohex_sid_t(*them) : "NULL"
);
enum meshms_status status = MESHMS_STATUS_ERROR;
bzero(iter, sizeof *iter);
struct meshms_conversations *conv = NULL;
rhizome_manifest *m = NULL;
keyring_identity *id = keyring_find_identity_sid(keyring, me);
if (!id){
@ -592,71 +722,80 @@ enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *
goto fail;
}
if (meshms_failed(status = find_or_create_conv(id, them, &iter->_conv)))
if (!(m = rhizome_new_manifest()))
goto error;
if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto fail;
assert(iter->_conv != NULL);
iter->identity = id;
iter->_my_sid = *me;
iter->my_sid = &iter->_my_sid;
iter->their_sid = &iter->_conv->them;
iter->my_ply_bid = &iter->_conv->my_ply.bundle_id;
iter->their_ply_bid = &iter->_conv->their_ply.bundle_id;
iter->read_offset = iter->_conv->read_offset;
iter->timestamp = 0;
// If I have never sent a message (or acked any of theirs), there are no messages in the thread.
if (iter->_conv->my_ply.found) {
int r = message_ply_read_open(&iter->_my_reader, &iter->_conv->my_ply.bundle_id);
if (r != 0)
goto error;
if (iter->_conv->their_ply.found) {
r = message_ply_read_open(&iter->_their_reader, &iter->_conv->their_ply.bundle_id);
if (r != 0)
goto error;
// Find their latest ACK so we know which of my messages have been delivered.
if (message_ply_find_prev(&iter->_their_reader, MESSAGE_BLOCK_TYPE_ACK)==0){
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->latest_ack_my_offset) == -1)
iter->latest_ack_my_offset = 0;
else{
iter->latest_ack_offset = iter->_their_reader.record_end_offset;
DEBUGF(meshms, "Found their last ack @%"PRId64, iter->latest_ack_offset);
}
}
// Re-seek to end of their ply.
iter->_their_reader.read.offset = iter->_their_reader.read.length;
}
} else {
DEBUGF(meshms, "Did not find sender's ply; no messages in thread");
}
iter->_in_ack = 0;
iter->my_sid = *me;
iter->their_sid = *them;
struct meshms_conversations *c = conv;
while(c){
if (cmp_sid_t(them, &c->them)==0){
DEBUGF(meshms, "Found matching conversation, found_mine=%d, found_theirs=%d, read_offset=%"PRId64,
c->my_ply.found, c->their_ply.found, c->metadata.read_offset);
if (meshms_failed(status = update_conversation(id, c)))
goto fail;
if (status == MESHMS_STATUS_UPDATED)
// ignore failures, we can retry later anyway.
write_known_conversations(m, conv);
if (meshms_failed(status = open_ply(&c->my_ply, &iter->_my_reader)))
goto fail;
if (meshms_failed(status = open_ply(&c->their_ply, &iter->_their_reader)))
goto fail;
iter->metadata = c->metadata;
iter->my_ply = c->my_ply;
iter->their_ply = c->their_ply;
if (c->their_ply.found && c->metadata.their_last_message > c->metadata.my_last_ack){
iter->_in_ack = 1;
iter->_their_reader.read.offset = c->metadata.their_last_message;
iter->_end_range = c->metadata.my_last_ack;
}
break;
}
c = c->_next;
}
meshms_free_conversations(conv);
return MESHMS_STATUS_OK;
error:
status = MESHMS_STATUS_ERROR;
fail:
meshms_message_iterator_close(iter);
meshms_free_conversations(conv);
return status;
}
int meshms_message_iterator_is_open(const struct meshms_message_iterator *iter)
{
return iter->_conv != NULL;
}
void meshms_message_iterator_close(struct meshms_message_iterator *iter)
{
DEBUGF(meshms, "iter=%p", iter);
message_ply_read_close(&iter->_my_reader);
message_ply_read_close(&iter->_their_reader);
meshms_free_conversations(iter->_conv);
iter->_conv = NULL;
}
enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *iter)
{
assert(iter->_conv != NULL);
enum meshms_status status = MESHMS_STATUS_UPDATED;
while (status == MESHMS_STATUS_UPDATED) {
if (iter->_in_ack) {
DEBUGF(meshms, "Reading other log from %"PRId64", to %"PRId64, iter->_their_reader.read.offset, iter->_end_range);
DEBUGF(meshms, "iter=%p, in_ack=%d, found_mine=%d, my_offset=%"PRIu64", their_offset=%"PRIu64,
iter, iter->_in_ack, iter->my_ply.found,
iter->_my_reader.read.offset, iter->_their_reader.read.offset);
while (1) {
if (iter->their_ply.found && iter->_in_ack) {
DEBUGF(meshms, "Reading other log from %"PRIu64", to %"PRIu64, iter->_their_reader.read.offset, iter->_end_range);
// just in case we don't have the full bundle in this rhizome store
if (iter->_their_reader.read.offset > iter->_their_reader.read.length)
iter->_their_reader.read.offset = iter->_their_reader.read.length;
// eof or other read errors, skip over messages (the tail is allowed to advance)
if (message_ply_read_prev(&iter->_their_reader)==0){
iter->which_ply = THEIR_PLY;
@ -670,7 +809,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
iter->read = 0;
return status;
return MESHMS_STATUS_UPDATED;
case MESSAGE_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
@ -679,8 +818,8 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
if ( iter->_their_reader.record_length != 0
&& iter->_their_reader.record[iter->_their_reader.record_length - 1] == '\0'
) {
iter->read = iter->_their_reader.record_end_offset <= iter->_conv->read_offset;
return status;
iter->read = iter->_their_reader.record_end_offset <= iter->metadata.read_offset;
return MESHMS_STATUS_UPDATED;
}
WARN("Malformed MeshMS2 ply journal, missing NUL terminator");
return MESHMS_STATUS_PROTOCOL_FAULT;
@ -689,9 +828,12 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
}
}
iter->_in_ack = 0;
status = MESHMS_STATUS_UPDATED;
}else if (message_ply_read_prev(&iter->_my_reader) == 0) {
DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset);
}else if(iter->my_ply.found){
if (message_ply_read_prev(&iter->_my_reader) != 0)
return MESHMS_STATUS_OK;
DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64,
iter->_my_reader.read.offset, iter->_my_reader.type, iter->metadata.read_offset);
iter->which_ply = MY_PLY;
switch (iter->_my_reader.type) {
case MESSAGE_BLOCK_TYPE_TIME:
@ -704,7 +846,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
break;
case MESSAGE_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
if (iter->_conv->their_ply.found) {
if (iter->their_ply.found) {
int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset);
if (ofs == -1) {
WHYF("Malformed ACK");
@ -717,9 +859,6 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
else
iter->_end_range = iter->_their_reader.read.offset - end_range;
// TODO tail
// just in case we don't have the full bundle anymore
if (iter->_their_reader.read.offset > iter->_their_reader.read.length)
iter->_their_reader.read.offset = iter->_their_reader.read.length;
iter->_in_ack = 1;
}
break;
@ -728,14 +867,13 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->offset = iter->_my_reader.record_end_offset;
iter->text = (const char *)iter->_my_reader.record;
iter->text_length = iter->_my_reader.record_length;
iter->delivered = iter->latest_ack_my_offset && iter->_my_reader.record_end_offset <= iter->latest_ack_my_offset;
return status;
iter->delivered = iter->_my_reader.record_end_offset <= iter->metadata.their_last_ack;
return MESHMS_STATUS_UPDATED;
}
}else{
status = MESHMS_STATUS_OK;
return MESHMS_STATUS_OK;
}
}
return status;
}
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len)
@ -747,29 +885,72 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
}
struct meshms_conversations *conv = NULL;
enum meshms_status status = MESHMS_STATUS_ERROR;
rhizome_manifest *m=NULL;
keyring_identity *id = keyring_find_identity_sid(keyring, sender);
if (!id)
return MESHMS_STATUS_SID_LOCKED;
if (meshms_failed(status = find_or_create_conv(id, recipient, &conv)))
m = rhizome_new_manifest();
if (!m)
goto end;
assert(conv != NULL);
if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto end;
struct meshms_conversations *c = conv;
while(c && cmp_sid_t(recipient, &c->them)!=0)
c = c->_next;
if (!c){
c = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations));
if (!c)
goto end;
c->them = *recipient;
c->_next = conv;
conv = c;
status = MESHMS_STATUS_UPDATED;
}
enum meshms_status tmp_status = update_stats(c);
if (meshms_failed(tmp_status))
return tmp_status;
if (tmp_status == MESHMS_STATUS_UPDATED)
status = tmp_status;
// construct a message payload
struct overlay_buffer *b = ob_new();
// if we didn't "know" them, or we just received a new message, we may need to add an ack now.
// lets do that in one hit
uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0;
DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message);
if (ack)
message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack);
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &conv->my_ply, b)==0)
if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &c->my_ply, b)==0){
if (ack)
c->metadata.my_last_ack = c->metadata.their_last_message;
c->metadata.my_size += ob_position(b);
// save known conversations since our stats will always change.
write_known_conversations(m, conv);
status = MESHMS_STATUS_UPDATED;
}else{
status = MESHMS_STATUS_ERROR;
}
ob_free(b);
end:
if (m)
rhizome_manifest_free(m);
meshms_free_conversations(conv);
return status;
}
@ -788,36 +969,27 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient,
DEBUGF(meshms, "sender=%s recipient=%s offset=%"PRIu64,
alloca_tohex_sid_t(*sender),
recipient ? alloca_tohex_sid_t(*recipient) : "NULL",
recipient ? alloca_tohex_sid_t(*recipient) : "(all)",
offset
);
m = rhizome_new_manifest();
if (!m)
goto end;
if (meshms_failed(status = get_my_conversation_bundle(id, m)))
if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto end;
// read all conversations, so we can write them again
if (meshms_failed(status = read_known_conversations(m, NULL, &conv)))
goto end;
// read the full list of conversations from the database too
if (meshms_failed(status = get_database_conversations(id, NULL, &conv)))
goto end;
// check if any incoming conversations need to be acked or have new messages and update the read offset
unsigned changed = 0;
// check if any incoming conversations need to be acked or have new messages
if (meshms_failed(status = update_conversations(id, &conv)))
goto end;
if (status == MESHMS_STATUS_UPDATED)
changed = 1;
changed ++;
// update the read offset
changed += mark_read(conv, recipient, offset);
DEBUGF(meshms, "changed=%u", changed);
if (changed) {
if (meshms_failed(status = write_known_conversations(m, conv)))
goto end;
if (status != MESHMS_STATUS_UPDATED) {
WHYF("expecting %d (MESHMS_STATUS_UPDATED), got %s", MESHMS_STATUS_UPDATED, status);
status = MESHMS_STATUS_ERROR;
}
}
if (changed)
status = write_known_conversations(m, conv);
end:
if (m)
rhizome_manifest_free(m);
@ -835,13 +1007,17 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
// - never past their last message
// - never rewind, only advance
uint64_t new_offset = offset;
if (new_offset > conv->their_last_message)
new_offset = conv->their_last_message;
if (new_offset > conv->read_offset) {
DEBUGF(meshms, "Moving read marker for %s, from %"PRId64" to %"PRId64,
alloca_tohex_sid_t(conv->them), conv->read_offset, new_offset
);
conv->read_offset = new_offset;
if (new_offset > conv->metadata.their_last_message)
new_offset = conv->metadata.their_last_message;
DEBUGF(meshms, "Read marker for %s, to %"PRIu64" (asked for %"PRIu64", was %"PRIu64")",
alloca_tohex_sid_t(conv->them),
new_offset,
offset,
conv->metadata.read_offset);
if (new_offset > conv->metadata.read_offset) {
conv->metadata.read_offset = new_offset;
ret++;
}
if (their_sid)

View File

@ -48,6 +48,24 @@ __MESHMS_INLINE int meshms_failed(enum meshms_status status) {
const char *meshms_status_message(enum meshms_status);
struct meshms_metadata{
// what is the offset of their last message
uint64_t their_last_message;
// what is the offset of their last ack
uint64_t their_last_ack_offset;
// where in our ply, does their ack point
uint64_t their_last_ack;
// what is the last message we marked as read
uint64_t read_offset;
// our cached value for the last known size of their ply
uint64_t their_size;
// where in their ply, does our last ack point
uint64_t my_last_ack;
// our cached value for the last known size of our ply
uint64_t my_size;
};
struct meshms_conversations {
struct meshms_conversations *_next;
@ -56,19 +74,14 @@ struct meshms_conversations {
struct message_ply my_ply;
struct message_ply their_ply;
// what is the offset of their last message
uint64_t their_last_message;
// what is the last message we marked as read
uint64_t read_offset;
// our cached value for the last known size of their ply
uint64_t their_size;
struct meshms_metadata metadata;
};
/* Fetch the list of all MeshMS conversations into a binary tree whose nodes
* are all allocated by malloc(3).
*/
enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv);
enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv);
void meshms_free_conversations(struct meshms_conversations *conv);
/* For iterating over a binary tree of all MeshMS conversations, as created by
@ -108,13 +121,14 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *)
struct meshms_message_iterator {
// Public fields that remain fixed for the life of the iterator:
struct keyring_identity *identity;
const sid_t *my_sid;
const sid_t *their_sid;
const rhizome_bid_t *my_ply_bid;
const rhizome_bid_t *their_ply_bid;
uint64_t latest_ack_offset; // offset in remote (their) ply of most recent ACK
uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them
uint64_t read_offset; // offset in remote (their) ply of most recent message read by me
sid_t my_sid;
sid_t their_sid;
struct message_ply my_ply;
struct message_ply their_ply;
struct meshms_metadata metadata;
// The following public fields change per message:
enum meshms_which_ply { NEITHER_PLY, MY_PLY, THEIR_PLY } which_ply;
enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type;
@ -131,15 +145,12 @@ struct meshms_message_iterator {
uint64_t ack_offset; // for ACK_RECEIVED
};
// Private implementation -- could change, so don't use them.
sid_t _my_sid;
struct meshms_conversations *_conv;
struct message_ply_read _my_reader;
struct message_ply_read _their_reader;
uint64_t _end_range;
bool_t _in_ack;
uint8_t _in_ack:1;
};
enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *, const sid_t *me, const sid_t *them);
int meshms_message_iterator_is_open(const struct meshms_message_iterator *);
void meshms_message_iterator_close(struct meshms_message_iterator *);
enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *);
@ -149,6 +160,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
* MESHMS_STATUS_UPDATED on success, any other value indicates a failure or
* error (which is already logged).
*/
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len);
/* Update the read offset for one or more conversations. Returns

View File

@ -39,7 +39,7 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
if (rhizome_opendb() == -1)
goto end;
if (meshms_failed(status = meshms_conversations_list(NULL, &sid, NULL, &conv)))
if (meshms_failed(status = meshms_conversations_list(NULL, &sid, &conv)))
goto end;
const char *names[]={
@ -57,16 +57,16 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
if (rows >= offset) {
cli_put_long(context, rows, ":");
cli_put_hexvalue(context, it.current->them.binary, sizeof(it.current->them), ":");
cli_put_string(context, it.current->read_offset < it.current->their_last_message ? "unread":"", ":");
cli_put_long(context, it.current->their_last_message, ":");
cli_put_long(context, it.current->read_offset, include_message?":":"\n");
cli_put_string(context, it.current->metadata.read_offset < it.current->metadata.their_last_message ? "unread":"", ":");
cli_put_long(context, it.current->metadata.their_last_message, ":");
cli_put_long(context, it.current->metadata.read_offset, include_message?":":"\n");
if (include_message){
int output = 0;
if (it.current->their_last_message && it.current->their_ply.found){
if (it.current->metadata.their_last_message && it.current->their_ply.found){
struct message_ply_read reader;
bzero(&reader, sizeof reader);
if (message_ply_read_open(&reader, &it.current->their_ply.bundle_id) == 0){
reader.read.offset = it.current->their_last_message;
reader.read.offset = it.current->metadata.their_last_message;
if (message_ply_read_prev(&reader)==0){
cli_put_string(context, (const char *)reader.record, "\n");
output = 1;
@ -175,7 +175,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_
case MESSAGE_SENT:
if (iter.delivered && !marked_delivered){
cli_put_long(context, id++, ":");
cli_put_long(context, iter.latest_ack_offset, ":");
cli_put_long(context, iter.metadata.their_last_ack_offset, ":");
cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
@ -193,7 +193,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_
case MESSAGE_RECEIVED:
if (iter.read && !marked_read) {
cli_put_long(context, id++, ":");
cli_put_long(context, iter.read_offset, ":");
cli_put_long(context, iter.metadata.read_offset, ":");
cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");

View File

@ -42,35 +42,50 @@ static void finalise_union_meshms_sendmessage(httpd_request *r)
form_buf_malloc_release(&r->u.sendmsg.message);
}
#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t)))
#define alloca_meshms_token(bid, offset) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (bid), (offset))
#define MAX_TOKEN_LEN 21
#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(MAX_TOKEN_LEN))
#define alloca_meshms_token(pos) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (pos))
static char *meshms_token_to_str(char *buf, const rhizome_bid_t *bid, uint64_t offset)
static char *meshms_token_to_str(char *buf, struct newsince_position *pos)
{
struct iovec iov[2];
iov[0].iov_base = (void *) bid->binary;
iov[0].iov_len = sizeof bid->binary;
iov[1].iov_base = &offset;
iov[1].iov_len = sizeof offset;
size_t n = base64url_encodev(buf, iov, 2);
assert(n == MESHMS_TOKEN_STRLEN);
uint8_t tmp[MAX_TOKEN_LEN];
int ofs = 0;
tmp[ofs++]=pos->which_ply;
ofs += pack_uint(tmp + ofs, pos->offset);
ofs += pack_uint(tmp + ofs, pos->their_ack);
assert(ofs <= MAX_TOKEN_LEN);
size_t n = base64url_encode(buf, tmp, ofs);
assert(n <= MESHMS_TOKEN_STRLEN);
buf[n] = '\0';
return buf;
}
static int strn_to_meshms_token(const char *str, rhizome_bid_t *bidp, uint64_t *offsetp, const char **afterp)
static int strn_to_meshms_token(const char *str, struct newsince_position *pos, const char **afterp)
{
unsigned char token[sizeof bidp->binary + sizeof *offsetp];
if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) == sizeof token){
memcpy(bidp->binary, token, sizeof bidp->binary);
memcpy(offsetp, token + sizeof bidp->binary, sizeof *offsetp);
(*afterp)++;
}else{
// don't skip the token
*afterp=str;
bzero(bidp, sizeof *bidp);
*offsetp=0;
}
uint8_t token[MAX_TOKEN_LEN];
size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL);
if (token_len < 3)
goto blank;
size_t ofs=0;
pos->which_ply = token[ofs++];
int unpacked;
if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->offset))==-1)
goto blank;
ofs += unpacked;
if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->their_ack))==-1)
goto blank;
(*afterp)++;
return 1;
blank:
// don't skip the token
*afterp=str;
pos->which_ply = MY_PLY;
pos->offset = 0;
pos->their_ack = 0;
return 1;
}
@ -159,7 +174,7 @@ static int restful_meshms_(httpd_request *r, const char *remainder)
remainder = "";
}
else if ( str_startswith(remainder, "/newsince/", &end)
&& strn_to_meshms_token(end, &r->bid, &r->ui64, &end)
&& strn_to_meshms_token(end, &r->u.msglist.token, &end)
&& strcmp(end, "messagelist.json") == 0
) {
handler = restful_meshms_newsince_messagelist_json;
@ -215,7 +230,7 @@ static int restful_meshms_conversationlist_json(httpd_request *r, const char *re
r->u.mclist.rowcount = 0;
r->u.mclist.conv = NULL;
enum meshms_status status;
if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, NULL, &r->u.mclist.conv)))
if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, &r->u.mclist.conv)))
return http_request_meshms_response(r, 0, NULL, status);
if (r->u.mclist.conv != NULL)
meshms_conversation_iterator_start(&r->u.mclist.iter, r->u.mclist.conv);
@ -275,11 +290,11 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.mclist.iter.current->them.binary, sizeof r->u.mclist.iter.current->them.binary);
strbuf_putc(b, ',');
strbuf_json_boolean(b, r->u.mclist.iter.current->read_offset >= r->u.mclist.iter.current->their_last_message);
strbuf_json_boolean(b, r->u.mclist.iter.current->metadata.read_offset >= r->u.mclist.iter.current->metadata.their_last_message);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->their_last_message);
strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.their_last_message);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->read_offset);
strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.read_offset);
strbuf_puts(b, "]");
if (!strbuf_overrun(b)) {
meshms_conversation_iterator_advance(&r->u.mclist.iter);
@ -307,8 +322,6 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
if (r->u.msglist.dirty) {
meshms_message_iterator_close(&r->u.msglist.iter);
r->u.msglist.dirty = 0;
}
if (!meshms_message_iterator_is_open(&r->u.msglist.iter)) {
enum meshms_status status;
if ( meshms_failed(status = meshms_message_iterator_open(&r->u.msglist.iter, &r->sid1, &r->sid2))
|| meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter))
@ -318,6 +331,7 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
if (!r->u.msglist.finished) {
r->u.msglist.latest.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.latest.offset = r->u.msglist.iter.offset;
r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack = r->u.msglist.iter.metadata.their_last_ack;
}
}
return MESHMS_STATUS_OK;
@ -334,6 +348,7 @@ static int restful_meshms_messagelist_json(httpd_request *r, const char *remaind
r->u.msglist.token.which_ply = NEITHER_PLY;
r->u.msglist.token.offset = 0;
r->u.msglist.end_time = 0;
r->u.msglist.dirty = 1;
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);
@ -354,17 +369,7 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);
if (rhizome_bid_t_is_zero(r->bid) || cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0)
r->u.msglist.token.which_ply = MY_PLY;
else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0)
r->u.msglist.token.which_ply = THEIR_PLY;
else {
http_request_simple_response(&r->http, 404, "Unmatched token");
return 404;
}
r->u.msglist.token.offset = r->ui64;
r->u.msglist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
r->u.msglist.current = r->u.msglist.token;
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshms_messagelist_json_content);
return 1;
}
@ -391,7 +396,7 @@ static int restful_meshms_messagelist_json_content(struct http_request *hr, unsi
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk);
}
static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos);
static void _messagelist_json_ack(struct httpd_request *r, strbuf b);
static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr, strbuf b)
{
@ -415,8 +420,8 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_puts(b, "{\n");
if (!r->u.msglist.end_time) {
strbuf_sprintf(b, "\"read_offset\":%"PRIu64",\n\"latest_ack_offset\":%"PRIu64",\n",
r->u.msglist.iter.read_offset,
r->u.msglist.iter.latest_ack_my_offset
r->u.msglist.iter.metadata.read_offset,
r->u.msglist.iter.metadata.their_last_ack
);
}
strbuf_puts(b, "\"header\":[");
@ -433,39 +438,57 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
case LIST_FIRST:
case LIST_ROWS:
{
r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.current.offset = r->u.msglist.iter.offset;
if ( r->u.msglist.finished
|| (r->u.msglist.token.which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token.offset)
) {
time_ms_t now;
if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) {
int appended_row = _messagelist_json_ack(r, b, r->u.msglist.current);
if (strbuf_overrun(b))
return 1;
if (appended_row)
// If messages before the requested token have now been acked, add an extra ack record to the end
if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack){
r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack;
_messagelist_json_ack(r, b);
if (strbuf_overrun(b))
return 1;
++r->u.msglist.rowcount;
}
r->u.msglist.token = r->u.msglist.latest;
meshms_message_iterator_close(&r->u.msglist.iter);
r->u.msglist.dirty = 1;
http_request_pause_response(&r->http, r->u.msglist.end_time);
return 0;
}
r->u.msglist.phase = LIST_END;
} else {
r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.current.offset = r->u.msglist.iter.offset;
int rows=0;
switch (r->u.msglist.iter.type) {
case MESSAGE_SENT:
// if you haven't seen the current ack && this is the message that was acked.
// output the ack now
if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack &&
r->u.msglist.iter.offset <= r->u.msglist.latest.their_ack){
_messagelist_json_ack(r, b);
if (!strbuf_overrun(b)){
++r->u.msglist.rowcount;
r->u.msglist.token.their_ack = r->u.msglist.latest.their_ack;
r->u.msglist.current.their_ack = 0;
}
return 1;
}
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, ">");
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
strbuf_putc(b, ',');
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->my_ply.bundle_id, r->u.msglist.iter.offset));
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_string(b, r->u.msglist.iter.text);
strbuf_putc(b, ',');
@ -477,6 +500,7 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_puts(b, "]");
rows++;
break;
case MESSAGE_RECEIVED:
if (r->u.msglist.rowcount != 0)
@ -484,13 +508,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_puts(b, "\n[");
strbuf_json_string(b, "<");
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
strbuf_putc(b, ',');
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_string(b, r->u.msglist.iter.text);
strbuf_putc(b, ',');
@ -502,13 +526,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_puts(b, "]");
rows++;
break;
case ACK_RECEIVED:
_messagelist_json_ack(r, b, r->u.msglist.current);
break;
}
if (!strbuf_overrun(b)) {
++r->u.msglist.rowcount;
r->u.msglist.rowcount+=rows;
enum meshms_status status;
if (meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter)))
return http_request_meshms_response(r, 0, NULL, status);
@ -531,26 +555,20 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
return 0;
}
static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos)
static void _messagelist_json_ack(struct httpd_request *r, strbuf b)
{
// Don't send old (irrelevant) ACKs.
if (r->u.msglist.iter.latest_ack_my_offset <= r->u.msglist.highest_ack_offset)
return 0;
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, "ACK");
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_offset);
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack_offset);
strbuf_putc(b, ',');
// Same token as the message row just sent.
strbuf_json_string(b, alloca_meshms_token(
pos.which_ply == MY_PLY ? r->u.msglist.iter.my_ply_bid : r->u.msglist.iter.their_ply_bid,
pos.offset));
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_putc(b, ',');
@ -560,10 +578,8 @@ static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsi
strbuf_putc(b, ',');
strbuf_json_null(b); // no timestamp on ACKs
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_my_offset);
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack);
strbuf_puts(b, "]");
r->u.msglist.highest_ack_offset = r->u.msglist.iter.latest_ack_my_offset;
return 1;
}
static HTTP_REQUEST_PARSER restful_meshms_sendmessage_end;

View File

@ -136,6 +136,16 @@ int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid
return ret;
}
void message_ply_read_rewind(struct message_ply_read *ply)
{
ply->read.offset = ply->read.length;
}
int message_ply_is_open(struct message_ply_read *ply)
{
return ply->read.length>0;
}
void message_ply_read_close(struct message_ply_read *ply)
{
if (ply->record){
@ -152,7 +162,7 @@ void message_ply_read_close(struct message_ply_read *ply)
int message_ply_read_prev(struct message_ply_read *ply)
{
ply->record_end_offset = ply->read.offset;
unsigned char footer[2];
uint8_t footer[2];
if (ply->read.offset <= sizeof footer) {
DEBUG(meshms, "EOF");
return -1;
@ -223,6 +233,7 @@ void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, u
{
ob_checkpoint(b);
ob_append_packed_ui64(b, message_offset);
// append the number of bytes acked (should be smaller than an absolute offset)
if (previous_ack_offset)
ob_append_packed_ui64(b, message_offset - previous_ack_offset);
append_footer(b, MESSAGE_BLOCK_TYPE_ACK);
@ -233,4 +244,5 @@ void message_ply_append_message(struct overlay_buffer *b, const char *message, s
ob_checkpoint(b);
ob_append_strn(b, message, message_len);
append_footer(b, MESSAGE_BLOCK_TYPE_MESSAGE);
}
}

View File

@ -28,15 +28,17 @@ struct message_ply_read {
uint64_t record_end_offset;
uint16_t record_length;
size_t record_size;
char type;
uint8_t type;
// raw record data
unsigned char *record;
uint8_t *record;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
void message_ply_read_close(struct message_ply_read *ply);
int message_ply_read_prev(struct message_ply_read *ply);
int message_ply_find_prev(struct message_ply_read *ply, char type);
int message_ply_is_open(struct message_ply_read *ply);
void message_ply_read_rewind(struct message_ply_read *ply);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset);
void message_ply_append_timestamp(struct overlay_buffer *b);

View File

@ -1263,6 +1263,8 @@ void rhizome_read_close(struct rhizome_read *read)
END);
}
read->length = 0;
read->offset = 0;
read->tail = 0;
}
struct cache_entry{
@ -1476,6 +1478,8 @@ static enum rhizome_payload_status read_derive_key(rhizome_manifest *m, struct r
enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state)
{
if (m->filesize == 0 && !m->has_filehash)
return RHIZOME_PAYLOAD_STATUS_EMPTY;
enum rhizome_payload_status status = rhizome_open_read(read_state, &m->filehash);
if (status == RHIZOME_PAYLOAD_STATUS_STORED)
status = read_derive_key(m, read_state);

View File

@ -97,12 +97,14 @@ meshms_add_messages() {
MESSAGE[$n]="<"
meshms_send_message $sid2 $sid1 "${TEXT[$n]}"
let ++NRECV
let sent_since_ack=0
;;
'A')
MESSAGE[$n]=ACK
[ $i -ne 0 -a $sent_since_ack -eq 0 ] && error "two ACKs in a row (at position $i)"
meshms_send_messages $sid2 $sid1
meshms_list_messages $sid2 $sid1
let ++NACK
let sent_since_ack=0
;;
*)
error "invalid message symbol '$sym' (at position $i)"

View File

@ -22,7 +22,7 @@ source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
source "${0%/*}/../testdefs_rhizome.sh"
rexp_age="[0-9]\+"
rexp_age="[-0-9]\+"
teardown() {
stop_all_servald_servers
@ -34,66 +34,60 @@ teardown() {
setup_logging() {
executeOk_servald config \
set debug.meshms on \
set debug.rhizome on \
set debug.rhizome_manifest on \
set debug.rhizome_store on \
set log.console.level debug \
set log.console.show_time on
}
doc_MessageDelivery="Send messages, ack and read them in a 2 party conversation"
setup_MessageDelivery() {
setup_common() {
setup_servald
set_instance +A
create_identities 2
create_identities $1
setup_logging
}
test_MessageDelivery() {
# 1. empty list
doc_InitiallyEmpty="Return an empty list before sending anything"
setup_InitiallyEmpty() {
setup_common 2
}
test_InitiallyEmpty() {
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutIs --stdout --line=1 -e '5\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:age:type:message\n'
assertStdoutLineCount '==' 2
# 2. create a manifest with a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
}
doc_SendAndList="Send outgoing messages and list them"
setup_SendAndList() {
setup_common 3
}
test_SendAndList() {
# create a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 1"
tfw_cat --stderr
executeOk_servald meshms send message $SIDA1 $SIDA3 "Message 1"
tfw_cat --stderr
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 3
# 3. append a second message and list them both
executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
tfw_cat --stderr
assertStdoutGrep --stdout --matches=1 "^0:12:$rexp_age:>:Message 1\$"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 2"
tfw_cat --stderr
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 ":>:How are you\$"
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 4
# 4. list the messages from the receivers point of view (which ACKs them)
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 4
CONV_BID=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*bid=\([0-9A-F]*\).*/\1/p')
CONV_SECRET=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*secret=\([0-9A-F]*\).*/\1/p')
tfw_log "CONV_BID=$CONV_BID CONV_SECRET=$CONV_SECRET"
# 5. mark the first message as read
executeOk_servald meshms read messages $SIDA2 $SIDA1 5
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
tfw_cat --stderr
assertStdoutGrep --stdout --matches=1 "^0:30:$rexp_age:>:Message 2\$"
assertStdoutGrep --stdout --matches=1 "^1:12:$rexp_age:>:Message 1\$"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 3"
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:>:Message 3\$"
assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:>:Message 2\$"
assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:>:Message 1\$"
assertStdoutLineCount '==' 5
# 6. mark all messages as read
executeOk_servald meshms read messages $SIDA2
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 5
# 7. list messages from the senders point of view after they have been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:>:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$"
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:<:Message 3\$"
assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:<:Message 2\$"
assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:<:Message 1\$"
assertStdoutLineCount '==' 5
}
@ -171,6 +165,10 @@ setup_reorderList() {
set_instance +A
create_identities 5
setup_logging
# ensure conversations are known
executeOk_servald meshms send message $SIDA1 $SIDA2 "Start"
executeOk_servald meshms send message $SIDA1 $SIDA3 "Start"
executeOk_servald meshms send message $SIDA1 $SIDA4 "Start"
}
test_reorderList() {
# new incoming messages should bump to the top
@ -212,21 +210,20 @@ test_listConversations() {
assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset:message\n'
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0:\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0:Message2\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0:Message4\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0:Message4\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list conversations $SIDA1 1
tfw_cat --stderr
assertStdoutLineCount '==' 4
executeOk_servald meshms list conversations $SIDA1 1 1
tfw_cat --stderr
assertStdoutLineCount '==' 3
# mark all incoming messages as read
executeOk_servald meshms read messages $SIDA1
tfw_cat --stderr
# explicitly mark sida3 as known
executeOk_servald meshms read messages $SIDA1 $SIDA3
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list messages $SIDA1 $SIDA2
executeOk_servald meshms list messages $SIDA1 $SIDA4

View File

@ -77,19 +77,19 @@ test_MeshmsListConversations() {
assertStdoutLineCount '==' 3
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=false, last_message=14, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=24, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=18, read_offset=0"
executeOk_servald meshms read messages $SIDA1
executeJavaOk org.servalproject.test.Meshms meshms-list-conversations $SIDA1
assertStdoutLineCount '==' 3
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=true, last_message=14, read_offset=14"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=24, read_offset=24"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=18, read_offset=18"
}
doc_MeshmsListMessages="Java API list MeshMS messages in one conversation"
setup_MeshmsListMessages() {
setup
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk_servald meshms list messages $SIDA1 $SIDA2
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT")
@ -162,7 +162,7 @@ setup_MeshmsListMessagesNewSince() {
executeOk_servald config set api.restful.newsince_timeout 1s
}
setup
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeJavaOk org.servalproject.test.Meshms meshms-list-messages "$SIDA1" "$SIDA2"
tfw_cat --stdout --stderr
@ -288,7 +288,7 @@ setup_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
}
test_MeshmsReadAllConversations() {
executeJavaOk org.servalproject.test.Meshms meshms-mark-all-conversations-read $SIDA1
@ -296,7 +296,7 @@ test_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
}
doc_MeshmsReadAllMessages="Java API MeshMS mark all conversations read"

View File

@ -69,9 +69,6 @@ set_meshms_config() {
executeOk_servald config \
set debug.http_server on \
set debug.httpd on \
set debug.rhizome_manifest on \
set debug.rhizome_store on \
set debug.rhizome on \
set debug.meshms on \
set debug.verbose on \
set log.console.level debug
@ -164,12 +161,13 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: false,
last_message: 20,
last_message: 14,
read_offset: 0
}
])"
# mark all incoming messages as read
executeOk_servald meshms read messages $SIDA1
executeOk_servald meshms read messages $SIDA1 $SIDA3
tfw_cat --stderr
executeOk curl \
--silent --fail --show-error \
@ -205,8 +203,8 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: true,
last_message: 20,
read_offset: 20
last_message: 14,
read_offset: 14
}
])"
}
@ -215,7 +213,7 @@ doc_MeshmsListMessages="HTTP RESTful list MeshMS messages in one conversation as
setup_MeshmsListMessages() {
IDENTITY_COUNT=2
setup
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk_servald meshms list messages $SIDA1 $SIDA2
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT")
@ -297,7 +295,7 @@ setup_MeshmsListMessagesNewSince() {
executeOk_servald config set api.restful.newsince_timeout 1s
}
setup
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk curl \
--silent --fail --show-error \
@ -327,11 +325,10 @@ test_MeshmsListMessagesNewSince() {
for ((i = 0; i < NROWS; i += 3)); do
transform_list_json messagelist$i.json messages$i.json
tfw_preserve messages$i.json
{ echo '{"a":'; jq '.[:'$i']' messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
# If an ACK is new since the token, then it will be in its correct place
# in messages$i.json, otherwise the ACK will appear in messages$i.json
# anyway.
assertJq tmp.json '.a == (if .a | contains([{type:"ACK"}]) then .b else .b | map(select(.type != "ACK")) end)'
jq '.[:'$i']' messages.json > messages_expected$i.json
tfw_preserve messages_expected$i.json
{ echo '{"a":'; cat messages_expected$i.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
assertJq tmp.json '.a == .b'
done
}
@ -385,7 +382,7 @@ test_MeshmsListMessagesNewSinceArrival() {
'A') waitfor="ACK";;
*) error "message=${message}";;
esac
wait_until --timeout=120 grepall "$waitfor" newsince{1,2,3}.json
wait_until --timeout=10 grepall "$waitfor" newsince{1,2,3}.json
done
fork_terminate_all
fork_wait_all
@ -596,7 +593,7 @@ setup_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
}
test_MeshmsReadAllConversations() {
executeOk curl \
@ -612,7 +609,7 @@ test_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
}
doc_MeshmsPostSpuriousContent="HTTP RESTful MeshMS rejects unwanted content in POST request"
@ -625,7 +622,7 @@ setup_MeshmsPostSpuriousContent() {
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message3"
executeOk_servald meshms send message $SIDA2 $SIDA1 "Message4"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$"
}
test_MeshmsPostSpuriousContent() {
executeOk curl \
@ -642,7 +639,7 @@ test_MeshmsPostSpuriousContent() {
assertJq http_body 'contains({"http_status_code": 400})'
assertJqGrep --ignore-case http_body '.http_status_message' 'content length'
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$"
}
doc_MeshmsReadAllMessages="HTTP RESTful MeshMS mark all conversations read"
@ -656,6 +653,8 @@ setup_MeshmsReadAllMessages() {
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message4"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message5"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message6"
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:0\$"
}