From e13b9c3c94cb5cdd08963d63ff4a218008bf0964 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 17 Aug 2016 11:28:36 +0930 Subject: [PATCH] Reduce redundant reading of message ply's --- httpd.h | 2 +- meshms.c | 692 +++++++++++++++++++++++++++----------------- meshms.h | 50 ++-- meshms_cli.c | 16 +- meshms_restful.c | 156 +++++----- message_ply.c | 16 +- message_ply.h | 6 +- rhizome_store.c | 4 + testdefs_meshms.sh | 4 +- tests/meshms | 97 +++---- tests/meshmsjava | 12 +- tests/meshmsrestful | 35 ++- 12 files changed, 655 insertions(+), 435 deletions(-) diff --git a/httpd.h b/httpd.h index c59678d1..6de95f5b 100644 --- a/httpd.h +++ b/httpd.h @@ -179,12 +179,12 @@ typedef struct httpd_request struct newsince_position { enum meshms_which_ply which_ply; uint64_t offset; + uint64_t their_ack; } token, current, latest; time_ms_t end_time; - uint64_t highest_ack_offset; enum list_phase phase; size_t rowcount; struct meshms_message_iterator iter; diff --git a/meshms.c b/meshms.c index e3502f29..1271d312 100644 --- a/meshms.c +++ b/meshms.c @@ -66,7 +66,7 @@ static enum meshms_status get_my_conversation_bundle(const keyring_identity *id, if (m->haveSecret == NEW_BUNDLE_ID) { rhizome_manifest_set_service(m, RHIZOME_SERVICE_FILE); rhizome_manifest_set_name(m, ""); - rhizome_manifest_set_author_identity(m, id); + // setting the author would imply needing a BK, which we don't need since the private key is seeded above. struct rhizome_bundle_result result = rhizome_fill_manifest(m, NULL); switch (result.status) { case RHIZOME_BUNDLE_STATUS_NEW: @@ -123,26 +123,21 @@ static struct meshms_conversations *add_conv(struct meshms_conversations **conv, // find matching conversations // if their_sid == my_sid, return all conversations with any recipient -static enum meshms_status get_database_conversations(const keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv) +static enum meshms_status get_database_conversations(const keyring_identity *id, struct meshms_conversations **conv) { sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite3_stmt *statement = sqlite_prepare_bind(&retry, "SELECT id, version, filesize, tail, sender, recipient" " FROM manifests" - " WHERE service = ?3" - " AND (sender=?1 or recipient=?1)" - " AND (sender=?2 or recipient=?2)", + " WHERE service = ?2" + " AND (sender=?1 or recipient=?1)", SID_T, id->box_pk, - SID_T, their_sid ? their_sid : id->box_pk, STATIC_TEXT, RHIZOME_SERVICE_MESHMS2, END ); if (!statement) return MESHMS_STATUS_ERROR; - DEBUGF(meshms, "Looking for conversations for %s, %s", - alloca_tohex_sid_t(*id->box_pk), - alloca_tohex_sid_t(*(their_sid ? their_sid : id->box_pk)) - ); + DEBUGF(meshms, "Looking for conversations for %s", alloca_tohex_sid_t(*id->box_pk)); int r; while ((r=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) { const char *id_hex = (const char *)sqlite3_column_text(statement, 0); @@ -151,7 +146,7 @@ static enum meshms_status get_database_conversations(const keyring_identity *id, int64_t tail = sqlite3_column_int64(statement, 3); const char *sender = (const char *)sqlite3_column_text(statement, 4); const char *recipient = (const char *)sqlite3_column_text(statement, 5); - DEBUGF(meshms, "found id %s, sender %s, recipient %s", id_hex, sender, recipient); + DEBUGF(meshms, "found id %s, sender %s, recipient %s, size %"PRId64, id_hex, sender, recipient, size); rhizome_bid_t bid; if (str_to_rhizome_bid_t(&bid, id_hex) == -1) { WHYF("invalid Bundle ID hex: %s -- skipping", alloca_str_toprint(id_hex)); @@ -191,94 +186,151 @@ static enum meshms_status get_database_conversations(const keyring_identity *id, return MESHMS_STATUS_OK; } -static enum meshms_status find_or_create_conv(keyring_identity *id, const sid_t *their_sid, struct meshms_conversations **conv) +static enum meshms_status open_ply(struct message_ply *ply, struct message_ply_read *reader) { - enum meshms_status status; - if (meshms_failed(status = meshms_conversations_list(id, NULL, their_sid, conv))) - return status; - if (*conv == NULL) { - if ((*conv = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations))) == NULL) - return MESHMS_STATUS_ERROR; - (*conv)->them = *their_sid; - status = MESHMS_STATUS_UPDATED; + if (ply->found + && !message_ply_is_open(reader) + && message_ply_read_open(reader, &ply->bundle_id)!=0) + return MESHMS_STATUS_ERROR; + return MESHMS_STATUS_OK; +} + +static enum meshms_status update_their_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader) +{ + DEBUGF(meshms, "Update their stats? (theirsize=%"PRIu64", plysize=%"PRIu64", lastmessage=%"PRIu64", lastackoffset=%"PRIu64", lastack=%"PRIu64")", + metadata->their_size, + ply->size, + metadata->their_last_message, + metadata->their_last_ack_offset, + metadata->their_last_ack + ); + if (metadata->their_size != ply->size){ + enum meshms_status status; + if (meshms_failed(status = open_ply(ply, reader))) + return status; + + uint8_t found_their_msg=0; + uint8_t found_their_ack=0; + + while((!found_their_msg || !found_their_ack) && message_ply_read_prev(reader) == 0){ + // stop if we've seen these records before + if (reader->record_end_offset <= metadata->their_size) + break; + + switch(reader->type){ + case MESSAGE_BLOCK_TYPE_MESSAGE: + if (!found_their_msg){ + found_their_msg = 1; + metadata->their_last_message = reader->record_end_offset; + DEBUGF(meshms, "Found their last message @%"PRIu64, metadata->their_last_message); + } + break; + case MESSAGE_BLOCK_TYPE_ACK: + if (!found_their_ack){ + found_their_ack = 1; + uint64_t value=0; + metadata->their_last_ack_offset = reader->record_end_offset; + if (unpack_uint(reader->record, reader->record_length, &value) != -1){ + metadata->their_last_ack = value; + } + DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64, + metadata->their_last_ack_offset, metadata->their_last_ack); + } + break; + } + } + metadata->their_size = ply->size; + message_ply_read_rewind(reader); + return MESHMS_STATUS_UPDATED; } + return MESHMS_STATUS_OK; +} + +static enum meshms_status update_my_stats(struct meshms_metadata *metadata, struct message_ply *ply, struct message_ply_read *reader) +{ + DEBUGF(meshms, "Update my stats? (mysize=%"PRIu64", plysize=%"PRIu64", lastack=%"PRIu64")", + metadata->my_size, + ply->size, + metadata->my_last_ack); + if (metadata->my_size != ply->size){ + enum meshms_status status; + if (meshms_failed(status = open_ply(ply, reader))) + return status; + + if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){ + uint64_t my_ack = 0; + if (unpack_uint(reader->record, reader->record_length, &my_ack) != -1){ + metadata->my_last_ack = my_ack; + DEBUGF(meshms, "Found my last ack %"PRId64, my_ack); + } + } + metadata->my_size = ply->size; + message_ply_read_rewind(reader); + return MESHMS_STATUS_UPDATED; + } + + return MESHMS_STATUS_OK; +} + +static enum meshms_status update_stats(struct meshms_conversations *conv) +{ + enum meshms_status status = MESHMS_STATUS_OK; + struct message_ply_read reader; + bzero(&reader, sizeof reader); + + enum meshms_status tmp_status = update_their_stats(&conv->metadata, &conv->their_ply, &reader); + message_ply_read_close(&reader); + if (meshms_failed(tmp_status)) + return tmp_status; + if (tmp_status == MESHMS_STATUS_UPDATED) + status = tmp_status; + + // Nothing else to be done if they have never sent us anything + if (!conv->metadata.their_last_message) + return status; + + tmp_status = update_my_stats(&conv->metadata, &conv->my_ply, &reader); + message_ply_read_close(&reader); + + if (meshms_failed(tmp_status)) + return tmp_status; + if (tmp_status == MESHMS_STATUS_UPDATED) + status = tmp_status; + return status; } -// update if any conversations are unread or need to be acked. +// create an ack if required. // return MESHMS_STATUS_UPDATED if the conversation index needs to be saved. static enum meshms_status update_conversation(const keyring_identity *id, struct meshms_conversations *conv) { DEBUG(meshms, "Checking if conversation needs to be acked"); - - // Nothing to be done if they have never sent us anything - if (!conv->their_ply.found) - return MESHMS_STATUS_OK; - uint64_t last_offset=0; - { - struct message_ply_read ply; - bzero(&ply, sizeof ply); - if (message_ply_read_open(&ply, &conv->their_ply.bundle_id)!=0) - return MESHMS_STATUS_ERROR; + enum meshms_status status = update_stats(conv); + if (meshms_failed(status)) + return status; - DEBUG(meshms, "Locating their last message"); - if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_MESSAGE)==0){ - last_offset = ply.record_end_offset; - DEBUGF(meshms, "Found last message @%"PRId64, last_offset); - } - message_ply_read_close(&ply); - } + if (conv->metadata.my_last_ack >= conv->metadata.their_last_message) + return status; - // Perhaps only an ack has been added - if (last_offset == 0 || conv->their_last_message == last_offset) - return MESHMS_STATUS_OK; + // append an ack for their message + DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, conv->metadata.my_last_ack, conv->metadata.their_last_message); + unsigned char buffer[30]; + struct overlay_buffer *b = ob_static(buffer, sizeof buffer); - // find our previous ack - uint64_t previous_ack = 0; - - if (conv->my_ply.found){ - struct message_ply_read ply; - bzero(&ply, sizeof ply); - if (message_ply_read_open(&ply, &conv->my_ply.bundle_id)!=0) - return MESHMS_STATUS_ERROR; + message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack); + message_ply_append_timestamp(b); + assert(!ob_overrun(b)); - DEBUG(meshms, "Locating our previous ack"); - if (message_ply_find_prev(&ply, MESSAGE_BLOCK_TYPE_ACK)==0){ - if (unpack_uint(ply.record, ply.record_length, &previous_ack) == -1) - previous_ack=0; - else - DEBUGF(meshms, "Previous ack is %"PRId64, previous_ack); - } - message_ply_read_close(&ply); + if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0){ + status = MESHMS_STATUS_ERROR; }else{ - DEBUGF(meshms, "No outgoing ply"); + conv->metadata.my_last_ack = conv->metadata.their_last_message; + conv->metadata.my_size += ob_position(b); + status = MESHMS_STATUS_UPDATED; } - // Note that we may have already acked this message, but failed to record it in our conversation list bundle - enum meshms_status status = MESHMS_STATUS_UPDATED; - - if (previous_ack < last_offset){ - // append an ack for their message - DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_offset); - unsigned char buffer[30]; - struct overlay_buffer *b = ob_static(buffer, sizeof buffer); - - message_ply_append_ack(b, last_offset, previous_ack); - message_ply_append_timestamp(b); - assert(!ob_overrun(b)); - - if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, &conv->them, &conv->my_ply, b)!=0) - status = MESHMS_STATUS_ERROR; - - ob_free(b); - } - - if (!meshms_failed(status)){ - // if it's all good, remember the size of their ply at the time we examined it. - conv->their_last_message = last_offset; - conv->their_size = conv->their_ply.size; - } + ob_free(b); return status; } @@ -290,19 +342,17 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc struct meshms_conversations **ptr = conv; while (*ptr) { struct meshms_conversations *n = *ptr; - if (n->their_size != n->their_ply.size) { - enum meshms_status status; - if (meshms_failed(status = update_conversation(id, n))) - return status; - if (status == MESHMS_STATUS_UPDATED){ - rstatus = MESHMS_STATUS_UPDATED; - if (n != *conv){ - DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them)); - *ptr = n->_next; - n->_next = *conv; - *conv = n; - continue; - } + enum meshms_status status; + if (meshms_failed(status = update_conversation(id, n))) + return status; + if (status == MESHMS_STATUS_UPDATED){ + rstatus = MESHMS_STATUS_UPDATED; + if (n != *conv){ + DEBUGF(meshms, "Bumping conversation from %s", alloca_tohex_sid_t(n->them)); + *ptr = n->_next; + n->_next = *conv; + *conv = n; + continue; } } ptr = &(*ptr)->_next; @@ -312,7 +362,7 @@ static enum meshms_status update_conversations(const keyring_identity *id, struc // read our cached conversation list from our rhizome payload // if we can't load the existing data correctly, just ignore it. -static enum meshms_status read_known_conversations(rhizome_manifest *m, const sid_t *their_sid, struct meshms_conversations **conv) +static enum meshms_status read_known_conversations(rhizome_manifest *m, struct meshms_conversations **conv) { if (m->haveSecret==NEW_BUNDLE_ID) return MESHMS_STATUS_OK; @@ -332,56 +382,87 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED && pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY) goto end; - unsigned char version=0xFF; + uint8_t version=0xFF; ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1); if (r == -1) goto end; - if (version != 1) { - WARNF("Expected version 1 (got 0x%02x)", version); + if (version < 1 || version > 2) { + WARNF("Unknown file format version (got 0x%02x)", version); goto end; } while (1) { - sid_t sid; - r = rhizome_read_buffered(&read, &buff, sid.binary, sizeof sid.binary); - if (r == 0) { + uint8_t buffer[SID_SIZE + 12*3 + 1]; + + ssize_t bytes = rhizome_read_buffered(&read, &buff, buffer, sizeof buffer); + if (bytes == 0) { status = MESHMS_STATUS_OK; goto end; } - if (r != sizeof sid.binary) + if (bytes < SID_SIZE+1) break; - DEBUGF(meshms, "Reading existing conversation for %s", alloca_tohex_sid_t(sid)); - - // unpack the stored details first so we know where the next record is - unsigned char details[12*3]; - r = rhizome_read_buffered(&read, &buff, details, sizeof details); - if (r == -1) - break; - int bytes = r; - - uint64_t last_message=0; - uint64_t read_offset=0; - uint64_t their_size=0; - - int ofs = 0; - int unpacked = unpack_uint(details, bytes, &last_message); - if (unpacked == -1) - break; - ofs += unpacked; - unpacked = unpack_uint(details+ofs, bytes-ofs, &read_offset); - if (unpacked == -1) - break; - ofs += unpacked; - unpacked = unpack_uint(details+ofs, bytes-ofs, &their_size); - if (unpacked == -1) - break; - ofs += unpacked; + + const sid_t *sid = (sid_t *)&buffer[0]; + + int ofs = SID_SIZE; + + uint8_t flags = 0; // TODO flags + struct meshms_metadata metadata; + bzero(&metadata, sizeof metadata); + int unpacked; + + if (version==1){ + // force re-reading ply details + uint64_t ignored=0; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1) + break; + ofs += unpacked; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1) + break; + ofs += unpacked; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &ignored)) == -1) + break; + ofs += unpacked; + }else if(version>=2){ + uint64_t delta=0; + + flags = buffer[ofs++]; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.their_size)) == -1) + break; + ofs += unpacked; + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &metadata.my_size)) == -1) + break; + ofs += unpacked; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_message = metadata.their_size - delta; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.read_offset = metadata.their_size - delta; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_ack_offset = metadata.their_size - delta; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.my_last_ack = metadata.their_size - delta; + + if ((unpacked = unpack_uint(buffer+ofs, bytes-ofs, &delta)) == -1) + break; + ofs += unpacked; + metadata.their_last_ack = metadata.my_size - delta; + } + read.offset += ofs - bytes; - // skip uninteresting records - if (their_sid && cmp_sid_t(&sid, their_sid) != 0) - continue; - struct meshms_conversations *n = emalloc_zero(sizeof(struct meshms_conversations)); if (!n) goto end; @@ -389,10 +470,18 @@ static enum meshms_status read_known_conversations(rhizome_manifest *m, const si *ptr = n; ptr = &n->_next; - n->them = sid; - n->their_last_message = last_message; - n->read_offset = read_offset; - n->their_size = their_size; + n->them = *sid; + n->metadata = metadata; + + DEBUGF(meshms, "Unpacked existing conversation for %s (their_size=%"PRIu64", my_size=%"PRIu64", last_message=%"PRIu64", read_offset=%"PRIu64", my_ack=%"PRIu64", their_ack=%"PRIu64")", + alloca_tohex_sid_t(*sid), + metadata.their_size, + metadata.my_size, + metadata.their_last_message, + metadata.read_offset, + metadata.my_last_ack, + metadata.their_last_ack + ); } end: rhizome_read_close(&read); @@ -402,27 +491,41 @@ end: static ssize_t write_conversation(struct rhizome_write *write, struct meshms_conversations *conv) { size_t len=0; - unsigned char buffer[sizeof(conv->them) + (8*3)]; - if (write) - bcopy(conv->them.binary, buffer, sizeof(conv->them)); + unsigned char buffer[sizeof(conv->them) + (12*3) + 1]; + + bcopy(conv->them.binary, buffer, sizeof(conv->them)); len+=sizeof(conv->them); + buffer[len++] = 0; // TODO reserved for flags + + assert(conv->metadata.their_size >= conv->metadata.their_last_message); + assert(conv->metadata.their_size >= conv->metadata.read_offset); + assert(conv->metadata.their_size >= conv->metadata.my_last_ack); + assert(conv->metadata.their_size >= conv->metadata.their_last_ack_offset); + assert(conv->metadata.my_size >= conv->metadata.their_last_ack); + + // assume that most ack & read offsets are going to be near the ply length + // so store them as delta's. + len+=pack_uint(&buffer[len], conv->metadata.their_size); + len+=pack_uint(&buffer[len], conv->metadata.my_size); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_message); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.read_offset); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.their_last_ack_offset); + len+=pack_uint(&buffer[len], conv->metadata.their_size - conv->metadata.my_last_ack); + len+=pack_uint(&buffer[len], conv->metadata.my_size - conv->metadata.their_last_ack); + + assert(len <= sizeof buffer); + if (write){ - len+=pack_uint(&buffer[len], conv->their_last_message); - len+=pack_uint(&buffer[len], conv->read_offset); - len+=pack_uint(&buffer[len], conv->their_size); int ret=rhizome_write_buffer(write, buffer, len); if (ret == -1) return ret; - }else{ - len+=measure_packed_uint(conv->their_last_message); - len+=measure_packed_uint(conv->read_offset); - len+=measure_packed_uint(conv->their_size); } + DEBUGF(meshms, "len %s, %"PRId64", %"PRId64", %"PRId64" = %zu", alloca_tohex_sid_t(conv->them), - conv->their_last_message, - conv->read_offset, - conv->their_size, + conv->metadata.their_last_message, + conv->metadata.read_offset, + conv->metadata.their_size, len ); return len; @@ -449,33 +552,34 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct bzero(&write, sizeof(write)); enum meshms_status status = MESHMS_STATUS_ERROR; - // TODO rebalance tree... + // TODO rebalance tree...? - // measure the final payload first - ssize_t len=write_conversations(NULL, conv); - if (len == -1) - goto end; - - // then write it rhizome_manifest_set_version(m, m->version + 1); - rhizome_manifest_set_filesize(m, (size_t)len + 1); + rhizome_manifest_set_filesize(m, RHIZOME_SIZE_UNSET); rhizome_manifest_set_filehash(m, NULL); enum rhizome_payload_status pstatus = rhizome_write_open_manifest(&write, m); if (pstatus!=RHIZOME_PAYLOAD_STATUS_NEW) - // TODO log something? goto end; - unsigned char version=1; + uint8_t version=2; if (rhizome_write_buffer(&write, &version, 1) == -1) goto end; if (write_conversations(&write, conv) == -1) goto end; + + if (write.file_offset == 1){ + // Don't bother if we don't know anyone. + status = MESHMS_STATUS_OK; + goto end; + } + pstatus = rhizome_finish_write(&write); if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW) goto end; rhizome_manifest_set_filehash(m, &write.id); - + rhizome_manifest_set_filesize(m, write.file_length); + struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1); switch (result.status) { case RHIZOME_BUNDLE_STATUS_ERROR: @@ -523,13 +627,35 @@ end: return status; } +static enum meshms_status meshms_open_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv) +{ + enum meshms_status status; + + if (meshms_failed(status = get_my_conversation_bundle(id, m))) + goto end; + // read conversations payload + if (meshms_failed(status = read_known_conversations(m, conv))) + goto end; + status = get_database_conversations(id, conv); +end: + return status; +} + +static enum meshms_status meshms_save_list(const keyring_identity *id, rhizome_manifest *m, struct meshms_conversations **conv) +{ + enum meshms_status status; + + if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED) + status = write_known_conversations(m, *conv); + + return status; +} + // read information about existing conversations from a rhizome payload -enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv) +enum meshms_status meshms_conversations_list(const keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv) { enum meshms_status status = MESHMS_STATUS_ERROR; - rhizome_manifest *m = rhizome_new_manifest(); - if (!m) - goto end; + rhizome_manifest *m=NULL; assert(id || my_sid); if (!my_sid){ @@ -542,15 +668,16 @@ enum meshms_status meshms_conversations_list(const keyring_identity *id, const s } } - if (meshms_failed(status = get_my_conversation_bundle(id, m))) + m = rhizome_new_manifest(); + if (!m) goto end; - // read conversations payload - if (meshms_failed(status = read_known_conversations(m, their_sid, conv))) + + if (meshms_failed(status = meshms_open_list(id, m, conv))) goto end; - if (meshms_failed(status = get_database_conversations(id, their_sid, conv))) + + if (meshms_failed(status = meshms_save_list(id, m, conv))) goto end; - if ((status = update_conversations(id, conv)) == MESHMS_STATUS_UPDATED && their_sid == NULL) - status = write_known_conversations(m, *conv); + end: rhizome_manifest_free(m); DEBUGF(meshms, "status=%d", status); @@ -578,12 +705,15 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *i enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid_t *me, const sid_t *them) { + bzero(iter, sizeof *iter); DEBUGF(meshms, "iter=%p me=%s them=%s", iter, me ? alloca_tohex_sid_t(*me) : "NULL", them ? alloca_tohex_sid_t(*them) : "NULL" ); + enum meshms_status status = MESHMS_STATUS_ERROR; - bzero(iter, sizeof *iter); + struct meshms_conversations *conv = NULL; + rhizome_manifest *m = NULL; keyring_identity *id = keyring_find_identity_sid(keyring, me); if (!id){ @@ -592,71 +722,80 @@ enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator * goto fail; } - if (meshms_failed(status = find_or_create_conv(id, them, &iter->_conv))) + if (!(m = rhizome_new_manifest())) + goto error; + + if (meshms_failed(status = meshms_open_list(id, m, &conv))) goto fail; - assert(iter->_conv != NULL); + iter->identity = id; - iter->_my_sid = *me; - iter->my_sid = &iter->_my_sid; - iter->their_sid = &iter->_conv->them; - iter->my_ply_bid = &iter->_conv->my_ply.bundle_id; - iter->their_ply_bid = &iter->_conv->their_ply.bundle_id; - iter->read_offset = iter->_conv->read_offset; iter->timestamp = 0; - // If I have never sent a message (or acked any of theirs), there are no messages in the thread. - if (iter->_conv->my_ply.found) { - int r = message_ply_read_open(&iter->_my_reader, &iter->_conv->my_ply.bundle_id); - if (r != 0) - goto error; - if (iter->_conv->their_ply.found) { - r = message_ply_read_open(&iter->_their_reader, &iter->_conv->their_ply.bundle_id); - if (r != 0) - goto error; - // Find their latest ACK so we know which of my messages have been delivered. - if (message_ply_find_prev(&iter->_their_reader, MESSAGE_BLOCK_TYPE_ACK)==0){ - if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->latest_ack_my_offset) == -1) - iter->latest_ack_my_offset = 0; - else{ - iter->latest_ack_offset = iter->_their_reader.record_end_offset; - DEBUGF(meshms, "Found their last ack @%"PRId64, iter->latest_ack_offset); - } - } - // Re-seek to end of their ply. - iter->_their_reader.read.offset = iter->_their_reader.read.length; - } - } else { - DEBUGF(meshms, "Did not find sender's ply; no messages in thread"); - } iter->_in_ack = 0; + iter->my_sid = *me; + iter->their_sid = *them; + + struct meshms_conversations *c = conv; + while(c){ + if (cmp_sid_t(them, &c->them)==0){ + DEBUGF(meshms, "Found matching conversation, found_mine=%d, found_theirs=%d, read_offset=%"PRId64, + c->my_ply.found, c->their_ply.found, c->metadata.read_offset); + + if (meshms_failed(status = update_conversation(id, c))) + goto fail; + if (status == MESHMS_STATUS_UPDATED) + // ignore failures, we can retry later anyway. + write_known_conversations(m, conv); + + if (meshms_failed(status = open_ply(&c->my_ply, &iter->_my_reader))) + goto fail; + if (meshms_failed(status = open_ply(&c->their_ply, &iter->_their_reader))) + goto fail; + + iter->metadata = c->metadata; + iter->my_ply = c->my_ply; + iter->their_ply = c->their_ply; + + if (c->their_ply.found && c->metadata.their_last_message > c->metadata.my_last_ack){ + iter->_in_ack = 1; + iter->_their_reader.read.offset = c->metadata.their_last_message; + iter->_end_range = c->metadata.my_last_ack; + } + + break; + } + c = c->_next; + } + + meshms_free_conversations(conv); return MESHMS_STATUS_OK; + error: status = MESHMS_STATUS_ERROR; fail: meshms_message_iterator_close(iter); + meshms_free_conversations(conv); return status; } -int meshms_message_iterator_is_open(const struct meshms_message_iterator *iter) -{ - return iter->_conv != NULL; -} - void meshms_message_iterator_close(struct meshms_message_iterator *iter) { DEBUGF(meshms, "iter=%p", iter); message_ply_read_close(&iter->_my_reader); message_ply_read_close(&iter->_their_reader); - meshms_free_conversations(iter->_conv); - iter->_conv = NULL; } enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *iter) { - assert(iter->_conv != NULL); - enum meshms_status status = MESHMS_STATUS_UPDATED; - while (status == MESHMS_STATUS_UPDATED) { - if (iter->_in_ack) { - DEBUGF(meshms, "Reading other log from %"PRId64", to %"PRId64, iter->_their_reader.read.offset, iter->_end_range); + DEBUGF(meshms, "iter=%p, in_ack=%d, found_mine=%d, my_offset=%"PRIu64", their_offset=%"PRIu64, + iter, iter->_in_ack, iter->my_ply.found, + iter->_my_reader.read.offset, iter->_their_reader.read.offset); + while (1) { + if (iter->their_ply.found && iter->_in_ack) { + DEBUGF(meshms, "Reading other log from %"PRIu64", to %"PRIu64, iter->_their_reader.read.offset, iter->_end_range); + // just in case we don't have the full bundle in this rhizome store + if (iter->_their_reader.read.offset > iter->_their_reader.read.length) + iter->_their_reader.read.offset = iter->_their_reader.read.length; + // eof or other read errors, skip over messages (the tail is allowed to advance) if (message_ply_read_prev(&iter->_their_reader)==0){ iter->which_ply = THEIR_PLY; @@ -670,7 +809,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1) iter->ack_offset = 0; iter->read = 0; - return status; + return MESHMS_STATUS_UPDATED; case MESSAGE_BLOCK_TYPE_MESSAGE: iter->type = MESSAGE_RECEIVED; iter->offset = iter->_their_reader.record_end_offset; @@ -679,8 +818,8 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * if ( iter->_their_reader.record_length != 0 && iter->_their_reader.record[iter->_their_reader.record_length - 1] == '\0' ) { - iter->read = iter->_their_reader.record_end_offset <= iter->_conv->read_offset; - return status; + iter->read = iter->_their_reader.record_end_offset <= iter->metadata.read_offset; + return MESHMS_STATUS_UPDATED; } WARN("Malformed MeshMS2 ply journal, missing NUL terminator"); return MESHMS_STATUS_PROTOCOL_FAULT; @@ -689,9 +828,12 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * } } iter->_in_ack = 0; - status = MESHMS_STATUS_UPDATED; - }else if (message_ply_read_prev(&iter->_my_reader) == 0) { - DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset); + }else if(iter->my_ply.found){ + if (message_ply_read_prev(&iter->_my_reader) != 0) + return MESHMS_STATUS_OK; + + DEBUGF(meshms, "Offset %"PRId64", type %d, read_offset %"PRId64, + iter->_my_reader.read.offset, iter->_my_reader.type, iter->metadata.read_offset); iter->which_ply = MY_PLY; switch (iter->_my_reader.type) { case MESSAGE_BLOCK_TYPE_TIME: @@ -704,7 +846,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * break; case MESSAGE_BLOCK_TYPE_ACK: // Read the received messages up to the ack'ed offset - if (iter->_conv->their_ply.found) { + if (iter->their_ply.found) { int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset); if (ofs == -1) { WHYF("Malformed ACK"); @@ -717,9 +859,6 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * else iter->_end_range = iter->_their_reader.read.offset - end_range; // TODO tail - // just in case we don't have the full bundle anymore - if (iter->_their_reader.read.offset > iter->_their_reader.read.length) - iter->_their_reader.read.offset = iter->_their_reader.read.length; iter->_in_ack = 1; } break; @@ -728,14 +867,13 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * iter->offset = iter->_my_reader.record_end_offset; iter->text = (const char *)iter->_my_reader.record; iter->text_length = iter->_my_reader.record_length; - iter->delivered = iter->latest_ack_my_offset && iter->_my_reader.record_end_offset <= iter->latest_ack_my_offset; - return status; + iter->delivered = iter->_my_reader.record_end_offset <= iter->metadata.their_last_ack; + return MESHMS_STATUS_UPDATED; } }else{ - status = MESHMS_STATUS_OK; + return MESHMS_STATUS_OK; } } - return status; } enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len) @@ -747,29 +885,72 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie } struct meshms_conversations *conv = NULL; enum meshms_status status = MESHMS_STATUS_ERROR; + rhizome_manifest *m=NULL; keyring_identity *id = keyring_find_identity_sid(keyring, sender); if (!id) return MESHMS_STATUS_SID_LOCKED; - if (meshms_failed(status = find_or_create_conv(id, recipient, &conv))) + m = rhizome_new_manifest(); + if (!m) goto end; - assert(conv != NULL); + if (meshms_failed(status = meshms_open_list(id, m, &conv))) + goto end; + + struct meshms_conversations *c = conv; + while(c && cmp_sid_t(recipient, &c->them)!=0) + c = c->_next; + + if (!c){ + c = (struct meshms_conversations *) emalloc_zero(sizeof(struct meshms_conversations)); + if (!c) + goto end; + c->them = *recipient; + c->_next = conv; + conv = c; + status = MESHMS_STATUS_UPDATED; + } + + enum meshms_status tmp_status = update_stats(c); + if (meshms_failed(tmp_status)) + return tmp_status; + if (tmp_status == MESHMS_STATUS_UPDATED) + status = tmp_status; // construct a message payload struct overlay_buffer *b = ob_new(); + + // if we didn't "know" them, or we just received a new message, we may need to add an ack now. + // lets do that in one hit + uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0; + DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message); + if (ack) + message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack); + message_ply_append_message(b, message, message_len); message_ply_append_timestamp(b); assert(!ob_overrun(b)); - if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &conv->my_ply, b)==0) + if (message_ply_append(id, RHIZOME_SERVICE_MESHMS2, recipient, &c->my_ply, b)==0){ + if (ack) + c->metadata.my_last_ack = c->metadata.their_last_message; + c->metadata.my_size += ob_position(b); + + // save known conversations since our stats will always change. + write_known_conversations(m, conv); + status = MESHMS_STATUS_UPDATED; + }else{ + status = MESHMS_STATUS_ERROR; + } ob_free(b); end: + if (m) + rhizome_manifest_free(m); meshms_free_conversations(conv); return status; } @@ -788,36 +969,27 @@ enum meshms_status meshms_mark_read(const sid_t *sender, const sid_t *recipient, DEBUGF(meshms, "sender=%s recipient=%s offset=%"PRIu64, alloca_tohex_sid_t(*sender), - recipient ? alloca_tohex_sid_t(*recipient) : "NULL", + recipient ? alloca_tohex_sid_t(*recipient) : "(all)", offset ); m = rhizome_new_manifest(); if (!m) goto end; - if (meshms_failed(status = get_my_conversation_bundle(id, m))) + + if (meshms_failed(status = meshms_open_list(id, m, &conv))) goto end; - // read all conversations, so we can write them again - if (meshms_failed(status = read_known_conversations(m, NULL, &conv))) - goto end; - // read the full list of conversations from the database too - if (meshms_failed(status = get_database_conversations(id, NULL, &conv))) - goto end; - // check if any incoming conversations need to be acked or have new messages and update the read offset + unsigned changed = 0; + // check if any incoming conversations need to be acked or have new messages if (meshms_failed(status = update_conversations(id, &conv))) goto end; if (status == MESHMS_STATUS_UPDATED) - changed = 1; + changed ++; + // update the read offset changed += mark_read(conv, recipient, offset); DEBUGF(meshms, "changed=%u", changed); - if (changed) { - if (meshms_failed(status = write_known_conversations(m, conv))) - goto end; - if (status != MESHMS_STATUS_UPDATED) { - WHYF("expecting %d (MESHMS_STATUS_UPDATED), got %s", MESHMS_STATUS_UPDATED, status); - status = MESHMS_STATUS_ERROR; - } - } + if (changed) + status = write_known_conversations(m, conv); end: if (m) rhizome_manifest_free(m); @@ -835,13 +1007,17 @@ static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_ // - never past their last message // - never rewind, only advance uint64_t new_offset = offset; - if (new_offset > conv->their_last_message) - new_offset = conv->their_last_message; - if (new_offset > conv->read_offset) { - DEBUGF(meshms, "Moving read marker for %s, from %"PRId64" to %"PRId64, - alloca_tohex_sid_t(conv->them), conv->read_offset, new_offset - ); - conv->read_offset = new_offset; + if (new_offset > conv->metadata.their_last_message) + new_offset = conv->metadata.their_last_message; + + DEBUGF(meshms, "Read marker for %s, to %"PRIu64" (asked for %"PRIu64", was %"PRIu64")", + alloca_tohex_sid_t(conv->them), + new_offset, + offset, + conv->metadata.read_offset); + + if (new_offset > conv->metadata.read_offset) { + conv->metadata.read_offset = new_offset; ret++; } if (their_sid) diff --git a/meshms.h b/meshms.h index e347707c..d153ca6c 100644 --- a/meshms.h +++ b/meshms.h @@ -48,6 +48,24 @@ __MESHMS_INLINE int meshms_failed(enum meshms_status status) { const char *meshms_status_message(enum meshms_status); +struct meshms_metadata{ + // what is the offset of their last message + uint64_t their_last_message; + // what is the offset of their last ack + uint64_t their_last_ack_offset; + // where in our ply, does their ack point + uint64_t their_last_ack; + // what is the last message we marked as read + uint64_t read_offset; + // our cached value for the last known size of their ply + uint64_t their_size; + + // where in their ply, does our last ack point + uint64_t my_last_ack; + // our cached value for the last known size of our ply + uint64_t my_size; +}; + struct meshms_conversations { struct meshms_conversations *_next; @@ -56,19 +74,14 @@ struct meshms_conversations { struct message_ply my_ply; struct message_ply their_ply; - - // what is the offset of their last message - uint64_t their_last_message; - // what is the last message we marked as read - uint64_t read_offset; - // our cached value for the last known size of their ply - uint64_t their_size; + + struct meshms_metadata metadata; }; /* Fetch the list of all MeshMS conversations into a binary tree whose nodes * are all allocated by malloc(3). */ -enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, const sid_t *their_sid, struct meshms_conversations **conv); +enum meshms_status meshms_conversations_list(const struct keyring_identity *id, const sid_t *my_sid, struct meshms_conversations **conv); void meshms_free_conversations(struct meshms_conversations *conv); /* For iterating over a binary tree of all MeshMS conversations, as created by @@ -108,13 +121,14 @@ void meshms_conversation_iterator_advance(struct meshms_conversation_iterator *) struct meshms_message_iterator { // Public fields that remain fixed for the life of the iterator: struct keyring_identity *identity; - const sid_t *my_sid; - const sid_t *their_sid; - const rhizome_bid_t *my_ply_bid; - const rhizome_bid_t *their_ply_bid; - uint64_t latest_ack_offset; // offset in remote (their) ply of most recent ACK - uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them - uint64_t read_offset; // offset in remote (their) ply of most recent message read by me + sid_t my_sid; + sid_t their_sid; + + struct message_ply my_ply; + struct message_ply their_ply; + + struct meshms_metadata metadata; + // The following public fields change per message: enum meshms_which_ply { NEITHER_PLY, MY_PLY, THEIR_PLY } which_ply; enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type; @@ -131,15 +145,12 @@ struct meshms_message_iterator { uint64_t ack_offset; // for ACK_RECEIVED }; // Private implementation -- could change, so don't use them. - sid_t _my_sid; - struct meshms_conversations *_conv; struct message_ply_read _my_reader; struct message_ply_read _their_reader; uint64_t _end_range; - bool_t _in_ack; + uint8_t _in_ack:1; }; enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *, const sid_t *me, const sid_t *them); -int meshms_message_iterator_is_open(const struct meshms_message_iterator *); void meshms_message_iterator_close(struct meshms_message_iterator *); enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *); @@ -149,6 +160,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * * MESHMS_STATUS_UPDATED on success, any other value indicates a failure or * error (which is already logged). */ + enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipient, const char *message, size_t message_len); /* Update the read offset for one or more conversations. Returns diff --git a/meshms_cli.c b/meshms_cli.c index 653041a2..707a09c0 100644 --- a/meshms_cli.c +++ b/meshms_cli.c @@ -39,7 +39,7 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_ if (rhizome_opendb() == -1) goto end; - if (meshms_failed(status = meshms_conversations_list(NULL, &sid, NULL, &conv))) + if (meshms_failed(status = meshms_conversations_list(NULL, &sid, &conv))) goto end; const char *names[]={ @@ -57,16 +57,16 @@ static int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_ if (rows >= offset) { cli_put_long(context, rows, ":"); cli_put_hexvalue(context, it.current->them.binary, sizeof(it.current->them), ":"); - cli_put_string(context, it.current->read_offset < it.current->their_last_message ? "unread":"", ":"); - cli_put_long(context, it.current->their_last_message, ":"); - cli_put_long(context, it.current->read_offset, include_message?":":"\n"); + cli_put_string(context, it.current->metadata.read_offset < it.current->metadata.their_last_message ? "unread":"", ":"); + cli_put_long(context, it.current->metadata.their_last_message, ":"); + cli_put_long(context, it.current->metadata.read_offset, include_message?":":"\n"); if (include_message){ int output = 0; - if (it.current->their_last_message && it.current->their_ply.found){ + if (it.current->metadata.their_last_message && it.current->their_ply.found){ struct message_ply_read reader; bzero(&reader, sizeof reader); if (message_ply_read_open(&reader, &it.current->their_ply.bundle_id) == 0){ - reader.read.offset = it.current->their_last_message; + reader.read.offset = it.current->metadata.their_last_message; if (message_ply_read_prev(&reader)==0){ cli_put_string(context, (const char *)reader.record, "\n"); output = 1; @@ -175,7 +175,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_ case MESSAGE_SENT: if (iter.delivered && !marked_delivered){ cli_put_long(context, id++, ":"); - cli_put_long(context, iter.latest_ack_offset, ":"); + cli_put_long(context, iter.metadata.their_last_ack_offset, ":"); cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":"); cli_put_string(context, "ACK", ":"); cli_put_string(context, "delivered", "\n"); @@ -193,7 +193,7 @@ static int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_ case MESSAGE_RECEIVED: if (iter.read && !marked_read) { cli_put_long(context, id++, ":"); - cli_put_long(context, iter.read_offset, ":"); + cli_put_long(context, iter.metadata.read_offset, ":"); cli_put_long(context, iter.timestamp ? (now - iter.timestamp):(long)-1, ":"); cli_put_string(context, "MARK", ":"); cli_put_string(context, "read", "\n"); diff --git a/meshms_restful.c b/meshms_restful.c index 097e941b..222b674c 100644 --- a/meshms_restful.c +++ b/meshms_restful.c @@ -42,35 +42,50 @@ static void finalise_union_meshms_sendmessage(httpd_request *r) form_buf_malloc_release(&r->u.sendmsg.message); } -#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t))) -#define alloca_meshms_token(bid, offset) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (bid), (offset)) +#define MAX_TOKEN_LEN 21 +#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(MAX_TOKEN_LEN)) +#define alloca_meshms_token(pos) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (pos)) -static char *meshms_token_to_str(char *buf, const rhizome_bid_t *bid, uint64_t offset) +static char *meshms_token_to_str(char *buf, struct newsince_position *pos) { - struct iovec iov[2]; - iov[0].iov_base = (void *) bid->binary; - iov[0].iov_len = sizeof bid->binary; - iov[1].iov_base = &offset; - iov[1].iov_len = sizeof offset; - size_t n = base64url_encodev(buf, iov, 2); - assert(n == MESHMS_TOKEN_STRLEN); + uint8_t tmp[MAX_TOKEN_LEN]; + int ofs = 0; + tmp[ofs++]=pos->which_ply; + ofs += pack_uint(tmp + ofs, pos->offset); + ofs += pack_uint(tmp + ofs, pos->their_ack); + assert(ofs <= MAX_TOKEN_LEN); + size_t n = base64url_encode(buf, tmp, ofs); + assert(n <= MESHMS_TOKEN_STRLEN); buf[n] = '\0'; return buf; } -static int strn_to_meshms_token(const char *str, rhizome_bid_t *bidp, uint64_t *offsetp, const char **afterp) +static int strn_to_meshms_token(const char *str, struct newsince_position *pos, const char **afterp) { - unsigned char token[sizeof bidp->binary + sizeof *offsetp]; - if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) == sizeof token){ - memcpy(bidp->binary, token, sizeof bidp->binary); - memcpy(offsetp, token + sizeof bidp->binary, sizeof *offsetp); - (*afterp)++; - }else{ - // don't skip the token - *afterp=str; - bzero(bidp, sizeof *bidp); - *offsetp=0; - } + uint8_t token[MAX_TOKEN_LEN]; + size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL); + if (token_len < 3) + goto blank; + + size_t ofs=0; + pos->which_ply = token[ofs++]; + int unpacked; + if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->offset))==-1) + goto blank; + + ofs += unpacked; + if ((unpacked = unpack_uint(token + ofs, token_len - ofs, &pos->their_ack))==-1) + goto blank; + + (*afterp)++; + return 1; + +blank: + // don't skip the token + *afterp=str; + pos->which_ply = MY_PLY; + pos->offset = 0; + pos->their_ack = 0; return 1; } @@ -159,7 +174,7 @@ static int restful_meshms_(httpd_request *r, const char *remainder) remainder = ""; } else if ( str_startswith(remainder, "/newsince/", &end) - && strn_to_meshms_token(end, &r->bid, &r->ui64, &end) + && strn_to_meshms_token(end, &r->u.msglist.token, &end) && strcmp(end, "messagelist.json") == 0 ) { handler = restful_meshms_newsince_messagelist_json; @@ -215,7 +230,7 @@ static int restful_meshms_conversationlist_json(httpd_request *r, const char *re r->u.mclist.rowcount = 0; r->u.mclist.conv = NULL; enum meshms_status status; - if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, NULL, &r->u.mclist.conv))) + if (meshms_failed(status = meshms_conversations_list(NULL, &r->sid1, &r->u.mclist.conv))) return http_request_meshms_response(r, 0, NULL, status); if (r->u.mclist.conv != NULL) meshms_conversation_iterator_start(&r->u.mclist.iter, r->u.mclist.conv); @@ -275,11 +290,11 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques strbuf_putc(b, ','); strbuf_json_hex(b, r->u.mclist.iter.current->them.binary, sizeof r->u.mclist.iter.current->them.binary); strbuf_putc(b, ','); - strbuf_json_boolean(b, r->u.mclist.iter.current->read_offset >= r->u.mclist.iter.current->their_last_message); + strbuf_json_boolean(b, r->u.mclist.iter.current->metadata.read_offset >= r->u.mclist.iter.current->metadata.their_last_message); strbuf_putc(b, ','); - strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->their_last_message); + strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.their_last_message); strbuf_putc(b, ','); - strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->read_offset); + strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.read_offset); strbuf_puts(b, "]"); if (!strbuf_overrun(b)) { meshms_conversation_iterator_advance(&r->u.mclist.iter); @@ -307,8 +322,6 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r) if (r->u.msglist.dirty) { meshms_message_iterator_close(&r->u.msglist.iter); r->u.msglist.dirty = 0; - } - if (!meshms_message_iterator_is_open(&r->u.msglist.iter)) { enum meshms_status status; if ( meshms_failed(status = meshms_message_iterator_open(&r->u.msglist.iter, &r->sid1, &r->sid2)) || meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter)) @@ -318,6 +331,7 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r) if (!r->u.msglist.finished) { r->u.msglist.latest.which_ply = r->u.msglist.iter.which_ply; r->u.msglist.latest.offset = r->u.msglist.iter.offset; + r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack = r->u.msglist.iter.metadata.their_last_ack; } } return MESHMS_STATUS_OK; @@ -334,6 +348,7 @@ static int restful_meshms_messagelist_json(httpd_request *r, const char *remaind r->u.msglist.token.which_ply = NEITHER_PLY; r->u.msglist.token.offset = 0; r->u.msglist.end_time = 0; + r->u.msglist.dirty = 1; enum meshms_status status; if (meshms_failed(status = reopen_meshms_message_iterator(r))) return http_request_meshms_response(r, 0, NULL, status); @@ -354,17 +369,7 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char enum meshms_status status; if (meshms_failed(status = reopen_meshms_message_iterator(r))) return http_request_meshms_response(r, 0, NULL, status); - if (rhizome_bid_t_is_zero(r->bid) || cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0) - r->u.msglist.token.which_ply = MY_PLY; - else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0) - r->u.msglist.token.which_ply = THEIR_PLY; - else { - http_request_simple_response(&r->http, 404, "Unmatched token"); - return 404; - } - r->u.msglist.token.offset = r->ui64; r->u.msglist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000; - r->u.msglist.current = r->u.msglist.token; http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshms_messagelist_json_content); return 1; } @@ -391,7 +396,7 @@ static int restful_meshms_messagelist_json_content(struct http_request *hr, unsi return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk); } -static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos); +static void _messagelist_json_ack(struct httpd_request *r, strbuf b); static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr, strbuf b) { @@ -415,8 +420,8 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr strbuf_puts(b, "{\n"); if (!r->u.msglist.end_time) { strbuf_sprintf(b, "\"read_offset\":%"PRIu64",\n\"latest_ack_offset\":%"PRIu64",\n", - r->u.msglist.iter.read_offset, - r->u.msglist.iter.latest_ack_my_offset + r->u.msglist.iter.metadata.read_offset, + r->u.msglist.iter.metadata.their_last_ack ); } strbuf_puts(b, "\"header\":["); @@ -433,39 +438,57 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr case LIST_FIRST: case LIST_ROWS: { + r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply; + r->u.msglist.current.offset = r->u.msglist.iter.offset; if ( r->u.msglist.finished || (r->u.msglist.token.which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token.offset) ) { time_ms_t now; if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) { - int appended_row = _messagelist_json_ack(r, b, r->u.msglist.current); - if (strbuf_overrun(b)) - return 1; - if (appended_row) + // If messages before the requested token have now been acked, add an extra ack record to the end + if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack){ + r->u.msglist.current.their_ack = r->u.msglist.latest.their_ack; + _messagelist_json_ack(r, b); + if (strbuf_overrun(b)) + return 1; ++r->u.msglist.rowcount; + } r->u.msglist.token = r->u.msglist.latest; meshms_message_iterator_close(&r->u.msglist.iter); + r->u.msglist.dirty = 1; http_request_pause_response(&r->http, r->u.msglist.end_time); return 0; } r->u.msglist.phase = LIST_END; } else { - r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply; - r->u.msglist.current.offset = r->u.msglist.iter.offset; + int rows=0; + switch (r->u.msglist.iter.type) { case MESSAGE_SENT: + // if you haven't seen the current ack && this is the message that was acked. + // output the ack now + if (r->u.msglist.token.their_ack != r->u.msglist.latest.their_ack && + r->u.msglist.iter.offset <= r->u.msglist.latest.their_ack){ + _messagelist_json_ack(r, b); + if (!strbuf_overrun(b)){ + ++r->u.msglist.rowcount; + r->u.msglist.token.their_ack = r->u.msglist.latest.their_ack; + r->u.msglist.current.their_ack = 0; + } + return 1; + } if (r->u.msglist.rowcount != 0) strbuf_putc(b, ','); strbuf_puts(b, "\n["); strbuf_json_string(b, ">"); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary); strbuf_putc(b, ','); strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset); strbuf_putc(b, ','); - strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->my_ply.bundle_id, r->u.msglist.iter.offset)); + strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current)); strbuf_putc(b, ','); strbuf_json_string(b, r->u.msglist.iter.text); strbuf_putc(b, ','); @@ -477,6 +500,7 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr strbuf_putc(b, ','); strbuf_json_null(b); strbuf_puts(b, "]"); + rows++; break; case MESSAGE_RECEIVED: if (r->u.msglist.rowcount != 0) @@ -484,13 +508,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr strbuf_puts(b, "\n["); strbuf_json_string(b, "<"); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary); strbuf_putc(b, ','); strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset); strbuf_putc(b, ','); - strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset)); + strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current)); strbuf_putc(b, ','); strbuf_json_string(b, r->u.msglist.iter.text); strbuf_putc(b, ','); @@ -502,13 +526,13 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr strbuf_putc(b, ','); strbuf_json_null(b); strbuf_puts(b, "]"); + rows++; break; case ACK_RECEIVED: - _messagelist_json_ack(r, b, r->u.msglist.current); break; } if (!strbuf_overrun(b)) { - ++r->u.msglist.rowcount; + r->u.msglist.rowcount+=rows; enum meshms_status status; if (meshms_failed(status = meshms_message_iterator_prev(&r->u.msglist.iter))) return http_request_meshms_response(r, 0, NULL, status); @@ -531,26 +555,20 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr return 0; } -static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos) +static void _messagelist_json_ack(struct httpd_request *r, strbuf b) { - // Don't send old (irrelevant) ACKs. - if (r->u.msglist.iter.latest_ack_my_offset <= r->u.msglist.highest_ack_offset) - return 0; if (r->u.msglist.rowcount != 0) strbuf_putc(b, ','); strbuf_puts(b, "\n["); strbuf_json_string(b, "ACK"); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.my_sid.binary, sizeof r->u.msglist.iter.my_sid.binary); strbuf_putc(b, ','); - strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary); + strbuf_json_hex(b, r->u.msglist.iter.their_sid.binary, sizeof r->u.msglist.iter.their_sid.binary); strbuf_putc(b, ','); - strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_offset); + strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack_offset); strbuf_putc(b, ','); - // Same token as the message row just sent. - strbuf_json_string(b, alloca_meshms_token( - pos.which_ply == MY_PLY ? r->u.msglist.iter.my_ply_bid : r->u.msglist.iter.their_ply_bid, - pos.offset)); + strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.current)); strbuf_putc(b, ','); strbuf_json_null(b); strbuf_putc(b, ','); @@ -560,10 +578,8 @@ static int _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsi strbuf_putc(b, ','); strbuf_json_null(b); // no timestamp on ACKs strbuf_putc(b, ','); - strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_my_offset); + strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.metadata.their_last_ack); strbuf_puts(b, "]"); - r->u.msglist.highest_ack_offset = r->u.msglist.iter.latest_ack_my_offset; - return 1; } static HTTP_REQUEST_PARSER restful_meshms_sendmessage_end; diff --git a/message_ply.c b/message_ply.c index e8dbf56b..f2bee3a5 100644 --- a/message_ply.c +++ b/message_ply.c @@ -136,6 +136,16 @@ int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid return ret; } +void message_ply_read_rewind(struct message_ply_read *ply) +{ + ply->read.offset = ply->read.length; +} + +int message_ply_is_open(struct message_ply_read *ply) +{ + return ply->read.length>0; +} + void message_ply_read_close(struct message_ply_read *ply) { if (ply->record){ @@ -152,7 +162,7 @@ void message_ply_read_close(struct message_ply_read *ply) int message_ply_read_prev(struct message_ply_read *ply) { ply->record_end_offset = ply->read.offset; - unsigned char footer[2]; + uint8_t footer[2]; if (ply->read.offset <= sizeof footer) { DEBUG(meshms, "EOF"); return -1; @@ -223,6 +233,7 @@ void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, u { ob_checkpoint(b); ob_append_packed_ui64(b, message_offset); + // append the number of bytes acked (should be smaller than an absolute offset) if (previous_ack_offset) ob_append_packed_ui64(b, message_offset - previous_ack_offset); append_footer(b, MESSAGE_BLOCK_TYPE_ACK); @@ -233,4 +244,5 @@ void message_ply_append_message(struct overlay_buffer *b, const char *message, s ob_checkpoint(b); ob_append_strn(b, message, message_len); append_footer(b, MESSAGE_BLOCK_TYPE_MESSAGE); -} \ No newline at end of file +} + diff --git a/message_ply.h b/message_ply.h index 199a4da4..9efccc96 100644 --- a/message_ply.h +++ b/message_ply.h @@ -28,15 +28,17 @@ struct message_ply_read { uint64_t record_end_offset; uint16_t record_length; size_t record_size; - char type; + uint8_t type; // raw record data - unsigned char *record; + uint8_t *record; }; int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid); void message_ply_read_close(struct message_ply_read *ply); int message_ply_read_prev(struct message_ply_read *ply); int message_ply_find_prev(struct message_ply_read *ply, char type); +int message_ply_is_open(struct message_ply_read *ply); +void message_ply_read_rewind(struct message_ply_read *ply); void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset); void message_ply_append_timestamp(struct overlay_buffer *b); diff --git a/rhizome_store.c b/rhizome_store.c index d4682eb2..d77417aa 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -1263,6 +1263,8 @@ void rhizome_read_close(struct rhizome_read *read) END); } read->length = 0; + read->offset = 0; + read->tail = 0; } struct cache_entry{ @@ -1476,6 +1478,8 @@ static enum rhizome_payload_status read_derive_key(rhizome_manifest *m, struct r enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state) { + if (m->filesize == 0 && !m->has_filehash) + return RHIZOME_PAYLOAD_STATUS_EMPTY; enum rhizome_payload_status status = rhizome_open_read(read_state, &m->filehash); if (status == RHIZOME_PAYLOAD_STATUS_STORED) status = read_derive_key(m, read_state); diff --git a/testdefs_meshms.sh b/testdefs_meshms.sh index ebdc07ed..a4b806b8 100644 --- a/testdefs_meshms.sh +++ b/testdefs_meshms.sh @@ -97,12 +97,14 @@ meshms_add_messages() { MESSAGE[$n]="<" meshms_send_message $sid2 $sid1 "${TEXT[$n]}" let ++NRECV + let sent_since_ack=0 ;; 'A') MESSAGE[$n]=ACK [ $i -ne 0 -a $sent_since_ack -eq 0 ] && error "two ACKs in a row (at position $i)" - meshms_send_messages $sid2 $sid1 + meshms_list_messages $sid2 $sid1 let ++NACK + let sent_since_ack=0 ;; *) error "invalid message symbol '$sym' (at position $i)" diff --git a/tests/meshms b/tests/meshms index 5e9f7cfb..dd337af4 100755 --- a/tests/meshms +++ b/tests/meshms @@ -22,7 +22,7 @@ source "${0%/*}/../testframework.sh" source "${0%/*}/../testdefs.sh" source "${0%/*}/../testdefs_rhizome.sh" -rexp_age="[0-9]\+" +rexp_age="[-0-9]\+" teardown() { stop_all_servald_servers @@ -34,66 +34,60 @@ teardown() { setup_logging() { executeOk_servald config \ set debug.meshms on \ - set debug.rhizome on \ - set debug.rhizome_manifest on \ - set debug.rhizome_store on \ set log.console.level debug \ set log.console.show_time on } -doc_MessageDelivery="Send messages, ack and read them in a 2 party conversation" -setup_MessageDelivery() { +setup_common() { setup_servald set_instance +A - create_identities 2 + create_identities $1 setup_logging } -test_MessageDelivery() { - # 1. empty list + +doc_InitiallyEmpty="Return an empty list before sending anything" +setup_InitiallyEmpty() { + setup_common 2 +} +test_InitiallyEmpty() { executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout assertStdoutIs --stdout --line=1 -e '5\n' assertStdoutIs --stdout --line=2 -e '_id:offset:age:type:message\n' assertStdoutLineCount '==' 2 - # 2. create a manifest with a single message and list it back - executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" +} + +doc_SendAndList="Send outgoing messages and list them" +setup_SendAndList() { + setup_common 3 +} +test_SendAndList() { + # create a single message and list it back + executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 1" + tfw_cat --stderr + executeOk_servald meshms send message $SIDA1 $SIDA3 "Message 1" + tfw_cat --stderr executeOk_servald meshms list messages $SIDA1 $SIDA2 - assertStdoutGrep --stdout --matches=1 ":>:Hi\$" - assertStdoutLineCount '==' 3 - # 3. append a second message and list them both - executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you" + tfw_cat --stderr + assertStdoutGrep --stdout --matches=1 "^0:12:$rexp_age:>:Message 1\$" + executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 2" + tfw_cat --stderr executeOk_servald meshms list messages $SIDA1 $SIDA2 - assertStdoutGrep --stdout --matches=1 ":>:How are you\$" - assertStdoutGrep --stdout --matches=1 ":>:Hi\$" - assertStdoutLineCount '==' 4 - # 4. list the messages from the receivers point of view (which ACKs them) - executeOk_servald meshms list messages $SIDA2 $SIDA1 - assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$" - assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$" - assertStdoutLineCount '==' 4 - CONV_BID=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*bid=\([0-9A-F]*\).*/\1/p') - CONV_SECRET=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*secret=\([0-9A-F]*\).*/\1/p') - tfw_log "CONV_BID=$CONV_BID CONV_SECRET=$CONV_SECRET" - # 5. mark the first message as read - executeOk_servald meshms read messages $SIDA2 $SIDA1 5 - check_meshms_bundles - executeOk_servald meshms list messages $SIDA2 $SIDA1 - assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$" - assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$" - assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$" + tfw_cat --stderr + assertStdoutGrep --stdout --matches=1 "^0:30:$rexp_age:>:Message 2\$" + assertStdoutGrep --stdout --matches=1 "^1:12:$rexp_age:>:Message 1\$" + executeOk_servald meshms send message $SIDA1 $SIDA2 "Message 3" + executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:>:Message 3\$" + assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:>:Message 2\$" + assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:>:Message 1\$" assertStdoutLineCount '==' 5 - # 6. mark all messages as read - executeOk_servald meshms read messages $SIDA2 - check_meshms_bundles executeOk_servald meshms list messages $SIDA2 $SIDA1 - assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:MARK:read\$" - assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:<:How are you\$" - assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$" - assertStdoutLineCount '==' 5 - # 7. list messages from the senders point of view after they have been delivered - executeOk_servald meshms list messages $SIDA1 $SIDA2 - assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$" - assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:>:How are you\$" - assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$" + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:48:$rexp_age:<:Message 3\$" + assertStdoutGrep --stdout --matches=1 "^1:30:$rexp_age:<:Message 2\$" + assertStdoutGrep --stdout --matches=1 "^2:12:$rexp_age:<:Message 1\$" assertStdoutLineCount '==' 5 } @@ -171,6 +165,10 @@ setup_reorderList() { set_instance +A create_identities 5 setup_logging + # ensure conversations are known + executeOk_servald meshms send message $SIDA1 $SIDA2 "Start" + executeOk_servald meshms send message $SIDA1 $SIDA3 "Start" + executeOk_servald meshms send message $SIDA1 $SIDA4 "Start" } test_reorderList() { # new incoming messages should bump to the top @@ -212,21 +210,20 @@ test_listConversations() { assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset:message\n' assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0:\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0:Message2\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0:Message4\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0:Message4\$" assertStdoutLineCount '==' 5 executeOk_servald meshms list conversations $SIDA1 1 - tfw_cat --stderr assertStdoutLineCount '==' 4 executeOk_servald meshms list conversations $SIDA1 1 1 - tfw_cat --stderr assertStdoutLineCount '==' 3 # mark all incoming messages as read executeOk_servald meshms read messages $SIDA1 - tfw_cat --stderr + # explicitly mark sida3 as known + executeOk_servald meshms read messages $SIDA1 $SIDA3 executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$" assertStdoutLineCount '==' 5 executeOk_servald meshms list messages $SIDA1 $SIDA2 executeOk_servald meshms list messages $SIDA1 $SIDA4 diff --git a/tests/meshmsjava b/tests/meshmsjava index 4005db70..2d4bd201 100755 --- a/tests/meshmsjava +++ b/tests/meshmsjava @@ -77,19 +77,19 @@ test_MeshmsListConversations() { assertStdoutLineCount '==' 3 assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0" assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=false, last_message=14, read_offset=0" - assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=24, read_offset=0" + assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=false, last_message=18, read_offset=0" executeOk_servald meshms read messages $SIDA1 executeJavaOk org.servalproject.test.Meshms meshms-list-conversations $SIDA1 assertStdoutLineCount '==' 3 assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA2, read=true, last_message=0, read_offset=0" assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA3, read=true, last_message=14, read_offset=14" - assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=24, read_offset=24" + assertStdoutGrep "my_sid=$SIDA1, their_sid=$SIDA4, read=true, last_message=18, read_offset=18" } doc_MeshmsListMessages="Java API list MeshMS messages in one conversation" setup_MeshmsListMessages() { setup - meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>' + meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>' let NROWS=NSENT+NRECV+(NACK?1:0) executeOk_servald meshms list messages $SIDA1 $SIDA2 delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT") @@ -162,7 +162,7 @@ setup_MeshmsListMessagesNewSince() { executeOk_servald config set api.restful.newsince_timeout 1s } setup - meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>' + meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>' let NROWS=NSENT+NRECV+(NACK?1:0) executeJavaOk org.servalproject.test.Meshms meshms-list-messages "$SIDA1" "$SIDA2" tfw_cat --stdout --stderr @@ -288,7 +288,7 @@ setup_MeshmsReadAllConversations() { executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$" } test_MeshmsReadAllConversations() { executeJavaOk org.servalproject.test.Meshms meshms-mark-all-conversations-read $SIDA1 @@ -296,7 +296,7 @@ test_MeshmsReadAllConversations() { executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$" } doc_MeshmsReadAllMessages="Java API MeshMS mark all conversations read" diff --git a/tests/meshmsrestful b/tests/meshmsrestful index c7a1d2f3..50ff0295 100755 --- a/tests/meshmsrestful +++ b/tests/meshmsrestful @@ -69,9 +69,6 @@ set_meshms_config() { executeOk_servald config \ set debug.http_server on \ set debug.httpd on \ - set debug.rhizome_manifest on \ - set debug.rhizome_store on \ - set debug.rhizome on \ set debug.meshms on \ set debug.verbose on \ set log.console.level debug @@ -164,12 +161,13 @@ test_MeshmsListConversations() { { my_sid: \"$SIDA1\", their_sid: \"$SIDA4\", read: false, - last_message: 20, + last_message: 14, read_offset: 0 } ])" # mark all incoming messages as read executeOk_servald meshms read messages $SIDA1 + executeOk_servald meshms read messages $SIDA1 $SIDA3 tfw_cat --stderr executeOk curl \ --silent --fail --show-error \ @@ -205,8 +203,8 @@ test_MeshmsListConversations() { { my_sid: \"$SIDA1\", their_sid: \"$SIDA4\", read: true, - last_message: 20, - read_offset: 20 + last_message: 14, + read_offset: 14 } ])" } @@ -215,7 +213,7 @@ doc_MeshmsListMessages="HTTP RESTful list MeshMS messages in one conversation as setup_MeshmsListMessages() { IDENTITY_COUNT=2 setup - meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>' + meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>' let NROWS=NSENT+NRECV+(NACK?1:0) executeOk_servald meshms list messages $SIDA1 $SIDA2 delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT") @@ -297,7 +295,7 @@ setup_MeshmsListMessagesNewSince() { executeOk_servald config set api.restful.newsince_timeout 1s } setup - meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<><>>A<<>' + meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<<>><>>A<<>' let NROWS=NSENT+NRECV+(NACK?1:0) executeOk curl \ --silent --fail --show-error \ @@ -327,11 +325,10 @@ test_MeshmsListMessagesNewSince() { for ((i = 0; i < NROWS; i += 3)); do transform_list_json messagelist$i.json messages$i.json tfw_preserve messages$i.json - { echo '{"a":'; jq '.[:'$i']' messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json - # If an ACK is new since the token, then it will be in its correct place - # in messages$i.json, otherwise the ACK will appear in messages$i.json - # anyway. - assertJq tmp.json '.a == (if .a | contains([{type:"ACK"}]) then .b else .b | map(select(.type != "ACK")) end)' + jq '.[:'$i']' messages.json > messages_expected$i.json + tfw_preserve messages_expected$i.json + { echo '{"a":'; cat messages_expected$i.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json + assertJq tmp.json '.a == .b' done } @@ -385,7 +382,7 @@ test_MeshmsListMessagesNewSinceArrival() { 'A') waitfor="ACK";; *) error "message=${message}";; esac - wait_until --timeout=120 grepall "$waitfor" newsince{1,2,3}.json + wait_until --timeout=10 grepall "$waitfor" newsince{1,2,3}.json done fork_terminate_all fork_wait_all @@ -596,7 +593,7 @@ setup_MeshmsReadAllConversations() { executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$" } test_MeshmsReadAllConversations() { executeOk curl \ @@ -612,7 +609,7 @@ test_MeshmsReadAllConversations() { executeOk_servald meshms list conversations $SIDA1 assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$" assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$" - assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$" } doc_MeshmsPostSpuriousContent="HTTP RESTful MeshMS rejects unwanted content in POST request" @@ -625,7 +622,7 @@ setup_MeshmsPostSpuriousContent() { executeOk_servald meshms send message $SIDA1 $SIDA2 "Message3" executeOk_servald meshms send message $SIDA2 $SIDA1 "Message4" executeOk_servald meshms list conversations $SIDA1 - assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$" } test_MeshmsPostSpuriousContent() { executeOk curl \ @@ -642,7 +639,7 @@ test_MeshmsPostSpuriousContent() { assertJq http_body 'contains({"http_status_code": 400})' assertJqGrep --ignore-case http_body '.http_status_message' 'content length' executeOk_servald meshms list conversations $SIDA1 - assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$" + assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:35:0\$" } doc_MeshmsReadAllMessages="HTTP RESTful MeshMS mark all conversations read" @@ -656,6 +653,8 @@ setup_MeshmsReadAllMessages() { executeOk_servald meshms send message $SIDA1 $SIDA4 "Message4" executeOk_servald meshms send message $SIDA4 $SIDA1 "Message5" executeOk_servald meshms send message $SIDA1 $SIDA2 "Message6" + executeOk_servald meshms list messages $SIDA2 $SIDA1 + tfw_cat --stdout executeOk_servald meshms list conversations $SIDA2 assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:0\$" }