From b7b77e81ee8496f6562cec4ffd6c273d012266fe Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 15 Feb 2017 15:35:20 +1030 Subject: [PATCH] Add helpers for parsing ply records --- meshmb.c | 5 ++--- meshmb_cli.c | 5 ++--- meshmb_restful.c | 5 ++--- meshms.c | 38 ++++++++++++++++---------------------- message_ply.c | 40 +++++++++++++++++++++++++++++++++++++++- message_ply.h | 5 ++++- 6 files changed, 65 insertions(+), 33 deletions(-) diff --git a/meshmb.c b/meshmb.c index 5204532c..e2ef024f 100644 --- a/meshmb.c +++ b/meshmb.c @@ -108,11 +108,10 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada if (reader->record_end_offset <= metadata->size) break; if (reader->type == MESSAGE_BLOCK_TYPE_TIME){ - if (reader->record_length<4){ - WARN("Malformed ply, expected 4 byte timestamp"); + if (message_ply_parse_timestamp(reader, ×tamp)!=0){ + WARN("Malformed ply, expected timestamp"); continue; } - timestamp = read_uint32(reader->record); }else if(reader->type == MESSAGE_BLOCK_TYPE_MESSAGE){ if (metadata->last_message_offset == reader->record_end_offset) diff --git a/meshmb_cli.c b/meshmb_cli.c index 01654021..f594f606 100644 --- a/meshmb_cli.c +++ b/meshmb_cli.c @@ -85,11 +85,10 @@ static int app_meshmb_read(const struct cli_parsed *parsed, struct cli_context * while(message_ply_read_prev(&read)==0){ switch(read.type){ case MESSAGE_BLOCK_TYPE_TIME: - if (read.record_length<4){ - WARN("Malformed ply, expected 4 byte timestamp"); + if (message_ply_parse_timestamp(&read, ×tamp)!=0){ + WARN("Malformed ply, expected timestamp"); continue; } - timestamp = read_uint32(read.record); break; case MESSAGE_BLOCK_TYPE_MESSAGE: diff --git a/meshmb_restful.c b/meshmb_restful.c index f2c08b4c..33454091 100644 --- a/meshmb_restful.c +++ b/meshmb_restful.c @@ -174,11 +174,10 @@ static int next_ply_message(httpd_request *r){ switch(r->u.plylist.ply_reader.type){ case MESSAGE_BLOCK_TYPE_TIME: - if (r->u.plylist.ply_reader.record_length<4){ - WARN("Malformed ply, expected 4 byte timestamp"); + if (message_ply_parse_timestamp(&r->u.plylist.ply_reader, &r->u.plylist.timestamp)!=0){ + WARN("Malformed ply, expected timestamp"); continue; } - r->u.plylist.timestamp = read_uint32(r->u.plylist.ply_reader.record); break; case MESSAGE_BLOCK_TYPE_MESSAGE: diff --git a/meshms.c b/meshms.c index 5f401dea..8e2a1e6b 100644 --- a/meshms.c +++ b/meshms.c @@ -213,11 +213,10 @@ static enum meshms_status update_their_stats(struct meshms_metadata *metadata, s case MESSAGE_BLOCK_TYPE_ACK: if (!found_their_ack){ found_their_ack = 1; - uint64_t value=0; metadata->their_last_ack_offset = reader->record_end_offset; - if (unpack_uint(reader->record, reader->record_length, &value) != -1){ - metadata->their_last_ack = value; - } + message_ply_parse_ack(reader, + &metadata->their_last_ack, + NULL, NULL); DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64, metadata->their_last_ack_offset, metadata->their_last_ack); } @@ -244,11 +243,8 @@ static enum meshms_status update_my_stats(struct meshms_metadata *metadata, stru reader->read.offset = reader->read.length; if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){ - uint64_t my_ack = 0; - if (unpack_uint(reader->record, reader->record_length, &my_ack) != -1){ - metadata->my_last_ack = my_ack; - DEBUGF(meshms, "Found my last ack %"PRId64, my_ack); - } + message_ply_parse_ack(reader, &metadata->my_last_ack, NULL, NULL); + DEBUGF(meshms, "Found my last ack %"PRId64, metadata->my_last_ack); } metadata->my_size = ply->size; message_ply_read_rewind(reader); @@ -794,8 +790,9 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * iter->their_offset = iter->_their_reader.record_end_offset; iter->text = NULL; iter->text_length = 0; - if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1) - iter->ack_offset = 0; + message_ply_parse_ack(&iter->_their_reader, + &iter->ack_offset, + NULL, NULL); iter->read = 0; return MESHMS_STATUS_UPDATED; case MESSAGE_BLOCK_TYPE_MESSAGE: @@ -825,28 +822,25 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator * iter->which_ply = MY_PLY; switch (iter->_my_reader.type) { case MESSAGE_BLOCK_TYPE_TIME: - if (iter->_my_reader.record_length<4){ - WARN("Malformed MeshMS2 ply journal, expected 4 byte timestamp"); + if (message_ply_parse_timestamp(&iter->_my_reader, &iter->timestamp)!=0){ + WARN("Malformed MeshMS2 ply journal, malformed timestamp"); return MESHMS_STATUS_PROTOCOL_FAULT; } - iter->timestamp = read_uint32(iter->_my_reader.record); DEBUGF(meshms, "Parsed timestamp %ds old", gettime() - iter->timestamp); break; case MESSAGE_BLOCK_TYPE_ACK: // Read the received messages up to the ack'ed offset if (iter->their_ply.found) { iter->my_offset = iter->_my_reader.record_end_offset; - int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset); - if (ofs == -1) { + + if (message_ply_parse_ack(&iter->_my_reader, + (uint64_t*)&iter->_their_reader.read.offset, + &iter->_end_range, + NULL + ) == -1){ WHYF("Malformed ACK"); return MESHMS_STATUS_PROTOCOL_FAULT; } - uint64_t end_range; - int x = unpack_uint(iter->_my_reader.record + ofs, iter->_my_reader.record_length - ofs, &end_range); - if (x == -1) - iter->_end_range = 0; - else - iter->_end_range = iter->_their_reader.read.offset - end_range; // TODO tail iter->_in_ack = 1; } diff --git a/message_ply.c b/message_ply.c index 8fb922eb..da765cac 100644 --- a/message_ply.c +++ b/message_ply.c @@ -261,6 +261,44 @@ int message_ply_read_prev(struct message_ply_read *ply) return 0; } +int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp) +{ + assert(ply->type == MESSAGE_BLOCK_TYPE_TIME); + if (ply->record_length<4) + return -1; + *timestamp = read_uint32(ply->record); + return 0; +} + +int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid) +{ + int ofs=0; + + *end_offset=0; + if (start_offset) + *start_offset = 0; + if (bid) + *bid = NULL; + + int r = unpack_uint(&ply->record[ofs], ply->record_length, end_offset); + if (r == -1) + return -1; + ofs+=r; + uint64_t length; + r = unpack_uint(&ply->record[ofs], ply->record_length - ofs, &length); + if (r == -1) + return 0; + if (start_offset) + *start_offset = *end_offset - length; + ofs += r; + + if (bid){ + if (ofs + sizeof(rhizome_bid_t) <= ply->record_length) + *bid = (rhizome_bid_t*)&ply->record[ofs]; + } + return 0; +} + // keep reading past messages until you find this type. int message_ply_find_prev(struct message_ply_read *ply, const char message_type) { @@ -287,7 +325,7 @@ void message_ply_append_timestamp(struct overlay_buffer *b) append_footer(b, MESSAGE_BLOCK_TYPE_TIME); } -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid) +void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid) { ob_checkpoint(b); ob_append_packed_ui64(b, message_offset); diff --git a/message_ply.h b/message_ply.h index 43d414ea..dcd31a25 100644 --- a/message_ply.h +++ b/message_ply.h @@ -50,6 +50,9 @@ int message_ply_find_prev(struct message_ply_read *ply, char type); int message_ply_is_open(struct message_ply_read *ply); void message_ply_read_rewind(struct message_ply_read *ply); +int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp); +int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid); + int message_ply_write_open( struct message_ply_write *ply_write, const struct keyring_identity *id, @@ -64,7 +67,7 @@ int message_ply_write_open( int message_ply_write_finish(struct message_ply_write *write); void message_ply_write_close(struct message_ply_write *write); -void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid); +void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid); void message_ply_append_timestamp(struct overlay_buffer *b); void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len); int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,