From bfbd2ddccd011851ade4fc0ebb798ee16fefd540 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Mon, 27 Feb 2017 11:58:28 +1030 Subject: [PATCH] Abbreviate the bundle id in feed ack's --- meshmb.c | 47 +++++++++++++++++++++++++++-------------------- meshmb.h | 2 +- meshmb_cli.c | 2 +- meshms.c | 46 +++++++++++++++++++++++++++------------------- message_ply.c | 35 +++++++++++++++-------------------- message_ply.h | 12 ++++++++++-- 6 files changed, 81 insertions(+), 63 deletions(-) diff --git a/meshmb.c b/meshmb.c index 0f6439b0..6b27d2ce 100644 --- a/meshmb.c +++ b/meshmb.c @@ -69,32 +69,31 @@ int meshmb_activity_next(struct meshmb_activity_iterator *i){ continue; case MESSAGE_BLOCK_TYPE_ACK:{ - uint64_t ack_end; - rhizome_bid_t *bid; - if (message_ply_parse_ack(&i->ack_reader, - &ack_end, - &i->ack_start, - &bid - ) == -1) + struct message_ply_ack ack; + if (message_ply_parse_ack(&i->ack_reader, &ack) == -1) return -1; DEBUGF(meshmb, "Found ack for %s, %u to %u", - alloca_tohex_rhizome_bid_t(*bid), i->ack_start, ack_end); + alloca_tohex(ack.binary, ack.binary_length), ack.start_offset, ack.end_offset); - if (bcmp(&i->msg_ply, bid, sizeof(*bid))==0){ - // shortcut for consecutive acks for the same incoming feed - DEBUGF(meshmb, "Ply still open @%u", - i->msg_reader.read.offset); - } else { - message_ply_read_close(&i->msg_reader); - if (message_ply_read_open(&i->msg_reader, bid, NULL)==-1){ - bzero(&i->msg_ply, sizeof i->msg_ply); - continue; + struct feed_metadata *metadata; + if (tree_find(&i->feeds->root, (void**)&metadata, ack.binary, ack.binary_length, NULL, NULL)==TREE_FOUND){ + if (i->metadata == metadata){ + // shortcut for consecutive acks for the same incoming feed + DEBUGF(meshmb, "Ply still open @%u", + i->msg_reader.read.offset); + }else{ + message_ply_read_close(&i->msg_reader); + if (message_ply_read_open(&i->msg_reader, &metadata->ply.bundle_id, NULL)==-1){ + i->metadata = NULL; + continue; + } + i->metadata = metadata; } - i->msg_ply = *bid; } - i->msg_reader.read.offset = ack_end; + i->ack_start = ack.start_offset; + i->msg_reader.read.offset = ack.end_offset; } break; default: @@ -257,7 +256,15 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada { struct overlay_buffer *b = ob_new(); - message_ply_append_ack(b, metadata->ply.size, metadata->size, &metadata->ply.bundle_id); + struct message_ply_ack ack; + bzero(&ack, sizeof ack); + + ack.start_offset = metadata->size; + ack.end_offset = metadata->ply.size; + ack.binary = metadata->ply.bundle_id.binary; + ack.binary_length = (metadata->tree_depth >> 3) + 3; + + message_ply_append_ack(b, &ack); int r = rhizome_write_buffer(&feeds->ack_writer, ob_ptr(b), ob_position(b)); DEBUGF(meshmb, "Acked incoming messages"); ob_free(b); diff --git a/meshmb.h b/meshmb.h index 5125425c..5bbdd4f2 100644 --- a/meshmb.h +++ b/meshmb.h @@ -23,7 +23,7 @@ struct meshmb_activity_iterator{ struct message_ply_read ack_reader; time_s_t ack_timestamp; uint64_t ack_start; - rhizome_bid_t msg_ply; + struct feed_metadata *metadata; struct message_ply_read msg_reader; }; diff --git a/meshmb_cli.c b/meshmb_cli.c index 8163a498..6f81e54e 100644 --- a/meshmb_cli.c +++ b/meshmb_cli.c @@ -333,7 +333,7 @@ static int app_meshmb_activity(const struct cli_parsed *parsed, struct cli_conte switch(iterator->msg_reader.type){ case MESSAGE_BLOCK_TYPE_MESSAGE: cli_put_long(context, rowcount++, ":"); - cli_put_string(context, alloca_tohex_rhizome_bid_t(iterator->msg_ply), ":"); + cli_put_string(context, alloca_tohex_rhizome_bid_t(iterator->msg_reader.bundle_id), ":"); cli_put_string(context, alloca_tohex_identity_t(&iterator->msg_reader.author), ":"); cli_put_string(context, iterator->msg_reader.name, ":"); cli_put_long(context, iterator->ack_timestamp ? (long)(now - iterator->ack_timestamp) : (long)-1, ":"); diff --git a/meshms.c b/meshms.c index 675d4d5b..59c5a83b 100644 --- a/meshms.c +++ b/meshms.c @@ -214,9 +214,9 @@ static enum meshms_status update_their_stats(struct meshms_metadata *metadata, s if (!found_their_ack){ found_their_ack = 1; metadata->their_last_ack_offset = reader->record_end_offset; - message_ply_parse_ack(reader, - &metadata->their_last_ack, - NULL, NULL); + struct message_ply_ack ack; + message_ply_parse_ack(reader, &ack); + metadata->their_last_ack = ack.end_offset; DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64, metadata->their_last_ack_offset, metadata->their_last_ack); } @@ -243,7 +243,9 @@ static enum meshms_status update_my_stats(struct meshms_metadata *metadata, stru reader->read.offset = reader->read.length; if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){ - message_ply_parse_ack(reader, &metadata->my_last_ack, NULL, NULL); + struct message_ply_ack ack; + message_ply_parse_ack(reader, &ack); + metadata->my_last_ack = ack.end_offset; DEBUGF(meshms, "Found my last ack %"PRId64, metadata->my_last_ack); } metadata->my_size = ply->size; @@ -299,8 +301,11 @@ static enum meshms_status update_conversation(const keyring_identity *id, struct DEBUGF(meshms, "Creating ACK for %"PRId64" - %"PRId64, conv->metadata.my_last_ack, conv->metadata.their_last_message); unsigned char buffer[30]; struct overlay_buffer *b = ob_static(buffer, sizeof buffer); - - message_ply_append_ack(b, conv->metadata.their_last_message, conv->metadata.my_last_ack, NULL); + struct message_ply_ack ack; + bzero(&ack, sizeof ack); + ack.end_offset = conv->metadata.their_last_message; + ack.start_offset = conv->metadata.my_last_ack; + message_ply_append_ack(b, &ack); message_ply_append_timestamp(b); assert(!ob_overrun(b)); @@ -785,17 +790,17 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * iter->which_ply = THEIR_PLY; if (iter->_their_reader.read.offset >= iter->_end_range) { switch (iter->_their_reader.type) { - case MESSAGE_BLOCK_TYPE_ACK: + case MESSAGE_BLOCK_TYPE_ACK:{ iter->type = ACK_RECEIVED; iter->their_offset = iter->_their_reader.record_end_offset; iter->text = NULL; iter->text_length = 0; - message_ply_parse_ack(&iter->_their_reader, - &iter->ack_offset, - NULL, NULL); + struct message_ply_ack ack; + message_ply_parse_ack(&iter->_their_reader, &ack); + iter->ack_offset = ack.end_offset; iter->read = 0; return MESHMS_STATUS_UPDATED; - case MESSAGE_BLOCK_TYPE_MESSAGE: + }case MESSAGE_BLOCK_TYPE_MESSAGE: iter->type = MESSAGE_RECEIVED; iter->their_offset = iter->_their_reader.record_end_offset; iter->text = (const char *)iter->_their_reader.record; @@ -832,15 +837,14 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * // Read the received messages up to the ack'ed offset if (iter->their_ply.found) { iter->my_offset = iter->_my_reader.record_end_offset; + struct message_ply_ack ack; - if (message_ply_parse_ack(&iter->_my_reader, - (uint64_t*)&iter->_their_reader.read.offset, - &iter->_end_range, - NULL - ) == -1){ + if (message_ply_parse_ack(&iter->_my_reader, &ack) == -1){ WHYF("Malformed ACK"); return MESHMS_STATUS_PROTOCOL_FAULT; } + iter->_their_reader.read.offset = ack.end_offset; + iter->_end_range = ack.start_offset; // TODO tail iter->_in_ack = 1; } @@ -910,9 +914,13 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie // lets do that in one hit uint8_t ack = (c->metadata.my_last_ack < c->metadata.their_last_message) ? 1:0; DEBUGF(meshms,"Our ack %"PRIu64", their message %"PRIu64, c->metadata.my_last_ack, c->metadata.their_last_message); - if (ack) - message_ply_append_ack(b, c->metadata.their_last_message, c->metadata.my_last_ack, NULL); - + if (ack){ + struct message_ply_ack ack; + bzero(&ack, sizeof ack); + ack.end_offset = c->metadata.their_last_message; + ack.start_offset = c->metadata.my_last_ack; + message_ply_append_ack(b, &ack); + } message_ply_append_message(b, message, message_len); message_ply_append_timestamp(b); diff --git a/message_ply.c b/message_ply.c index a3648999..fd583528 100644 --- a/message_ply.c +++ b/message_ply.c @@ -191,6 +191,7 @@ int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid && rhizome_open_decrypt_read(m, &ply->read) == RHIZOME_PAYLOAD_STATUS_STORED){ assert(m->filesize != RHIZOME_SIZE_UNSET); + ply->bundle_id = m->keypair.public_key; ply->author = m->author; ply->read.offset = ply->read.length = m->filesize; if (m->name && *m->name) @@ -281,17 +282,12 @@ int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestam return 0; } -int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid) +int message_ply_parse_ack(struct message_ply_read *ply, struct message_ply_ack *ack) { int ofs=0; + bzero(ack, sizeof *ack); - *end_offset=0; - if (start_offset) - *start_offset = 0; - if (bid) - *bid = NULL; - - int r = unpack_uint(&ply->record[ofs], ply->record_length, end_offset); + int r = unpack_uint(&ply->record[ofs], ply->record_length, &ack->end_offset); if (r == -1) return -1; ofs+=r; @@ -299,13 +295,11 @@ int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, ui r = unpack_uint(&ply->record[ofs], ply->record_length - ofs, &length); if (r == -1) return 0; - if (start_offset) - *start_offset = *end_offset - length; + ack->start_offset = ack->end_offset - length; ofs += r; - - if (bid){ - if (ofs + sizeof(rhizome_bid_t) <= ply->record_length) - *bid = (rhizome_bid_t*)&ply->record[ofs]; + if (ofs < ply->record_length){ + ack->binary = &ply->record[ofs]; + ack->binary_length = ply->record_length - ofs; } return 0; } @@ -336,15 +330,16 @@ void message_ply_append_timestamp(struct overlay_buffer *b) append_footer(b, MESSAGE_BLOCK_TYPE_TIME); } -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid) +void message_ply_append_ack(struct overlay_buffer *b, const struct message_ply_ack *ack) { + assert(ack->binary_length == 0 || ack->binary); ob_checkpoint(b); - ob_append_packed_ui64(b, message_offset); + ob_append_packed_ui64(b, ack->end_offset); // append the number of bytes acked (should be smaller than an absolute offset) - if (previous_ack_offset || bid) - ob_append_packed_ui64(b, message_offset - previous_ack_offset); - if (bid) - ob_append_bytes(b, bid->binary, sizeof *bid); + if (ack->start_offset || ack->binary_length) + ob_append_packed_ui64(b, ack->end_offset - ack->start_offset); + if (ack->binary_length) + ob_append_bytes(b, ack->binary, ack->binary_length); append_footer(b, MESSAGE_BLOCK_TYPE_ACK); } diff --git a/message_ply.h b/message_ply.h index fab9910d..d9b9ed36 100644 --- a/message_ply.h +++ b/message_ply.h @@ -25,6 +25,7 @@ struct message_ply_read { struct rhizome_read read; // block buffer struct rhizome_read_buffer buff; + rhizome_bid_t bundle_id; // copy of the manifest name field const char *name; // copy of the manifest author @@ -50,8 +51,15 @@ int message_ply_find_prev(struct message_ply_read *ply, char type); int message_ply_is_open(struct message_ply_read *ply); void message_ply_read_rewind(struct message_ply_read *ply); +struct message_ply_ack{ + uint64_t start_offset; + uint64_t end_offset; + const uint8_t *binary; + size_t binary_length; +}; + int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp); -int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid); +int message_ply_parse_ack(struct message_ply_read *ply, struct message_ply_ack *ack); int message_ply_write_open( struct message_ply_write *ply_write, @@ -67,7 +75,7 @@ int message_ply_write_open( int message_ply_write_finish(struct message_ply_write *write); void message_ply_write_close(struct message_ply_write *write); -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid); +void message_ply_append_ack(struct overlay_buffer *b, const struct message_ply_ack *ack); void message_ply_append_timestamp(struct overlay_buffer *b); void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len); int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,