Add helpers for parsing ply records

This commit is contained in:
Jeremy Lakeman 2017-02-15 15:35:20 +10:30
parent 940b8c2d4b
commit b7b77e81ee
6 changed files with 65 additions and 33 deletions

View File

@ -108,11 +108,10 @@ static int update_stats(struct meshmb_feeds *feeds, struct feed_metadata *metada
if (reader->record_end_offset <= metadata->size)
break;
if (reader->type == MESSAGE_BLOCK_TYPE_TIME){
if (reader->record_length<4){
WARN("Malformed ply, expected 4 byte timestamp");
if (message_ply_parse_timestamp(reader, &timestamp)!=0){
WARN("Malformed ply, expected timestamp");
continue;
}
timestamp = read_uint32(reader->record);
}else if(reader->type == MESSAGE_BLOCK_TYPE_MESSAGE){
if (metadata->last_message_offset == reader->record_end_offset)

View File

@ -85,11 +85,10 @@ static int app_meshmb_read(const struct cli_parsed *parsed, struct cli_context *
while(message_ply_read_prev(&read)==0){
switch(read.type){
case MESSAGE_BLOCK_TYPE_TIME:
if (read.record_length<4){
WARN("Malformed ply, expected 4 byte timestamp");
if (message_ply_parse_timestamp(&read, &timestamp)!=0){
WARN("Malformed ply, expected timestamp");
continue;
}
timestamp = read_uint32(read.record);
break;
case MESSAGE_BLOCK_TYPE_MESSAGE:

View File

@ -174,11 +174,10 @@ static int next_ply_message(httpd_request *r){
switch(r->u.plylist.ply_reader.type){
case MESSAGE_BLOCK_TYPE_TIME:
if (r->u.plylist.ply_reader.record_length<4){
WARN("Malformed ply, expected 4 byte timestamp");
if (message_ply_parse_timestamp(&r->u.plylist.ply_reader, &r->u.plylist.timestamp)!=0){
WARN("Malformed ply, expected timestamp");
continue;
}
r->u.plylist.timestamp = read_uint32(r->u.plylist.ply_reader.record);
break;
case MESSAGE_BLOCK_TYPE_MESSAGE:

View File

@ -213,11 +213,10 @@ static enum meshms_status update_their_stats(struct meshms_metadata *metadata, s
case MESSAGE_BLOCK_TYPE_ACK:
if (!found_their_ack){
found_their_ack = 1;
uint64_t value=0;
metadata->their_last_ack_offset = reader->record_end_offset;
if (unpack_uint(reader->record, reader->record_length, &value) != -1){
metadata->their_last_ack = value;
}
message_ply_parse_ack(reader,
&metadata->their_last_ack,
NULL, NULL);
DEBUGF(meshms, "Found their last ack @%"PRIu64" = %"PRIu64,
metadata->their_last_ack_offset, metadata->their_last_ack);
}
@ -244,11 +243,8 @@ static enum meshms_status update_my_stats(struct meshms_metadata *metadata, stru
reader->read.offset = reader->read.length;
if (message_ply_find_prev(reader, MESSAGE_BLOCK_TYPE_ACK)==0){
uint64_t my_ack = 0;
if (unpack_uint(reader->record, reader->record_length, &my_ack) != -1){
metadata->my_last_ack = my_ack;
DEBUGF(meshms, "Found my last ack %"PRId64, my_ack);
}
message_ply_parse_ack(reader, &metadata->my_last_ack, NULL, NULL);
DEBUGF(meshms, "Found my last ack %"PRId64, metadata->my_last_ack);
}
metadata->my_size = ply->size;
message_ply_read_rewind(reader);
@ -794,8 +790,9 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->their_offset = iter->_their_reader.record_end_offset;
iter->text = NULL;
iter->text_length = 0;
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
message_ply_parse_ack(&iter->_their_reader,
&iter->ack_offset,
NULL, NULL);
iter->read = 0;
return MESHMS_STATUS_UPDATED;
case MESSAGE_BLOCK_TYPE_MESSAGE:
@ -825,28 +822,25 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->which_ply = MY_PLY;
switch (iter->_my_reader.type) {
case MESSAGE_BLOCK_TYPE_TIME:
if (iter->_my_reader.record_length<4){
WARN("Malformed MeshMS2 ply journal, expected 4 byte timestamp");
if (message_ply_parse_timestamp(&iter->_my_reader, &iter->timestamp)!=0){
WARN("Malformed MeshMS2 ply journal, malformed timestamp");
return MESHMS_STATUS_PROTOCOL_FAULT;
}
iter->timestamp = read_uint32(iter->_my_reader.record);
DEBUGF(meshms, "Parsed timestamp %ds old", gettime() - iter->timestamp);
break;
case MESSAGE_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
if (iter->their_ply.found) {
iter->my_offset = iter->_my_reader.record_end_offset;
int ofs = unpack_uint(iter->_my_reader.record, iter->_my_reader.record_length, (uint64_t*)&iter->_their_reader.read.offset);
if (ofs == -1) {
if (message_ply_parse_ack(&iter->_my_reader,
(uint64_t*)&iter->_their_reader.read.offset,
&iter->_end_range,
NULL
) == -1){
WHYF("Malformed ACK");
return MESHMS_STATUS_PROTOCOL_FAULT;
}
uint64_t end_range;
int x = unpack_uint(iter->_my_reader.record + ofs, iter->_my_reader.record_length - ofs, &end_range);
if (x == -1)
iter->_end_range = 0;
else
iter->_end_range = iter->_their_reader.read.offset - end_range;
// TODO tail
iter->_in_ack = 1;
}

View File

@ -261,6 +261,44 @@ int message_ply_read_prev(struct message_ply_read *ply)
return 0;
}
int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp)
{
assert(ply->type == MESSAGE_BLOCK_TYPE_TIME);
if (ply->record_length<4)
return -1;
*timestamp = read_uint32(ply->record);
return 0;
}
int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid)
{
int ofs=0;
*end_offset=0;
if (start_offset)
*start_offset = 0;
if (bid)
*bid = NULL;
int r = unpack_uint(&ply->record[ofs], ply->record_length, end_offset);
if (r == -1)
return -1;
ofs+=r;
uint64_t length;
r = unpack_uint(&ply->record[ofs], ply->record_length - ofs, &length);
if (r == -1)
return 0;
if (start_offset)
*start_offset = *end_offset - length;
ofs += r;
if (bid){
if (ofs + sizeof(rhizome_bid_t) <= ply->record_length)
*bid = (rhizome_bid_t*)&ply->record[ofs];
}
return 0;
}
// keep reading past messages until you find this type.
int message_ply_find_prev(struct message_ply_read *ply, const char message_type)
{
@ -287,7 +325,7 @@ void message_ply_append_timestamp(struct overlay_buffer *b)
append_footer(b, MESSAGE_BLOCK_TYPE_TIME);
}
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid)
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid)
{
ob_checkpoint(b);
ob_append_packed_ui64(b, message_offset);

View File

@ -50,6 +50,9 @@ int message_ply_find_prev(struct message_ply_read *ply, char type);
int message_ply_is_open(struct message_ply_read *ply);
void message_ply_read_rewind(struct message_ply_read *ply);
int message_ply_parse_timestamp(struct message_ply_read *ply, time_s_t *timestamp);
int message_ply_parse_ack(struct message_ply_read *ply, uint64_t *end_offset, uint64_t *start_offset, rhizome_bid_t **bid);
int message_ply_write_open(
struct message_ply_write *ply_write,
const struct keyring_identity *id,
@ -64,7 +67,7 @@ int message_ply_write_open(
int message_ply_write_finish(struct message_ply_write *write);
void message_ply_write_close(struct message_ply_write *write);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, rhizome_bid_t *bid);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset, const rhizome_bid_t *bid);
void message_ply_append_timestamp(struct overlay_buffer *b);
void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len);
int message_ply_append(const struct keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,