From 940b8c2d4babf487f07654a18c526f9edb54e4d5 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 14 Feb 2017 10:45:06 +1030 Subject: [PATCH] Ack incoming feeds for threading --- meshmb.c | 108 +++++++++++++++++++++++++++++++++++++++++++----- meshms.c | 6 +-- message_ply.c | 6 ++- message_ply.h | 3 +- rhizome.h | 1 + rhizome_store.c | 13 +++--- 6 files changed, 114 insertions(+), 23 deletions(-) diff --git a/meshmb.c b/meshmb.c index d4d1a17e..5204532c 100644 --- a/meshmb.c +++ b/meshmb.c @@ -26,6 +26,9 @@ struct meshmb_feeds{ struct tree_root root; keyring_identity *id; sign_keypair_t bundle_keypair; + sign_keypair_t ack_bundle_keypair; + rhizome_manifest *ack_manifest; + struct rhizome_write ack_writer; bool_t dirty; uint8_t generation; }; @@ -34,7 +37,34 @@ struct meshmb_feeds{ #define MAX_NAME_LEN (256) // ?? #define MAX_MSG_LEN (256) // ?? -static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader) +static int finish_ack_writing(struct meshmb_feeds *feeds){ + if (!feeds->ack_manifest) + return 0; + + int ret=-1; + + DEBUGF(meshmb, "Completing private ply for ack thread"); + enum rhizome_payload_status status = rhizome_finish_write(&feeds->ack_writer); + status = rhizome_finish_store(&feeds->ack_writer, feeds->ack_manifest, status); + if (status == RHIZOME_PAYLOAD_STATUS_NEW){ + rhizome_manifest *mout=NULL; + struct rhizome_bundle_result result = rhizome_manifest_finalise(feeds->ack_manifest, &mout, 0); + if (mout && mout!=feeds->ack_manifest) + rhizome_manifest_free(mout); + + if (result.status == RHIZOME_BUNDLE_STATUS_NEW) + ret = 0; + rhizome_bundle_result_free(&result); + } + bzero(&feeds->ack_writer, sizeof feeds->ack_writer); + + rhizome_manifest_free(feeds->ack_manifest); + feeds->ack_manifest = NULL; + + return ret; +} + +static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metadata, struct message_ply_read *reader) { if (!metadata->ply.found){ // get the current size from the db @@ -45,17 +75,18 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad END) == SQLITE_ROW) metadata->ply.found = 1; else - return; + return -1; } + DEBUGF(meshmb, "Size from %u to %u", metadata->size, metadata->ply.size); if (metadata->size == metadata->ply.size) - return; + return 0; if (!message_ply_is_open(reader) && message_ply_read_open(reader, &metadata->ply.bundle_id)!=0) - return; + return -1; - // TODO remember if user has overridden the name? + // TODO allow the user to specify an overridden name? if (metadata->details.name){ free((void*)metadata->details.name); metadata->details.name = NULL; @@ -71,7 +102,11 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad reader->read.offset = reader->read.length; time_s_t timestamp = 0; + uint64_t last_offset = metadata->last_message_offset; + while (message_ply_read_prev(reader) == 0){ + if (reader->record_end_offset <= metadata->size) + break; if (reader->type == MESSAGE_BLOCK_TYPE_TIME){ if (reader->record_length<4){ WARN("Malformed ply, expected 4 byte timestamp"); @@ -83,7 +118,7 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad if (metadata->last_message_offset == reader->record_end_offset) break; - metadata->last_message_offset = reader->record_end_offset; + last_offset = reader->record_end_offset; if (metadata->details.last_message) free((void*)metadata->details.last_message); @@ -96,13 +131,57 @@ static void update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metad } } + DEBUGF(meshmb, "Last message from %u to %u", metadata->last_message_offset, last_offset); + if (last_offset > metadata->last_message_offset){ + // add an ack to our journal to thread new messages + if (!feeds->ack_manifest){ + rhizome_manifest *m = rhizome_new_manifest(); + + DEBUGF(meshmb, "Opening private ply for ack thread"); + + struct rhizome_bundle_result result = rhizome_private_bundle(m, &feeds->ack_bundle_keypair); + switch(result.status){ + case RHIZOME_BUNDLE_STATUS_NEW: + rhizome_manifest_set_tail(m, 0); + rhizome_manifest_set_filesize(m, 0); + case RHIZOME_BUNDLE_STATUS_SAME: + { + enum rhizome_payload_status pstatus = rhizome_write_open_journal(&feeds->ack_writer, m, 0, RHIZOME_SIZE_UNSET); + if (pstatus==RHIZOME_PAYLOAD_STATUS_NEW) + break; + } + // fallthrough + case RHIZOME_BUNDLE_STATUS_BUSY: + rhizome_bundle_result_free(&result); + rhizome_manifest_free(m); + return -1; + + default: + // everything else should be impossible. + FATALF("Cannot create manifest: %s", alloca_rhizome_bundle_result(result)); + } + + rhizome_bundle_result_free(&result); + feeds->ack_manifest = m; + } + + { + struct overlay_buffer *b = ob_new(); + + message_ply_append_ack(b, last_offset, metadata->last_message_offset, &metadata->ply.bundle_id); + int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b)); + DEBUGF(meshmb, "Acked incoming messages"); + ob_free(b); + if (r == -1) + return -1; + } + } + + metadata->last_message_offset = last_offset; metadata->size = metadata->ply.size; - // TODO assemble ACK list for unified reading....? - feeds->dirty=1; - - return; + return 1; } // TODO, might be quicker to fetch all meshmb bundles and test if they are in the feed list @@ -182,6 +261,8 @@ static int write_metadata(void **record, void *context) int meshmb_flush(struct meshmb_feeds *feeds) { + finish_ack_writing(feeds); + if (!feeds->dirty){ DEBUGF(meshmb, "Ignoring flush, not dirty"); return feeds->generation; @@ -378,9 +459,16 @@ int meshmb_open(keyring_identity *id, struct meshmb_feeds **feeds) (*feeds)->id = id; rhizome_manifest *m = rhizome_new_manifest(); if (m){ + // deterministic bundle id's for storing active follow / ignore state; crypto_seed_keypair(&(*feeds)->bundle_keypair, "91656c3d62e9fe2678a1a81fabe3f413%s5a37120ca55d911634560e4d4dc1283f", alloca_tohex(id->sign_keypair->private_key.binary, sizeof id->sign_keypair->private_key)); + + // and for threading incoming feed messages; + crypto_seed_keypair(&(*feeds)->ack_bundle_keypair, + "de3f2e21d9735d41b1fd7ddf03a58f2b%s937a440c12f9478d026bbf579ab115c0", + alloca_tohex(id->sign_keypair->private_key.binary, sizeof id->sign_keypair->private_key)); + struct rhizome_bundle_result result = rhizome_private_bundle(m, &(*feeds)->bundle_keypair); DEBUGF(meshmb, "Private bundle %s, %s", alloca_tohex_identity_t(&(*feeds)->bundle_keypair.public_key), diff --git a/meshms.c b/meshms.c index 5e65cd08..5f401dea 100644 --- a/meshms.c +++ b/meshms.c @@ -304,7 +304,7 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct unsigned char buffer[30]; struct overlay_buffer *b = ob_static(buffer, sizeof buffer); - message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack); + message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack, NULL); message_ply_append_timestamp(b); assert(!ob_overrun(b)); @@ -566,7 +566,7 @@ static enum meshms_status write_known_conversations(rhizome_manifest *m, struct rhizome_manifest_set_filehash(m, &write.id); rhizome_manifest_set_filesize(m, write.file_length); - struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 1); + struct rhizome_bundle_result result = rhizome_manifest_finalise(m, &mout, 0); switch (result.status) { case RHIZOME_BUNDLE_STATUS_ERROR: // error is already logged @@ -917,7 +917,7 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0; DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message); if (ack) - message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack); + message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack, NULL); message_ply_append_message(b, message, message_len); message_ply_append_timestamp(b); diff --git a/message_ply.c b/message_ply.c index eacf021c..8fb922eb 100644 --- a/message_ply.c +++ b/message_ply.c @@ -287,13 +287,15 @@ void message_ply_append_timestamp(struct overlay_buffer *b) append_footer(b, MESSAGE_BLOCK_TYPE_TIME); } -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset) +void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid) { ob_checkpoint(b); ob_append_packed_ui64(b, message_offset); // append the number of bytes acked (should be smaller than an absolute offset) - if (previous_ack_offset) + if (previous_ack_offset || bid) ob_append_packed_ui64(b, message_offset - previous_ack_offset); + if (bid) + ob_append_bytes(b, bid->binary, sizeof *bid); append_footer(b, MESSAGE_BLOCK_TYPE_ACK); } diff --git a/message_ply.h b/message_ply.h index 25cf328d..43d414ea 100644 --- a/message_ply.h +++ b/message_ply.h @@ -41,7 +41,6 @@ struct message_ply_read { struct message_ply_write{ rhizome_manifest *m; struct rhizome_write write; - }; int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid); @@ -65,7 +64,7 @@ int message_ply_write_open( int message_ply_write_finish(struct message_ply_write *write); void message_ply_write_close(struct message_ply_write *write); -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset); +void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid); void message_ply_append_timestamp(struct overlay_buffer *b); void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len); int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b, diff --git a/rhizome.h b/rhizome.h index af36e400..9f53dfdc 100644 --- a/rhizome.h +++ b/rhizome.h @@ -875,6 +875,7 @@ enum rhizome_payload_status rhizome_write_open_manifest(struct rhizome_write *wr enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, uint64_t advance_by, uint64_t append_size); int rhizome_write_file(struct rhizome_write *write, const char *filename, off_t offset, uint64_t length); void rhizome_fail_write(struct rhizome_write *write); +int is_rhizome_write_open(const struct rhizome_write *write); enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write); enum rhizome_payload_status rhizome_finish_store(struct rhizome_write *write, rhizome_manifest *m, enum rhizome_payload_status status); enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath); diff --git a/rhizome_store.c b/rhizome_store.c index 9c88f17d..8e9c33e1 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -690,6 +690,11 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename, off_t return ret; } +int is_rhizome_write_open(const struct rhizome_write *write) +{ + return write->temp_id ? 1:0; +} + void rhizome_fail_write(struct rhizome_write *write) { if (write->blob_fd != -1){ @@ -711,6 +716,7 @@ void rhizome_fail_write(struct rhizome_write *write) write->buffer_list=n->_next; free(n); } + write->temp_id=0; } static int keep_hash(struct rhizome_write *write_state, struct crypto_hash_sha512_state *hash_state) @@ -1827,12 +1833,6 @@ enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *wri enum rhizome_payload_status rhizome_finish_store(struct rhizome_write *write, rhizome_manifest *m, enum rhizome_payload_status status) { DEBUGF(rhizome, "write=%p m=manifest %p, status=%d %s", write, m, status, rhizome_payload_status_message_nonnull(status)); - switch (status) { - case RHIZOME_PAYLOAD_STATUS_NEW: - break; - default: - break; - } int status_valid = 0; switch (status) { case RHIZOME_PAYLOAD_STATUS_EMPTY: @@ -1890,6 +1890,7 @@ enum rhizome_payload_status rhizome_append_journal_buffer(rhizome_manifest *m, u { struct rhizome_write write; bzero(&write, sizeof write); + assert(advance_by || (buffer && len)); enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, (uint64_t) len); if (status != RHIZOME_PAYLOAD_STATUS_NEW) return status;