diff --git a/httpd.h b/httpd.h
index c59678d1..6de95f5b 100644
--- a/httpd.h
+++ b/httpd.h
@@ -179,12 +179,12 @@ typedef struct httpd_request
struct newsince_position {
enum meshms_which_ply which_ply;
uint64_t offset;
+ uint64_t their_ack;
}
token,
current,
latest;
time_ms_t end_time;
- uint64_t highest_ack_offset;
enum list_phase phase;
size_t rowcount;
struct meshms_message_iterator iter;
diff --git a/meshms.c b/meshms.c
index e3502f29..1271d312 100644
--- a/meshms.c
+++ b/meshms.c
@@ -66,7 +66,7 @@ static enum meshms_status get_my_conversation_bundle(const keyring_identity *id,
if (m->haveSecret == NEW_BUNDLE_ID) {
rhizome_manifest_set_service(m, RHIZOME_SERVICE_FILE);
rhizome_manifest_set_name(m, "");
- rhizome_manifest_set_author_identity(m, id);
+ // setting the author would imply needing a BK, which we don't need since the private key is seeded above.
struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_NEW:
@@ -123,26 +123,21 @@ static struct meshms_conversations *add_conv(struct meshms_conversations **conv,
// find matching conversations
// if their_sid == my_sid, return all conversations with any recipient
-static enum meshms_status get_database_conversations(const keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv)
+static enum meshms_status get_database_conversations(const keyring_identity *id, struct meshms_conversations **conv)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT id, version, filesize, tail, sender, recipient"
" FROM manifests"
- " WHERE service = ?3"
- " AND (sender=?1 or recipient=?1)"
- " AND (sender=?2 or recipient=?2)",
+ " WHERE service = ?2"
+ " AND (sender=?1 or recipient=?1)",
SID_T, id->box_pk,
- SID_T, their_sid ? their_sid : id->box_pk,
STATIC_TEXT, RHIZOME_SERVICE_MESHMS2,
END
);
if (!statement)
return MESHMS_STATUS_ERROR;
- DEBUGF(meshms, "Looking for conversations for %s, %s",
- alloca_tohex_sid_t(*id->box_pk),
- alloca_tohex_sid_t(*(their_sid ? their_sid : id->box_pk))
- );
+ DEBUGF(meshms, "Looking for conversations for %s", alloca_tohex_sid_t(*id->box_pk));
int r;
while ((r=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) {
const char *id_hex = (const char *)sqlite3_column_text(statement, 0);
@@ -151,7 +146,7 @@ static enum meshms_status get_database_conversations(const keyring_identity *id,
int64_t tail = sqlite3_column_int64(statement, 3);
const char *sender = (const char *)sqlite3_column_text(statement, 4);
const char *recipient = (const char *)sqlite3_column_text(statement, 5);
- DEBUGF(meshms, "found id %s, sender %s, recipient %s", id_hex, sender, recipient);
+ DEBUGF(meshms, "found id %s, sender %s, recipient %s, size %"PRId64, id_hex, sender, recipient, size);
rhizome_bid_t bid;
if (str_to_rhizome_bid_t(&bid, id_hex) == -1) {
WHYF("invalid Bundle ID hex: %s -- skipping", alloca_str_toprint(id_hex));
@@ -191,94 +186,151 @@ static enum meshms_status get_database_conversations(const keyring_identity *id,
return MESHMS_STATUS_OK;
}
-static enum meshms_status find_or_create_conv(keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv)
+static enum meshms_status open_ply(struct message_ply *ply, struct message_ply_read *reader)
{
- enum meshms_status status;
- if (meshms_failed(status = meshms_conversations_list(id, NULL, their_sid, conv)))
- return status;
- if (*conv == NULL) {
- if ((*conv = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations))) == NULL)
- return MESHMS_STATUS_ERROR;
- (*conv)->them = *their_sid;
- status = MESHMS_STATUS_UPDATED;
+ if (ply->found
+ && !message_ply_is_open(reader)
+ && message_ply_read_open(reader, &ply->bundle_id)!=0)
+ return MESHMS_STATUS_ERROR;
+ return MESHMS_STATUS_OK;
+}
+
+static enum meshms_status update_their_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader)
+{
+ DEBUGF(meshms, "Update their stats? (theirsize=%"PRIu64", plysize=%"PRIu64", lastmessage=%"PRIu64", lastackoffset=%"PRIu64", lastack=%"PRIu64")",
+ metadata->their_size,
+ ply->size,
+ metadata->their_last_message,
+ metadata->their_last_ack_offset,
+ metadata->their_last_ack
+ );
+ if (metadata->their_size != ply->size){
+ enum meshms_status status;
+ if (meshms_failed(status = open_ply(ply, reader)))
+ return status;
+
+ uint8_t found_their_msg=0;
+ uint8_t found_their_ack=0;
+
+ while((!found_their_msg || !found_their_ack) && message_ply_read_prev(reader) == 0){
+ // stop if we've seen these records before
+ if (reader->record_end_offset <= metadata->their_size)
+ break;
+
+ switch(reader->type){
+ case MESSAGE_BLOCK_TYPE_MESSAGE:
+ if (!found_their_msg){
+ found_their_msg = 1;
+ metadata->their_last_message = reader->record_end_offset;
+ DEBUGF(meshms, "Found their last message @%"PRIu64, metadata->their_last_message);
+ }
+ break;
+ case MESSAGE_BLOCK_TYPE_ACK:
+ if (!found_their_ack){
+ found_their_ack = 1;
+ uint64_t value=0;
+ metadata->their_last_ack_offset = reader->record_end_offset;
+ if (unpack_uint(reader->record, reader->record_length, &value) != -1){
+ metadata->their_last_ack = value;
+ }
+ DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64,
+ metadata->their_last_ack_offset, metadata->their_last_ack);
+ }
+ break;
+ }
+ }
+ metadata->their_size = ply->size;
+ message_ply_read_rewind(reader);
+ return MESHMS_STATUS_UPDATED;
}
+ return MESHMS_STATUS_OK;
+}
+
+static enum meshms_status update_my_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader)
+{
+ DEBUGF(meshms, "Update my stats? (mysize=%"PRIu64", plysize=%"PRIu64", lastack=%"PRIu64")",
+ metadata->my_size,
+ ply->size,
+ metadata->my_last_ack);
+ if (metadata->my_size != ply->size){
+ enum meshms_status status;
+ if (meshms_failed(status = open_ply(ply, reader)))
+ return status;
+
+ if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){
+ uint64_t my_ack = 0;
+ if (unpack_uint(reader->record, reader->record_length, &my_ack) != -1){
+ metadata->my_last_ack = my_ack;
+ DEBUGF(meshms, "Found my last ack %"PRId64, my_ack);
+ }
+ }
+ metadata->my_size = ply->size;
+ message_ply_read_rewind(reader);
+ return MESHMS_STATUS_UPDATED;
+ }
+
+ return MESHMS_STATUS_OK;
+}
+
+static enum meshms_status update_stats(struct meshms_conversations *conv)
+{
+ enum meshms_status status = MESHMS_STATUS_OK;
+ struct message_ply_read reader;
+ bzero(&reader, sizeof reader);
+
+ enum meshms_status tmp_status = update_their_stats(&conv->metadata, &conv->their_ply, &reader);
+ message_ply_read_close(&reader);
+ if (meshms_failed(tmp_status))
+ return tmp_status;
+ if (tmp_status == MESHMS_STATUS_UPDATED)
+ status = tmp_status;
+
+ // Nothing else to be done if they have never sent us anything
+ if (!conv->metadata.their_last_message)
+ return status;
+
+ tmp_status = update_my_stats(&conv->metadata, &conv->my_ply, &reader);
+ message_ply_read_close(&reader);
+
+ if (meshms_failed(tmp_status))
+ return tmp_status;
+ if (tmp_status == MESHMS_STATUS_UPDATED)
+ status = tmp_status;
+
return status;
}
-// update if any conversations are unread or need to be acked.
+// create an ack if required.
// return MESHMS_STATUS_UPDATED if the conversation index needs to be saved.
static enum meshms_status update_conversation(const keyring_identity *id, struct meshms_conversations *conv)
{
DEBUG(meshms, "Checking if conversation needs to be acked");
-
- // Nothing to be done if they have never sent us anything
- if (!conv->their_ply.found)
- return MESHMS_STATUS_OK;
- uint64_t last_offset=0;
- {
- struct message_ply_read ply;
- bzero(&ply, sizeof ply);
- if (message_ply_read_open(&ply, &conv->their_ply.bundle_id)!=0)
- return MESHMS_STATUS_ERROR;
+ enum meshms_status status = update_stats(conv);
+ if (meshms_failed(status))
+ return status;
- DEBUG(meshms, "Locating their last message");
- if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_MESSAGE)==0){
- last_offset = ply.record_end_offset;
- DEBUGF(meshms, "Found last message @%"PRId64, last_offset);
- }
- message_ply_read_close(&ply);
- }
+ if (conv->metadata.my_last_ack >= conv->metadata.their_last_message)
+ return status;
- // Perhaps only an ack has been added
- if (last_offset == 0 || conv->their_last_message == last_offset)
- return MESHMS_STATUS_OK;
+ // append an ack for their message
+ DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, conv->metadata.my_last_ack, conv->metadata.their_last_message);
+ unsigned char buffer[30];
+ struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
- // find our previous ack
- uint64_t previous_ack = 0;
-
- if (conv->my_ply.found){
- struct message_ply_read ply;
- bzero(&ply, sizeof ply);
- if (message_ply_read_open(&ply, &conv->my_ply.bundle_id)!=0)
- return MESHMS_STATUS_ERROR;
+ message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack);
+ message_ply_append_timestamp(b);
+ assert(!ob_overrun(b));
- DEBUG(meshms, "Locating our previous ack");
- if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_ACK)==0){
- if (unpack_uint(ply.record, ply.record_length, &previous_ack) == -1)
- previous_ack=0;
- else
- DEBUGF(meshms, "Previous ack is %"PRId64, previous_ack);
- }
- message_ply_read_close(&ply);
+ if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0){
+ status = MESHMS_STATUS_ERROR;
}else{
- DEBUGF(meshms, "No outgoing ply");
+ conv->metadata.my_last_ack = conv->metadata.their_last_message;
+ conv->metadata.my_size += ob_position(b);
+ status = MESHMS_STATUS_UPDATED;
}
- // Note that we may have already acked this message, but failed to record it in our conversation list bundle
- enum meshms_status status = MESHMS_STATUS_UPDATED;
-
- if (previous_ack < last_offset){
- // append an ack for their message
- DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_offset);
- unsigned char buffer[30];
- struct overlay_buffer *b = ob_static(buffer, sizeof buffer);
-
- message_ply_append_ack(b, last_offset, previous_ack);
- message_ply_append_timestamp(b);
- assert(!ob_overrun(b));
-
- if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0)
- status = MESHMS_STATUS_ERROR;
-
- ob_free(b);
- }
-
- if (!meshms_failed(status)){
- // if it's all good, remember the size of their ply at the time we examined it.
- conv->their_last_message = last_offset;
- conv->their_size = conv->their_ply.size;
- }
+ ob_free(b);
return status;
}
@@ -290,19 +342,17 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc
struct meshms_conversations **ptr = conv;
while (*ptr) {
struct meshms_conversations *n = *ptr;
- if (n->their_size != n->their_ply.size) {
- enum meshms_status status;
- if (meshms_failed(status = update_conversation(id, n)))
- return status;
- if (status == MESHMS_STATUS_UPDATED){
- rstatus = MESHMS_STATUS_UPDATED;
- if (n != *conv){
- DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
- *ptr = n->_next;
- n->_next = *conv;
- *conv = n;
- continue;
- }
+ enum meshms_status status;
+ if (meshms_failed(status = update_conversation(id, n)))
+ return status;
+ if (status == MESHMS_STATUS_UPDATED){
+ rstatus = MESHMS_STATUS_UPDATED;
+ if (n != *conv){
+ DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them));
+ *ptr = n->_next;
+ n->_next = *conv;
+ *conv = n;
+ continue;
}
}
ptr = &(*ptr)->_next;
@@ -312,7 +362,7 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc
// read our cached conversation list from our rhizome payload
// if we can't load the existing data correctly, just ignore it.
-static enum meshms_status read_known_conversations(rhizome_manifest *m, const sid_t *their_sid, struct meshms_conversations **conv)
+static enum meshms_status read_known_conversations(rhizome_manifest *m, struct meshms_conversations **conv)
{
if (m->haveSecret==NEW_BUNDLE_ID)
return MESHMS_STATUS_OK;
@@ -332,56 +382,87 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED && pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY)
goto end;
- unsigned char version=0xFF;
+ uint8_t version=0xFF;
ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1);
if (r == -1)
goto end;
- if (version != 1) {
- WARNF("Expected version 1 (got 0x%02x)", version);
+ if (version < 1 || version > 2) {
+ WARNF("Unknown file format version (got 0x%02x)", version);
goto end;
}
while (1) {
- sid_t sid;
- r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary);
- if (r == 0) {
+ uint8_t buffer[SID_SIZE + 12*3 + 1];
+
+ ssize_t bytes = rhizome_read_buffered(&read, &buff, buffer, sizeof buffer);
+ if (bytes == 0) {
status = MESHMS_STATUS_OK;
goto end;
}
- if (r != sizeof sid.binary)
+ if (bytes < SID_SIZE+1)
break;
- DEBUGF(meshms, "Reading existing conversation for %s", alloca_tohex_sid_t(sid));
-
- // unpack the stored details first so we know where the next record is
- unsigned char details[12*3];
- r = rhizome_read_buffered(&read, &buff, details, sizeof details);
- if (r == -1)
- break;
- int bytes = r;
-
- uint64_t last_message=0;
- uint64_t read_offset=0;
- uint64_t their_size=0;
-
- int ofs = 0;
- int unpacked = unpack_uint(details, bytes, &last_message);
- if (unpacked == -1)
- break;
- ofs += unpacked;
- unpacked = unpack_uint(details+ofs, bytes-ofs, &read_offset);
- if (unpacked == -1)
- break;
- ofs += unpacked;
- unpacked = unpack_uint(details+ofs, bytes-ofs, &their_size);
- if (unpacked == -1)
- break;
- ofs += unpacked;
+
+ const sid_t *sid = (sid_t *)&buffer[0];
+
+ int ofs = SID_SIZE;
+
+ uint8_t flags = 0; // TODO flags
+ struct meshms_metadata metadata;
+ bzero(&metadata, sizeof metadata);
+ int unpacked;
+
+ if (version==1){
+ // force re-reading ply details
+ uint64_t ignored=0;
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
+ break;
+ ofs += unpacked;
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
+ break;
+ ofs += unpacked;
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1)
+ break;
+ ofs += unpacked;
+ }else if(version>=2){
+ uint64_t delta=0;
+
+ flags = buffer[ofs++];
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1)
+ break;
+ ofs += unpacked;
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1)
+ break;
+ ofs += unpacked;
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
+ break;
+ ofs += unpacked;
+ metadata.their_last_message = metadata.their_size - delta;
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
+ break;
+ ofs += unpacked;
+ metadata.read_offset = metadata.their_size - delta;
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
+ break;
+ ofs += unpacked;
+ metadata.their_last_ack_offset = metadata.their_size - delta;
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
+ break;
+ ofs += unpacked;
+ metadata.my_last_ack = metadata.their_size - delta;
+
+ if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1)
+ break;
+ ofs += unpacked;
+ metadata.their_last_ack = metadata.my_size - delta;
+ }
+
read.offset += ofs - bytes;
- // skip uninteresting records
- if (their_sid && cmp_sid_t(&sid, their_sid) != 0)
- continue;
-
struct meshms_conversations *n = emalloc_zero(sizeof(struct meshms_conversations));
if (!n)
goto end;
@@ -389,10 +470,18 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si
*ptr = n;
ptr = &n->_next;
- n->them = sid;
- n->their_last_message = last_message;
- n->read_offset = read_offset;
- n->their_size = their_size;
+ n->them = *sid;
+ n->metadata = metadata;
+
+ DEBUGF(meshms, "Unpacked existing conversation for %s (their_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")",
+ alloca_tohex_sid_t(*sid),
+ metadata.their_size,
+ metadata.my_size,
+ metadata.their_last_message,
+ metadata.read_offset,
+ metadata.my_last_ack,
+ metadata.their_last_ack
+ );
}
end:
rhizome_read_close(&read);
@@ -402,27 +491,41 @@ end:
static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv)
{
size_t len=0;
- unsigned char buffer[sizeof(conv->them) + (8*3)];
- if (write)
- bcopy(conv->them.binary, buffer, sizeof(conv->them));
+ unsigned char buffer[sizeof(conv->them) + (12*3) + 1];
+
+ bcopy(conv->them.binary, buffer, sizeof(conv->them));
len+=sizeof(conv->them);
+ buffer[len++] = 0; // TODO reserved for flags
+
+ assert(conv->metadata.their_size >= conv->metadata.their_last_message);
+ assert(conv->metadata.their_size >= conv->metadata.read_offset);
+ assert(conv->metadata.their_size >= conv->metadata.my_last_ack);
+ assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset);
+ assert(conv->metadata.my_size >= conv->metadata.their_last_ack);
+
+ // assume that most ack & read offsets are going to be near the ply length
+ // so store them as delta's.
+ len+=pack_uint(&buffer[len], conv->metadata.their_size);
+ len+=pack_uint(&buffer[len], conv->metadata.my_size);
+ len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message);
+ len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset);
+ len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset);
+ len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack);
+ len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack);
+
+ assert(len <= sizeof buffer);
+
if (write){
- len+=pack_uint(&buffer[len], conv->their_last_message);
- len+=pack_uint(&buffer[len], conv->read_offset);
- len+=pack_uint(&buffer[len], conv->their_size);
int ret=rhizome_write_buffer(write, buffer, len);
if (ret == -1)
return ret;
- }else{
- len+=measure_packed_uint(conv->their_last_message);
- len+=measure_packed_uint(conv->read_offset);
- len+=measure_packed_uint(conv->their_size);
}
+
DEBUGF(meshms, "len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
alloca_tohex_sid_t(conv->them),
- conv->their_last_message,
- conv->read_offset,
- conv->their_size,
+ conv->metadata.their_last_message,
+ conv->metadata.read_offset,
+ conv->metadata.their_size,
len
);
return len;
@@ -449,33 +552,34 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct
bzero(&write, sizeof(write));
enum meshms_status status = MESHMS_STATUS_ERROR;
- // TODO rebalance tree...
+ // TODO rebalance tree...?
- // measure the final payload first
- ssize_t len=write_conversations(NULL, conv);
- if (len == -1)
- goto end;
-
- // then write it
rhizome_manifest_set_version(m, m->version + 1);
- rhizome_manifest_set_filesize(m, (size_t)len + 1);
+ rhizome_manifest_set_filesize(m, RHIZOME_SIZE_UNSET);
rhizome_manifest_set_filehash(m, NULL);
enum rhizome_payload_status pstatus = rhizome_write_open_manifest(&write, m);
if (pstatus!=RHIZOME_PAYLOAD_STATUS_NEW)
- // TODO log something?
goto end;
- unsigned char version=1;
+ uint8_t version=2;
if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end;
if (write_conversations(&write, conv) == -1)
goto end;
+
+ if (write.file_offset == 1){
+ // Don't bother if we don't know anyone.
+ status = MESHMS_STATUS_OK;
+ goto end;
+ }
+
pstatus = rhizome_finish_write(&write);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
rhizome_manifest_set_filehash(m, &write.id);
-
+ rhizome_manifest_set_filesize(m, write.file_length);
+
struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1);
switch (result.status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
@@ -523,13 +627,35 @@ end:
return status;
}
+static enum meshms_status meshms_open_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv)
+{
+ enum meshms_status status;
+
+ if (meshms_failed(status = get_my_conversation_bundle(id, m)))
+ goto end;
+ // read conversations payload
+ if (meshms_failed(status = read_known_conversations(m, conv)))
+ goto end;
+ status = get_database_conversations(id, conv);
+end:
+ return status;
+}
+
+static enum meshms_status meshms_save_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv)
+{
+ enum meshms_status status;
+
+ if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED)
+ status = write_known_conversations(m, *conv);
+
+ return status;
+}
+
// read information about existing conversations from a rhizome payload
-enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv)
+enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv)
{
enum meshms_status status = MESHMS_STATUS_ERROR;
- rhizome_manifest *m = rhizome_new_manifest();
- if (!m)
- goto end;
+ rhizome_manifest *m=NULL;
assert(id || my_sid);
if (!my_sid){
@@ -542,15 +668,16 @@ enum meshms_status meshms_conversations_list(const keyring_identity *id, const s
}
}
- if (meshms_failed(status = get_my_conversation_bundle(id, m)))
+ m = rhizome_new_manifest();
+ if (!m)
goto end;
- // read conversations payload
- if (meshms_failed(status = read_known_conversations(m, their_sid, conv)))
+
+ if (meshms_failed(status = meshms_open_list(id, m, conv)))
goto end;
- if (meshms_failed(status = get_database_conversations(id, their_sid, conv)))
+
+ if (meshms_failed(status = meshms_save_list(id, m, conv)))
goto end;
- if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED && their_sid == NULL)
- status = write_known_conversations(m, *conv);
+
end:
rhizome_manifest_free(m);
DEBUGF(meshms, "status=%d", status);
@@ -578,12 +705,15 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *i
enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *me, const sid_t *them)
{
+ bzero(iter, sizeof *iter);
DEBUGF(meshms, "iter=%p me=%s them=%s", iter,
me ? alloca_tohex_sid_t(*me) : "NULL",
them ? alloca_tohex_sid_t(*them) : "NULL"
);
+
enum meshms_status status = MESHMS_STATUS_ERROR;
- bzero(iter, sizeof *iter);
+ struct meshms_conversations *conv = NULL;
+ rhizome_manifest *m = NULL;
keyring_identity *id = keyring_find_identity_sid(keyring, me);
if (!id){
@@ -592,71 +722,80 @@ enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *
goto fail;
}
- if (meshms_failed(status = find_or_create_conv(id, them, &iter->_conv)))
+ if (!(m = rhizome_new_manifest()))
+ goto error;
+
+ if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto fail;
- assert(iter->_conv != NULL);
+
iter->identity = id;
- iter->_my_sid = *me;
- iter->my_sid = &iter->_my_sid;
- iter->their_sid = &iter->_conv->them;
- iter->my_ply_bid = &iter->_conv->my_ply.bundle_id;
- iter->their_ply_bid = &iter->_conv->their_ply.bundle_id;
- iter->read_offset = iter->_conv->read_offset;
iter->timestamp = 0;
- // If I have never sent a message (or acked any of theirs), there are no messages in the thread.
- if (iter->_conv->my_ply.found) {
- int r = message_ply_read_open(&iter->_my_reader, &iter->_conv->my_ply.bundle_id);
- if (r != 0)
- goto error;
- if (iter->_conv->their_ply.found) {
- r = message_ply_read_open(&iter->_their_reader, &iter->_conv->their_ply.bundle_id);
- if (r != 0)
- goto error;
- // Find their latest ACK so we know which of my messages have been delivered.
- if (message_ply_find_prev(&iter->_their_reader, MESSAGE_BLOCK_TYPE_ACK)==0){
- if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->latest_ack_my_offset) == -1)
- iter->latest_ack_my_offset = 0;
- else{
- iter->latest_ack_offset = iter->_their_reader.record_end_offset;
- DEBUGF(meshms, "Found their last ack @%"PRId64, iter->latest_ack_offset);
- }
- }
- // Re-seek to end of their ply.
- iter->_their_reader.read.offset = iter->_their_reader.read.length;
- }
- } else {
- DEBUGF(meshms, "Did not find sender's ply; no messages in thread");
- }
iter->_in_ack = 0;
+ iter->my_sid = *me;
+ iter->their_sid = *them;
+
+ struct meshms_conversations *c = conv;
+ while(c){
+ if (cmp_sid_t(them, &c->them)==0){
+ DEBUGF(meshms, "Found matching conversation, found_mine=%d, found_theirs=%d, read_offset=%"PRId64,
+ c->my_ply.found, c->their_ply.found, c->metadata.read_offset);
+
+ if (meshms_failed(status = update_conversation(id, c)))
+ goto fail;
+ if (status == MESHMS_STATUS_UPDATED)
+ // ignore failures, we can retry later anyway.
+ write_known_conversations(m, conv);
+
+ if (meshms_failed(status = open_ply(&c->my_ply, &iter->_my_reader)))
+ goto fail;
+ if (meshms_failed(status = open_ply(&c->their_ply, &iter->_their_reader)))
+ goto fail;
+
+ iter->metadata = c->metadata;
+ iter->my_ply = c->my_ply;
+ iter->their_ply = c->their_ply;
+
+ if (c->their_ply.found && c->metadata.their_last_message > c->metadata.my_last_ack){
+ iter->_in_ack = 1;
+ iter->_their_reader.read.offset = c->metadata.their_last_message;
+ iter->_end_range = c->metadata.my_last_ack;
+ }
+
+ break;
+ }
+ c = c->_next;
+ }
+
+ meshms_free_conversations(conv);
return MESHMS_STATUS_OK;
+
error:
status = MESHMS_STATUS_ERROR;
fail:
meshms_message_iterator_close(iter);
+ meshms_free_conversations(conv);
return status;
}
-int meshms_message_iterator_is_open(const struct meshms_message_iterator *iter)
-{
- return iter->_conv != NULL;
-}
-
void meshms_message_iterator_close(struct meshms_message_iterator *iter)
{
DEBUGF(meshms, "iter=%p", iter);
message_ply_read_close(&iter->_my_reader);
message_ply_read_close(&iter->_their_reader);
- meshms_free_conversations(iter->_conv);
- iter->_conv = NULL;
}
enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *iter)
{
- assert(iter->_conv != NULL);
- enum meshms_status status = MESHMS_STATUS_UPDATED;
- while (status == MESHMS_STATUS_UPDATED) {
- if (iter->_in_ack) {
- DEBUGF(meshms, "Reading other log from %"PRId64", to %"PRId64, iter->_their_reader.read.offset, iter->_end_range);
+ DEBUGF(meshms, "iter=%p, in_ack=%d, found_mine=%d, my_offset=%"PRIu64", their_offset=%"PRIu64,
+ iter, iter->_in_ack, iter->my_ply.found,
+ iter->_my_reader.read.offset, iter->_their_reader.read.offset);
+ while (1) {
+ if (iter->their_ply.found && iter->_in_ack) {
+ DEBUGF(meshms, "Reading other log from %"PRIu64", to %"PRIu64, iter->_their_reader.read.offset, iter->_end_range);
+ // just in case we don't have the full bundle in this rhizome store
+ if (iter->_their_reader.read.offset > iter->_their_reader.read.length)
+ iter->_their_reader.read.offset = iter->_their_reader.read.length;
+
// eof or other read errors, skip over messages (the tail is allowed to advance)
if (message_ply_read_prev(&iter->_their_reader)==0){
iter->which_ply = THEIR_PLY;
@@ -670,7 +809,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
iter->read = 0;
- return status;
+ return MESHMS_STATUS_UPDATED;
case MESSAGE_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_RECEIVED;
iter->offset = iter->_their_reader.record_end_offset;
@@ -679,8 +818,8 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
if ( iter->_their_reader.record_length != 0
&& iter->_their_reader.record[iter->_their_reader.record_length - 1] == '\0'
) {
- iter->read = iter->_their_reader.record_end_offset <= iter->_conv->read_offset;
- return status;
+ iter->read = iter->_their_reader.record_end_offset <= iter->metadata.read_offset;
+ return MESHMS_STATUS_UPDATED;
}
WARN("Malformed MeshMS2 ply journal, missing NUL terminator");
return MESHMS_STATUS_PROTOCOL_FAULT;
@@ -689,9 +828,12 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
}
}
iter->_in_ack = 0;
- status = MESHMS_STATUS_UPDATED;
- }else if (message_ply_read_prev(&iter->_my_reader) == 0) {
- DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset);
+ }else if(iter->my_ply.found){
+ if (message_ply_read_prev(&iter->_my_reader) != 0)
+ return MESHMS_STATUS_OK;
+
+ DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64,
+ iter->_my_reader.read.offset, iter->_my_reader.type, iter->metadata.read_offset);
iter->which_ply = MY_PLY;
switch (iter->_my_reader.type) {
case MESSAGE_BLOCK_TYPE_TIME:
@@ -704,7 +846,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
break;
case MESSAGE_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
- if (iter->_conv->their_ply.found) {
+ if (iter->their_ply.found) {
int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset);
if (ofs == -1) {
WHYF("Malformed ACK");
@@ -717,9 +859,6 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
else
iter->_end_range = iter->_their_reader.read.offset - end_range;
// TODO tail
- // just in case we don't have the full bundle anymore
- if (iter->_their_reader.read.offset > iter->_their_reader.read.length)
- iter->_their_reader.read.offset = iter->_their_reader.read.length;
iter->_in_ack = 1;
}
break;
@@ -728,14 +867,13 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->offset = iter->_my_reader.record_end_offset;
iter->text = (const char *)iter->_my_reader.record;
iter->text_length = iter->_my_reader.record_length;
- iter->delivered = iter->latest_ack_my_offset && iter->_my_reader.record_end_offset <= iter->latest_ack_my_offset;
- return status;
+ iter->delivered = iter->_my_reader.record_end_offset <= iter->metadata.their_last_ack;
+ return MESHMS_STATUS_UPDATED;
}
}else{
- status = MESHMS_STATUS_OK;
+ return MESHMS_STATUS_OK;
}
}
- return status;
}
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len)
@@ -747,29 +885,72 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
}
struct meshms_conversations *conv = NULL;
enum meshms_status status = MESHMS_STATUS_ERROR;
+ rhizome_manifest *m=NULL;
keyring_identity *id = keyring_find_identity_sid(keyring, sender);
if (!id)
return MESHMS_STATUS_SID_LOCKED;
- if (meshms_failed(status = find_or_create_conv(id, recipient, &conv)))
+ m = rhizome_new_manifest();
+ if (!m)
goto end;
- assert(conv != NULL);
+ if (meshms_failed(status = meshms_open_list(id, m, &conv)))
+ goto end;
+
+ struct meshms_conversations *c = conv;
+ while(c && cmp_sid_t(recipient, &c->them)!=0)
+ c = c->_next;
+
+ if (!c){
+ c = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations));
+ if (!c)
+ goto end;
+ c->them = *recipient;
+ c->_next = conv;
+ conv = c;
+ status = MESHMS_STATUS_UPDATED;
+ }
+
+ enum meshms_status tmp_status = update_stats(c);
+ if (meshms_failed(tmp_status))
+ return tmp_status;
+ if (tmp_status == MESHMS_STATUS_UPDATED)
+ status = tmp_status;
// construct a message payload
struct overlay_buffer *b = ob_new();
+
+ // if we didn't "know" them, or we just received a new message, we may need to add an ack now.
+ // lets do that in one hit
+ uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0;
+ DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message);
+ if (ack)
+ message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack);
+
message_ply_append_message(b, message, message_len);
message_ply_append_timestamp(b);
assert(!ob_overrun(b));
- if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &conv->my_ply, b)==0)
+ if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &c->my_ply, b)==0){
+ if (ack)
+ c->metadata.my_last_ack = c->metadata.their_last_message;
+ c->metadata.my_size += ob_position(b);
+
+ // save known conversations since our stats will always change.
+ write_known_conversations(m, conv);
+
status = MESHMS_STATUS_UPDATED;
+ }else{
+ status = MESHMS_STATUS_ERROR;
+ }
ob_free(b);
end:
+ if (m)
+ rhizome_manifest_free(m);
meshms_free_conversations(conv);
return status;
}
@@ -788,36 +969,27 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient,
DEBUGF(meshms, "sender=%s recipient=%s offset=%"PRIu64,
alloca_tohex_sid_t(*sender),
- recipient ? alloca_tohex_sid_t(*recipient) : "NULL",
+ recipient ? alloca_tohex_sid_t(*recipient) : "(all)",
offset
);
m = rhizome_new_manifest();
if (!m)
goto end;
- if (meshms_failed(status = get_my_conversation_bundle(id, m)))
+
+ if (meshms_failed(status = meshms_open_list(id, m, &conv)))
goto end;
- // read all conversations, so we can write them again
- if (meshms_failed(status = read_known_conversations(m, NULL, &conv)))
- goto end;
- // read the full list of conversations from the database too
- if (meshms_failed(status = get_database_conversations(id, NULL, &conv)))
- goto end;
- // check if any incoming conversations need to be acked or have new messages and update the read offset
+
unsigned changed = 0;
+ // check if any incoming conversations need to be acked or have new messages
if (meshms_failed(status = update_conversations(id, &conv)))
goto end;
if (status == MESHMS_STATUS_UPDATED)
- changed = 1;
+ changed ++;
+ // update the read offset
changed += mark_read(conv, recipient, offset);
DEBUGF(meshms, "changed=%u", changed);
- if (changed) {
- if (meshms_failed(status = write_known_conversations(m, conv)))
- goto end;
- if (status != MESHMS_STATUS_UPDATED) {
- WHYF("expecting %d (MESHMS_STATUS_UPDATED), got %s", MESHMS_STATUS_UPDATED, status);
- status = MESHMS_STATUS_ERROR;
- }
- }
+ if (changed)
+ status = write_known_conversations(m, conv);
end:
if (m)
rhizome_manifest_free(m);
@@ -835,13 +1007,17 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_
// - never past their last message
// - never rewind, only advance
uint64_t new_offset = offset;
- if (new_offset > conv->their_last_message)
- new_offset = conv->their_last_message;
- if (new_offset > conv->read_offset) {
- DEBUGF(meshms, "Moving read marker for %s, from %"PRId64" to %"PRId64,
- alloca_tohex_sid_t(conv->them), conv->read_offset, new_offset
- );
- conv->read_offset = new_offset;
+ if (new_offset > conv->metadata.their_last_message)
+ new_offset = conv->metadata.their_last_message;
+
+ DEBUGF(meshms, "Read marker for %s, to %"PRIu64" (asked for %"PRIu64", was %"PRIu64")",
+ alloca_tohex_sid_t(conv->them),
+ new_offset,
+ offset,
+ conv->metadata.read_offset);
+
+ if (new_offset > conv->metadata.read_offset) {
+ conv->metadata.read_offset = new_offset;
ret++;
}
if (their_sid)
diff --git a/meshms.h b/meshms.h
index e347707c..d153ca6c 100644
--- a/meshms.h
+++ b/meshms.h
@@ -48,6 +48,24 @@ __MESHMS_INLINE int meshms_failed(enum meshms_status status) {
const char *meshms_status_message(enum meshms_status);
+struct meshms_metadata{
+ // what is the offset of their last message
+ uint64_t their_last_message;
+ // what is the offset of their last ack
+ uint64_t their_last_ack_offset;
+ // where in our ply, does their ack point
+ uint64_t their_last_ack;
+ // what is the last message we marked as read
+ uint64_t read_offset;
+ // our cached value for the last known size of their ply
+ uint64_t their_size;
+
+ // where in their ply, does our last ack point
+ uint64_t my_last_ack;
+ // our cached value for the last known size of our ply
+ uint64_t my_size;
+};
+
struct meshms_conversations {
struct meshms_conversations *_next;
@@ -56,19 +74,14 @@ struct meshms_conversations {
struct message_ply my_ply;
struct message_ply their_ply;
-
- // what is the offset of their last message
- uint64_t their_last_message;
- // what is the last message we marked as read
- uint64_t read_offset;
- // our cached value for the last known size of their ply
- uint64_t their_size;
+
+ struct meshms_metadata metadata;
};
/* Fetch the list of all MeshMS conversations into a binary tree whose nodes
* are all allocated by malloc(3).
*/
-enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv);
+enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv);
void meshms_free_conversations(struct meshms_conversations *conv);
/* For iterating over a binary tree of all MeshMS conversations, as created by
@@ -108,13 +121,14 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *)
struct meshms_message_iterator {
// Public fields that remain fixed for the life of the iterator:
struct keyring_identity *identity;
- const sid_t *my_sid;
- const sid_t *their_sid;
- const rhizome_bid_t *my_ply_bid;
- const rhizome_bid_t *their_ply_bid;
- uint64_t latest_ack_offset; // offset in remote (their) ply of most recent ACK
- uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them
- uint64_t read_offset; // offset in remote (their) ply of most recent message read by me
+ sid_t my_sid;
+ sid_t their_sid;
+
+ struct message_ply my_ply;
+ struct message_ply their_ply;
+
+ struct meshms_metadata metadata;
+
// The following public fields change per message:
enum meshms_which_ply { NEITHER_PLY, MY_PLY, THEIR_PLY } which_ply;
enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type;
@@ -131,15 +145,12 @@ struct meshms_message_iterator {
uint64_t ack_offset; // for ACK_RECEIVED
};
// Private implementation -- could change, so don't use them.
- sid_t _my_sid;
- struct meshms_conversations *_conv;
struct message_ply_read _my_reader;
struct message_ply_read _their_reader;
uint64_t _end_range;
- bool_t _in_ack;
+ uint8_t _in_ack:1;
};
enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *, const sid_t *me, const sid_t *them);
-int meshms_message_iterator_is_open(const struct meshms_message_iterator *);
void meshms_message_iterator_close(struct meshms_message_iterator *);
enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *);
@@ -149,6 +160,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
* MESHMS_STATUS_UPDATED on success, any other value indicates a failure or
* error (which is already logged).
*/
+
enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len);
/* Update the read offset for one or more conversations. Returns
diff --git a/meshms_cli.c b/meshms_cli.c
index 653041a2..707a09c0 100644
--- a/meshms_cli.c
+++ b/meshms_cli.c
@@ -39,7 +39,7 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
if (rhizome_opendb() == -1)
goto end;
- if (meshms_failed(status = meshms_conversations_list(NULL, &sid, NULL, &conv)))
+ if (meshms_failed(status = meshms_conversations_list(NULL, &sid, &conv)))
goto end;
const char *names[]={
@@ -57,16 +57,16 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_
if (rows >= offset) {
cli_put_long(context, rows, ":");
cli_put_hexvalue(context, it.current->them.binary, sizeof(it.current->them), ":");
- cli_put_string(context, it.current->read_offset < it.current->their_last_message ? "unread":"", ":");
- cli_put_long(context, it.current->their_last_message, ":");
- cli_put_long(context, it.current->read_offset, include_message?":":"\n");
+ cli_put_string(context, it.current->metadata.read_offset < it.current->metadata.their_last_message ? "unread":"", ":");
+ cli_put_long(context, it.current->metadata.their_last_message, ":");
+ cli_put_long(context, it.current->metadata.read_offset, include_message?":":"\n");
if (include_message){
int output = 0;
- if (it.current->their_last_message && it.current->their_ply.found){
+ if (it.current->metadata.their_last_message && it.current->their_ply.found){
struct message_ply_read reader;
bzero(&reader, sizeof reader);
if (message_ply_read_open(&reader, &it.current->their_ply.bundle_id) == 0){
- reader.read.offset = it.current->their_last_message;
+ reader.read.offset = it.current->metadata.their_last_message;
if (message_ply_read_prev(&reader)==0){
cli_put_string(context, (const char *)reader.record, "\n");
output = 1;
@@ -175,7 +175,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_
case MESSAGE_SENT:
if (iter.delivered && !marked_delivered){
cli_put_long(context, id++, ":");
- cli_put_long(context, iter.latest_ack_offset, ":");
+ cli_put_long(context, iter.metadata.their_last_ack_offset, ":");
cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
@@ -193,7 +193,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_
case MESSAGE_RECEIVED:
if (iter.read && !marked_read) {
cli_put_long(context, id++, ":");
- cli_put_long(context, iter.read_offset, ":");
+ cli_put_long(context, iter.metadata.read_offset, ":");
cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
diff --git a/meshms_restful.c b/meshms_restful.c
index 097e941b..222b674c 100644
--- a/meshms_restful.c
+++ b/meshms_restful.c
@@ -42,35 +42,50 @@ static void finalise_union_meshms_sendmessage(httpd_request *r)
form_buf_malloc_release(&r->u.sendmsg.message);
}
-#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t)))
-#define alloca_meshms_token(bid, offset) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (bid), (offset))
+#define MAX_TOKEN_LEN 21
+#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(MAX_TOKEN_LEN))
+#define alloca_meshms_token(pos) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (pos))
-static char *meshms_token_to_str(char *buf, const rhizome_bid_t *bid, uint64_t offset)
+static char *meshms_token_to_str(char *buf, struct newsince_position *pos)
{
- struct iovec iov[2];
- iov[0].iov_base = (void *) bid->binary;
- iov[0].iov_len = sizeof bid->binary;
- iov[1].iov_base = &offset;
- iov[1].iov_len = sizeof offset;
- size_t n = base64url_encodev(buf, iov, 2);
- assert(n == MESHMS_TOKEN_STRLEN);
+ uint8_t tmp[MAX_TOKEN_LEN];
+ int ofs = 0;
+ tmp[ofs++]=pos->which_ply;
+ ofs += pack_uint(tmp + ofs, pos->offset);
+ ofs += pack_uint(tmp + ofs, pos->their_ack);
+ assert(ofs <= MAX_TOKEN_LEN);
+ size_t n = base64url_encode(buf, tmp, ofs);
+ assert(n <= MESHMS_TOKEN_STRLEN);
buf[n] = '\0';
return buf;
}
-static int strn_to_meshms_token(const char *str, rhizome_bid_t *bidp, uint64_t *offsetp, const char **afterp)
+static int strn_to_meshms_token(const char *str, struct newsince_position *pos, const char **afterp)
{
- unsigned char token[sizeof bidp->binary + sizeof *offsetp];
- if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) == sizeof token){
- memcpy(bidp->binary, token, sizeof bidp->binary);
- memcpy(offsetp, token + sizeof bidp->binary, sizeof *offsetp);
- (*afterp)++;
- }else{
- // don't skip the token
- *afterp=str;
- bzero(bidp, sizeof *bidp);
- *offsetp=0;
- }
+ uint8_t token[MAX_TOKEN_LEN];
+ size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL);
+ if (token_len < 3)
+ goto blank;
+
+ size_t ofs=0;
+ pos->which_ply = token[ofs++];
+ int unpacked;
+ if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->offset))==-1)
+ goto blank;
+
+ ofs += unpacked;
+ if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->their_ack))==-1)
+ goto blank;
+
+ (*afterp)++;
+ return 1;
+
+blank:
+ // don't skip the token
+ *afterp=str;
+ pos->which_ply = MY_PLY;
+ pos->offset = 0;
+ pos->their_ack = 0;
return 1;
}
@@ -159,7 +174,7 @@ static int restful_meshms_(httpd_request *r, const char *remainder)
remainder = "";
}
else if ( str_startswith(remainder, "/newsince/", &end)
- && strn_to_meshms_token(end, &r->bid, &r->ui64, &end)
+ && strn_to_meshms_token(end, &r->u.msglist.token, &end)
&& strcmp(end, "messagelist.json") == 0
) {
handler = restful_meshms_newsince_messagelist_json;
@@ -215,7 +230,7 @@ static int restful_meshms_conversationlist_json(httpd_request *r, const char *re
r->u.mclist.rowcount = 0;
r->u.mclist.conv = NULL;
enum meshms_status status;
- if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, NULL, &r->u.mclist.conv)))
+ if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, &r->u.mclist.conv)))
return http_request_meshms_response(r, 0, NULL, status);
if (r->u.mclist.conv != NULL)
meshms_conversation_iterator_start(&r->u.mclist.iter, r->u.mclist.conv);
@@ -275,11 +290,11 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.mclist.iter.current->them.binary, sizeof r->u.mclist.iter.current->them.binary);
strbuf_putc(b, ',');
- strbuf_json_boolean(b, r->u.mclist.iter.current->read_offset >= r->u.mclist.iter.current->their_last_message);
+ strbuf_json_boolean(b, r->u.mclist.iter.current->metadata.read_offset >= r->u.mclist.iter.current->metadata.their_last_message);
strbuf_putc(b, ',');
- strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->their_last_message);
+ strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.their_last_message);
strbuf_putc(b, ',');
- strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->read_offset);
+ strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.read_offset);
strbuf_puts(b, "]");
if (!strbuf_overrun(b)) {
meshms_conversation_iterator_advance(&r->u.mclist.iter);
@@ -307,8 +322,6 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
if (r->u.msglist.dirty) {
meshms_message_iterator_close(&r->u.msglist.iter);
r->u.msglist.dirty = 0;
- }
- if (!meshms_message_iterator_is_open(&r->u.msglist.iter)) {
enum meshms_status status;
if ( meshms_failed(status = meshms_message_iterator_open(&r->u.msglist.iter, &r->sid1, &r->sid2))
|| meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter))
@@ -318,6 +331,7 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
if (!r->u.msglist.finished) {
r->u.msglist.latest.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.latest.offset = r->u.msglist.iter.offset;
+ r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack = r->u.msglist.iter.metadata.their_last_ack;
}
}
return MESHMS_STATUS_OK;
@@ -334,6 +348,7 @@ static int restful_meshms_messagelist_json(httpd_request *r, const char *remaind
r->u.msglist.token.which_ply = NEITHER_PLY;
r->u.msglist.token.offset = 0;
r->u.msglist.end_time = 0;
+ r->u.msglist.dirty = 1;
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);
@@ -354,17 +369,7 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);
- if (rhizome_bid_t_is_zero(r->bid) || cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0)
- r->u.msglist.token.which_ply = MY_PLY;
- else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0)
- r->u.msglist.token.which_ply = THEIR_PLY;
- else {
- http_request_simple_response(&r->http, 404, "Unmatched token");
- return 404;
- }
- r->u.msglist.token.offset = r->ui64;
r->u.msglist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
- r->u.msglist.current = r->u.msglist.token;
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshms_messagelist_json_content);
return 1;
}
@@ -391,7 +396,7 @@ static int restful_meshms_messagelist_json_content(struct http_request *hr, unsi
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk);
}
-static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos);
+static void _messagelist_json_ack(struct httpd_request *r, strbuf b);
static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr, strbuf b)
{
@@ -415,8 +420,8 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_puts(b, "{\n");
if (!r->u.msglist.end_time) {
strbuf_sprintf(b, "\"read_offset\":%"PRIu64",\n\"latest_ack_offset\":%"PRIu64",\n",
- r->u.msglist.iter.read_offset,
- r->u.msglist.iter.latest_ack_my_offset
+ r->u.msglist.iter.metadata.read_offset,
+ r->u.msglist.iter.metadata.their_last_ack
);
}
strbuf_puts(b, "\"header\":[");
@@ -433,39 +438,57 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
case LIST_FIRST:
case LIST_ROWS:
{
+ r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
+ r->u.msglist.current.offset = r->u.msglist.iter.offset;
if ( r->u.msglist.finished
|| (r->u.msglist.token.which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token.offset)
) {
time_ms_t now;
if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) {
- int appended_row = _messagelist_json_ack(r, b, r->u.msglist.current);
- if (strbuf_overrun(b))
- return 1;
- if (appended_row)
+ // If messages before the requested token have now been acked, add an extra ack record to the end
+ if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack){
+ r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack;
+ _messagelist_json_ack(r, b);
+ if (strbuf_overrun(b))
+ return 1;
++r->u.msglist.rowcount;
+ }
r->u.msglist.token = r->u.msglist.latest;
meshms_message_iterator_close(&r->u.msglist.iter);
+ r->u.msglist.dirty = 1;
http_request_pause_response(&r->http, r->u.msglist.end_time);
return 0;
}
r->u.msglist.phase = LIST_END;
} else {
- r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
- r->u.msglist.current.offset = r->u.msglist.iter.offset;
+ int rows=0;
+
switch (r->u.msglist.iter.type) {
case MESSAGE_SENT:
+ // if you haven't seen the current ack && this is the message that was acked.
+ // output the ack now
+ if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack &&
+ r->u.msglist.iter.offset <= r->u.msglist.latest.their_ack){
+ _messagelist_json_ack(r, b);
+ if (!strbuf_overrun(b)){
+ ++r->u.msglist.rowcount;
+ r->u.msglist.token.their_ack = r->u.msglist.latest.their_ack;
+ r->u.msglist.current.their_ack = 0;
+ }
+ return 1;
+ }
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, ">");
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
strbuf_putc(b, ',');
- strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->my_ply.bundle_id, r->u.msglist.iter.offset));
+ strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_string(b, r->u.msglist.iter.text);
strbuf_putc(b, ',');
@@ -477,6 +500,7 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_puts(b, "]");
+ rows++;
break;
case MESSAGE_RECEIVED:
if (r->u.msglist.rowcount != 0)
@@ -484,13 +508,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_puts(b, "\n[");
strbuf_json_string(b, "<");
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
strbuf_putc(b, ',');
- strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
+ strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_string(b, r->u.msglist.iter.text);
strbuf_putc(b, ',');
@@ -502,13 +526,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_puts(b, "]");
+ rows++;
break;
case ACK_RECEIVED:
- _messagelist_json_ack(r, b, r->u.msglist.current);
break;
}
if (!strbuf_overrun(b)) {
- ++r->u.msglist.rowcount;
+ r->u.msglist.rowcount+=rows;
enum meshms_status status;
if (meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter)))
return http_request_meshms_response(r, 0, NULL, status);
@@ -531,26 +555,20 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
return 0;
}
-static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos)
+static void _messagelist_json_ack(struct httpd_request *r, strbuf b)
{
- // Don't send old (irrelevant) ACKs.
- if (r->u.msglist.iter.latest_ack_my_offset <= r->u.msglist.highest_ack_offset)
- return 0;
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, "ACK");
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary);
strbuf_putc(b, ',');
- strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
+ strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary);
strbuf_putc(b, ',');
- strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_offset);
+ strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack_offset);
strbuf_putc(b, ',');
- // Same token as the message row just sent.
- strbuf_json_string(b, alloca_meshms_token(
- pos.which_ply == MY_PLY ? r->u.msglist.iter.my_ply_bid : r->u.msglist.iter.their_ply_bid,
- pos.offset));
+ strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current));
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_putc(b, ',');
@@ -560,10 +578,8 @@ static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsi
strbuf_putc(b, ',');
strbuf_json_null(b); // no timestamp on ACKs
strbuf_putc(b, ',');
- strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_my_offset);
+ strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack);
strbuf_puts(b, "]");
- r->u.msglist.highest_ack_offset = r->u.msglist.iter.latest_ack_my_offset;
- return 1;
}
static HTTP_REQUEST_PARSER restful_meshms_sendmessage_end;
diff --git a/message_ply.c b/message_ply.c
index e8dbf56b..f2bee3a5 100644
--- a/message_ply.c
+++ b/message_ply.c
@@ -136,6 +136,16 @@ int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid
return ret;
}
+void message_ply_read_rewind(struct message_ply_read *ply)
+{
+ ply->read.offset = ply->read.length;
+}
+
+int message_ply_is_open(struct message_ply_read *ply)
+{
+ return ply->read.length>0;
+}
+
void message_ply_read_close(struct message_ply_read *ply)
{
if (ply->record){
@@ -152,7 +162,7 @@ void message_ply_read_close(struct message_ply_read *ply)
int message_ply_read_prev(struct message_ply_read *ply)
{
ply->record_end_offset = ply->read.offset;
- unsigned char footer[2];
+ uint8_t footer[2];
if (ply->read.offset <= sizeof footer) {
DEBUG(meshms, "EOF");
return -1;
@@ -223,6 +233,7 @@ void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, u
{
ob_checkpoint(b);
ob_append_packed_ui64(b, message_offset);
+ // append the number of bytes acked (should be smaller than an absolute offset)
if (previous_ack_offset)
ob_append_packed_ui64(b, message_offset - previous_ack_offset);
append_footer(b, MESSAGE_BLOCK_TYPE_ACK);
@@ -233,4 +244,5 @@ void message_ply_append_message(struct overlay_buffer *b, const char *message, s
ob_checkpoint(b);
ob_append_strn(b, message, message_len);
append_footer(b, MESSAGE_BLOCK_TYPE_MESSAGE);
-}
\ No newline at end of file
+}
+
diff --git a/message_ply.h b/message_ply.h
index 199a4da4..9efccc96 100644
--- a/message_ply.h
+++ b/message_ply.h
@@ -28,15 +28,17 @@ struct message_ply_read {
uint64_t record_end_offset;
uint16_t record_length;
size_t record_size;
- char type;
+ uint8_t type;
// raw record data
- unsigned char *record;
+ uint8_t *record;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
void message_ply_read_close(struct message_ply_read *ply);
int message_ply_read_prev(struct message_ply_read *ply);
int message_ply_find_prev(struct message_ply_read *ply, char type);
+int message_ply_is_open(struct message_ply_read *ply);
+void message_ply_read_rewind(struct message_ply_read *ply);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset);
void message_ply_append_timestamp(struct overlay_buffer *b);
diff --git a/rhizome_store.c b/rhizome_store.c
index d4682eb2..d77417aa 100644
--- a/rhizome_store.c
+++ b/rhizome_store.c
@@ -1263,6 +1263,8 @@ void rhizome_read_close(struct rhizome_read *read)
END);
}
read->length = 0;
+ read->offset = 0;
+ read->tail = 0;
}
struct cache_entry{
@@ -1476,6 +1478,8 @@ static enum rhizome_payload_status read_derive_key(rhizome_manifest *m, struct r
enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state)
{
+ if (m->filesize == 0 && !m->has_filehash)
+ return RHIZOME_PAYLOAD_STATUS_EMPTY;
enum rhizome_payload_status status = rhizome_open_read(read_state, &m->filehash);
if (status == RHIZOME_PAYLOAD_STATUS_STORED)
status = read_derive_key(m, read_state);
diff --git a/testdefs_meshms.sh b/testdefs_meshms.sh
index ebdc07ed..a4b806b8 100644
--- a/testdefs_meshms.sh
+++ b/testdefs_meshms.sh
@@ -97,12 +97,14 @@ meshms_add_messages() {
MESSAGE[$n]="<"
meshms_send_message $sid2 $sid1 "${TEXT[$n]}"
let ++NRECV
+ let sent_since_ack=0
;;
'A')
MESSAGE[$n]=ACK
[ $i -ne 0 -a $sent_since_ack -eq 0 ] && error "two ACKs in a row (at position $i)"
- meshms_send_messages $sid2 $sid1
+ meshms_list_messages $sid2 $sid1
let ++NACK
+ let sent_since_ack=0
;;
*)
error "invalid message symbol '$sym' (at position $i)"
diff --git a/tests/meshms b/tests/meshms
index 5e9f7cfb..dd337af4 100755
--- a/tests/meshms
+++ b/tests/meshms
@@ -22,7 +22,7 @@ source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
source "${0%/*}/../testdefs_rhizome.sh"
-rexp_age="[0-9]\+"
+rexp_age="[-0-9]\+"
teardown() {
stop_all_servald_servers
@@ -34,66 +34,60 @@ teardown() {
setup_logging() {
executeOk_servald config \
set debug.meshms on \
- set debug.rhizome on \
- set debug.rhizome_manifest on \
- set debug.rhizome_store on \
set log.console.level debug \
set log.console.show_time on
}
-doc_MessageDelivery="Send messages, ack and read them in a 2 party conversation"
-setup_MessageDelivery() {
+setup_common() {
setup_servald
set_instance +A
- create_identities 2
+ create_identities $1
setup_logging
}
-test_MessageDelivery() {
- # 1. empty list
+
+doc_InitiallyEmpty="Return an empty list before sending anything"
+setup_InitiallyEmpty() {
+ setup_common 2
+}
+test_InitiallyEmpty() {
executeOk_servald meshms list messages $SIDA1 $SIDA2
+ tfw_cat --stdout
assertStdoutIs --stdout --line=1 -e '5\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:age:type:message\n'
assertStdoutLineCount '==' 2
- # 2. create a manifest with a single message and list it back
- executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
+}
+
+doc_SendAndList="Send outgoing messages and list them"
+setup_SendAndList() {
+ setup_common 3
+}
+test_SendAndList() {
+ # create a single message and list it back
+ executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 1"
+ tfw_cat --stderr
+ executeOk_servald meshms send message $SIDA1 $SIDA3 "Message 1"
+ tfw_cat --stderr
executeOk_servald meshms list messages $SIDA1 $SIDA2
- assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
- assertStdoutLineCount '==' 3
- # 3. append a second message and list them both
- executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
+ tfw_cat --stderr
+ assertStdoutGrep --stdout --matches=1 "^0:12:$rexp_age:>:Message 1\$"
+ executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 2"
+ tfw_cat --stderr
executeOk_servald meshms list messages $SIDA1 $SIDA2
- assertStdoutGrep --stdout --matches=1 ":>:How are you\$"
- assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
- assertStdoutLineCount '==' 4
- # 4. list the messages from the receivers point of view (which ACKs them)
- executeOk_servald meshms list messages $SIDA2 $SIDA1
- assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
- assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$"
- assertStdoutLineCount '==' 4
- CONV_BID=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*bid=\([0-9A-F]*\).*/\1/p')
- CONV_SECRET=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*secret=\([0-9A-F]*\).*/\1/p')
- tfw_log "CONV_BID=$CONV_BID CONV_SECRET=$CONV_SECRET"
- # 5. mark the first message as read
- executeOk_servald meshms read messages $SIDA2 $SIDA1 5
- check_meshms_bundles
- executeOk_servald meshms list messages $SIDA2 $SIDA1
- assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
- assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$"
- assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
+ tfw_cat --stderr
+ assertStdoutGrep --stdout --matches=1 "^0:30:$rexp_age:>:Message 2\$"
+ assertStdoutGrep --stdout --matches=1 "^1:12:$rexp_age:>:Message 1\$"
+ executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 3"
+ executeOk_servald meshms list messages $SIDA1 $SIDA2
+ tfw_cat --stdout
+ assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:>:Message 3\$"
+ assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:>:Message 2\$"
+ assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:>:Message 1\$"
assertStdoutLineCount '==' 5
- # 6. mark all messages as read
- executeOk_servald meshms read messages $SIDA2
- check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
- assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:MARK:read\$"
- assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:<:How are you\$"
- assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
- assertStdoutLineCount '==' 5
- # 7. list messages from the senders point of view after they have been delivered
- executeOk_servald meshms list messages $SIDA1 $SIDA2
- assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$"
- assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:>:How are you\$"
- assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$"
+ tfw_cat --stdout
+ assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:<:Message 3\$"
+ assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:<:Message 2\$"
+ assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:<:Message 1\$"
assertStdoutLineCount '==' 5
}
@@ -171,6 +165,10 @@ setup_reorderList() {
set_instance +A
create_identities 5
setup_logging
+ # ensure conversations are known
+ executeOk_servald meshms send message $SIDA1 $SIDA2 "Start"
+ executeOk_servald meshms send message $SIDA1 $SIDA3 "Start"
+ executeOk_servald meshms send message $SIDA1 $SIDA4 "Start"
}
test_reorderList() {
# new incoming messages should bump to the top
@@ -212,21 +210,20 @@ test_listConversations() {
assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset:message\n'
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0:\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0:Message2\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0:Message4\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0:Message4\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list conversations $SIDA1 1
- tfw_cat --stderr
assertStdoutLineCount '==' 4
executeOk_servald meshms list conversations $SIDA1 1 1
- tfw_cat --stderr
assertStdoutLineCount '==' 3
# mark all incoming messages as read
executeOk_servald meshms read messages $SIDA1
- tfw_cat --stderr
+ # explicitly mark sida3 as known
+ executeOk_servald meshms read messages $SIDA1 $SIDA3
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list messages $SIDA1 $SIDA2
executeOk_servald meshms list messages $SIDA1 $SIDA4
diff --git a/tests/meshmsjava b/tests/meshmsjava
index 4005db70..2d4bd201 100755
--- a/tests/meshmsjava
+++ b/tests/meshmsjava
@@ -77,19 +77,19 @@ test_MeshmsListConversations() {
assertStdoutLineCount '==' 3
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=false, last_message=14, read_offset=0"
- assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=24, read_offset=0"
+ assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=18, read_offset=0"
executeOk_servald meshms read messages $SIDA1
executeJavaOk org.servalproject.test.Meshms meshms-list-conversations $SIDA1
assertStdoutLineCount '==' 3
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0"
assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=true, last_message=14, read_offset=14"
- assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=24, read_offset=24"
+ assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=18, read_offset=18"
}
doc_MeshmsListMessages="Java API list MeshMS messages in one conversation"
setup_MeshmsListMessages() {
setup
- meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>'
+ meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk_servald meshms list messages $SIDA1 $SIDA2
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT")
@@ -162,7 +162,7 @@ setup_MeshmsListMessagesNewSince() {
executeOk_servald config set api.restful.newsince_timeout 1s
}
setup
- meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>'
+ meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeJavaOk org.servalproject.test.Meshms meshms-list-messages "$SIDA1" "$SIDA2"
tfw_cat --stdout --stderr
@@ -288,7 +288,7 @@ setup_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
}
test_MeshmsReadAllConversations() {
executeJavaOk org.servalproject.test.Meshms meshms-mark-all-conversations-read $SIDA1
@@ -296,7 +296,7 @@ test_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
}
doc_MeshmsReadAllMessages="Java API MeshMS mark all conversations read"
diff --git a/tests/meshmsrestful b/tests/meshmsrestful
index c7a1d2f3..50ff0295 100755
--- a/tests/meshmsrestful
+++ b/tests/meshmsrestful
@@ -69,9 +69,6 @@ set_meshms_config() {
executeOk_servald config \
set debug.http_server on \
set debug.httpd on \
- set debug.rhizome_manifest on \
- set debug.rhizome_store on \
- set debug.rhizome on \
set debug.meshms on \
set debug.verbose on \
set log.console.level debug
@@ -164,12 +161,13 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: false,
- last_message: 20,
+ last_message: 14,
read_offset: 0
}
])"
# mark all incoming messages as read
executeOk_servald meshms read messages $SIDA1
+ executeOk_servald meshms read messages $SIDA1 $SIDA3
tfw_cat --stderr
executeOk curl \
--silent --fail --show-error \
@@ -205,8 +203,8 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: true,
- last_message: 20,
- read_offset: 20
+ last_message: 14,
+ read_offset: 14
}
])"
}
@@ -215,7 +213,7 @@ doc_MeshmsListMessages="HTTP RESTful list MeshMS messages in one conversation as
setup_MeshmsListMessages() {
IDENTITY_COUNT=2
setup
- meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>'
+ meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk_servald meshms list messages $SIDA1 $SIDA2
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT")
@@ -297,7 +295,7 @@ setup_MeshmsListMessagesNewSince() {
executeOk_servald config set api.restful.newsince_timeout 1s
}
setup
- meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>'
+ meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk curl \
--silent --fail --show-error \
@@ -327,11 +325,10 @@ test_MeshmsListMessagesNewSince() {
for ((i = 0; i < NROWS; i += 3)); do
transform_list_json messagelist$i.json messages$i.json
tfw_preserve messages$i.json
- { echo '{"a":'; jq '.[:'$i']' messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
- # If an ACK is new since the token, then it will be in its correct place
- # in messages$i.json, otherwise the ACK will appear in messages$i.json
- # anyway.
- assertJq tmp.json '.a == (if .a | contains([{type:"ACK"}]) then .b else .b | map(select(.type != "ACK")) end)'
+ jq '.[:'$i']' messages.json > messages_expected$i.json
+ tfw_preserve messages_expected$i.json
+ { echo '{"a":'; cat messages_expected$i.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
+ assertJq tmp.json '.a == .b'
done
}
@@ -385,7 +382,7 @@ test_MeshmsListMessagesNewSinceArrival() {
'A') waitfor="ACK";;
*) error "message=${message}";;
esac
- wait_until --timeout=120 grepall "$waitfor" newsince{1,2,3}.json
+ wait_until --timeout=10 grepall "$waitfor" newsince{1,2,3}.json
done
fork_terminate_all
fork_wait_all
@@ -596,7 +593,7 @@ setup_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
}
test_MeshmsReadAllConversations() {
executeOk curl \
@@ -612,7 +609,7 @@ test_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
- assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
}
doc_MeshmsPostSpuriousContent="HTTP RESTful MeshMS rejects unwanted content in POST request"
@@ -625,7 +622,7 @@ setup_MeshmsPostSpuriousContent() {
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message3"
executeOk_servald meshms send message $SIDA2 $SIDA1 "Message4"
executeOk_servald meshms list conversations $SIDA1
- assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$"
}
test_MeshmsPostSpuriousContent() {
executeOk curl \
@@ -642,7 +639,7 @@ test_MeshmsPostSpuriousContent() {
assertJq http_body 'contains({"http_status_code": 400})'
assertJqGrep --ignore-case http_body '.http_status_message' 'content length'
executeOk_servald meshms list conversations $SIDA1
- assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
+ assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$"
}
doc_MeshmsReadAllMessages="HTTP RESTful MeshMS mark all conversations read"
@@ -656,6 +653,8 @@ setup_MeshmsReadAllMessages() {
executeOk_servald meshms send message $SIDA1 $SIDA4 "Message4"
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message5"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message6"
+ executeOk_servald meshms list messages $SIDA2 $SIDA1
+ tfw_cat --stdout
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:0\$"
}