From f1139d4c0ec7d7e418020b6b9bfc91b4aeecddd9 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 23 Jul 2013 14:21:46 +0930 Subject: [PATCH] Reduce meshms entry encoding size --- meshms.c | 136 +++++++++++++++++++++++++++-------------------- overlay_buffer.c | 34 ++++++++++++ serval.h | 4 ++ tests/meshms | 60 ++++++++++++++------- 4 files changed, 156 insertions(+), 78 deletions(-) diff --git a/meshms.c b/meshms.c index 337d633b..7f7995e2 100644 --- a/meshms.c +++ b/meshms.c @@ -33,6 +33,7 @@ struct ply_read{ uint64_t record_end_offset; uint16_t record_length; int buffer_size; + char type; unsigned char *buffer; }; @@ -150,7 +151,15 @@ static int create_ply(const char *my_sidhex, struct conversations *conv, rhizome return 0; } +static int append_footer(unsigned char *buffer, char type, int payload_len){ + payload_len = (payload_len << 4) | (type&0xF); + write_uint16(buffer, payload_len); + return 2; +} + static int ply_read_open(struct ply_read *ply, const char *id, rhizome_manifest *m){ + if (config.debug.meshms) + DEBUGF("Opening ply %s", id); if (rhizome_retrieve_manifest(id, m)) return -1; if (rhizome_open_decrypt_read(m, NULL, &ply->read, 0)) @@ -164,6 +173,7 @@ static int ply_read_close(struct ply_read *ply){ free(ply->buffer); ply->buffer=NULL; } + ply->buffer_size=0; return rhizome_read_close(&ply->read); } @@ -174,38 +184,45 @@ static int ply_read_next(struct ply_read *ply){ if (config.debug.meshms) DEBUGF("Attempting to read next record ending @%"PRId64,ply->read.offset); ply->record_end_offset=ply->read.offset; - ply->read.offset-=2; - if (ply->read.offset<=0) + unsigned char footer[2]; + ply->read.offset-=sizeof(footer); + if (ply->read.offset<=0){ + if (config.debug.meshms) + DEBUGF("EOF"); return 1; - unsigned char offset[2]; - if (rhizome_read(&ply->read, offset, sizeof(offset))!=2) + } + if (rhizome_read(&ply->read, footer, sizeof(footer))!=sizeof(footer)) return -1; // (rhizome_read automatically advances the offset by the number of bytes read) + ply->record_length=read_uint16(footer); + ply->type = ply->record_length & 0xF; + ply->record_length = ply->record_length>>4; - ply->record_length=read_uint16(offset); if (config.debug.meshms) - DEBUGF("Found record length %d", ply->record_length); + DEBUGF("Found record %d, length %d", ply->type, ply->record_length); // need to allow for advancing the tail and cutting a message in half. - if (ply->record_length > ply->read.offset-2) + if (ply->record_length + sizeof(footer) > ply->read.offset){ + if (config.debug.meshms) + DEBUGF("EOF"); return 1; - - uint64_t record_start = ply->read.offset -= ply->record_length + 5; - if (ply->buffer_size < ply->record_length +3){ - ply->buffer_size = ply->record_length +3; + } + + ply->read.offset -= ply->record_length + sizeof(footer); + uint64_t record_start = ply->read.offset; + + if (ply->buffer_size < ply->record_length){ + ply->buffer_size = ply->record_length; unsigned char *b=realloc(ply->buffer, ply->buffer_size); if (!b) return WHY("realloc() failed"); ply->buffer = b; } - if (rhizome_read(&ply->read, ply->buffer, ply->record_length +3)!=ply->record_length +3) - return -1; + int read = rhizome_read(&ply->read, ply->buffer, ply->record_length); + if (read!=ply->record_length) + return WHYF("Expected %d bytes read, got %d", ply->record_length, read); - uint16_t length_check = read_uint16(ply->buffer); - if (length_check != ply->record_length) - return WHYF("Length check failed, expected %u found %u @%"PRId64, - ply->record_length, length_check, record_start); ply->read.offset = record_start; return 0; } @@ -213,10 +230,8 @@ static int ply_read_next(struct ply_read *ply){ static int ply_find_next(struct ply_read *ply, char type){ while(1){ int ret = ply_read_next(ply); - if (ret) + if (ret || ply->type==type) return ret; - if (ply->buffer[2]==type) - return 0; } } @@ -290,12 +305,12 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv DEBUGF("Found last message @%"PRId64, last_message_offset); ply_read_close(&ply); - // find our last ack - uint64_t last_ack = 0; + // find our previous ack + uint64_t previous_ack = 0; if (conv->found_my_ply){ if (config.debug.meshms) - DEBUG("Locating our last ack"); + DEBUG("Locating our previous ack"); m_ours = rhizome_new_manifest(); if (!m_ours) @@ -309,18 +324,20 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_ACK); if (ret<0) goto end; + if (ret==0){ - last_ack = read_uint64(&ply.buffer[3]); - if (config.debug.meshms) - DEBUGF("Found last ack for %"PRId64, last_ack); + if (unpack_uint(ply.buffer, ply.record_length, &previous_ack)<0) + previous_ack=0; } + if (config.debug.meshms) + DEBUGF("Previous ack is %"PRId64, previous_ack); ply_read_close(&ply); }else{ if (config.debug.meshms) DEBUGF("No outgoing ply"); } - if (last_ack >= last_message_offset){ + if (previous_ack >= last_message_offset){ // their last message has already been acked ret=0; goto end; @@ -329,17 +346,14 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv // append an ack for their message // TODO shorter format here? if (config.debug.meshms) - DEBUGF("Creating ACK for %"PRId64" - %"PRId64, last_ack, last_message_offset); - unsigned char buffer[5+8+8]; - int ofs=2; - buffer[ofs++]=MESHMS_BLOCK_TYPE_ACK; - write_uint64(&buffer[ofs], last_message_offset); - ofs+=8; - write_uint64(&buffer[ofs], last_ack); - ofs+=8; - write_uint16(&buffer[0], ofs - 3); - write_uint16(&buffer[ofs], ofs - 3); - ofs+=2; + DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_message_offset); + + unsigned char buffer[24]; + int ofs=0; + ofs+=pack_uint(&buffer[ofs], last_message_offset); + if (previous_ack) + ofs+=pack_uint(&buffer[ofs], last_message_offset - previous_ack); + ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs); ret = append_meshms_buffer(my_sidhex, conv, buffer, ofs); end: @@ -439,17 +453,10 @@ int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context int message_len = strlen(message)+1; // TODO, new format here. - unsigned char buffer[message_len+13]; - int ofs=2; - buffer[ofs++]=MESHMS_BLOCK_TYPE_MESSAGE; - write_uint64(&buffer[ofs], 0);//timestamp - ofs+=8; - strcpy((char*)&buffer[ofs], message); // message - ofs+=message_len; - write_uint16(&buffer[0], ofs - 3); - write_uint16(&buffer[ofs], ofs - 3); - ofs+=2; - int ret = append_meshms_buffer(my_sidhex, conv, buffer, ofs); + unsigned char buffer[message_len+3]; + strcpy((char*)buffer, message); // message + message_len+=append_footer(buffer+message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len); + int ret = append_meshms_buffer(my_sidhex, conv, buffer, message_len); free_conversations(conv); return ret; @@ -513,25 +520,38 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context // find their last ACK so we know if messages have been received int r = ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_ACK); - if (r==0) - their_last_ack = read_uint64(&read_theirs.buffer[3]); + if (r==0){ + if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack)<0) + their_last_ack=0; + if (config.debug.meshms) + DEBUGF("Found their last ack @%"PRId64, their_last_ack); + } } int id=0; while(ply_read_next(&read_ours)==0){ - char type = read_ours.buffer[2]; if (config.debug.meshms) - DEBUGF("%"PRId64", found %d", read_ours.read.offset, type); - switch(type){ + DEBUGF("%"PRId64", found %d", read_ours.read.offset, read_ours.type); + switch(read_ours.type){ case MESHMS_BLOCK_TYPE_ACK: // read their message list, and insert all messages that are included in the ack range if (conv->found_their_ply){ - read_theirs.read.offset = read_uint64(&read_ours.buffer[3]); + int ofs=unpack_uint(read_ours.buffer, read_ours.record_length, (uint64_t*)&read_theirs.read.offset); + if (ofs<0) + break; + uint64_t end_range; + int x = unpack_uint(read_ours.buffer+ofs, read_ours.record_length - ofs, &end_range); + if (x<0) + end_range=0; + else + end_range = read_theirs.read.offset - end_range; + // TODO tail // just incase we don't have the full bundle anymore if (read_theirs.read.offset > read_theirs.read.length) read_theirs.read.offset = read_theirs.read.length; - uint64_t end_range = read_uint64(&read_ours.buffer[3+8]); + if (config.debug.meshms) + DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range); while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){ if (read_theirs.read.offset < end_range) break; @@ -539,7 +559,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context cli_put_long(context, read_theirs.read.offset, ":"); cli_put_string(context, their_sidhex, ":"); cli_put_string(context, "read", ":"); - cli_put_string(context, (char *)&read_theirs.buffer[11], "\n"); + cli_put_string(context, (char *)read_theirs.buffer, "\n"); } } break; @@ -549,7 +569,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context cli_put_long(context, read_ours.read.offset, ":"); cli_put_string(context, my_sidhex, ":"); cli_put_string(context, their_last_ack >= read_ours.record_end_offset ? "delivered":"", ":"); - cli_put_string(context, (char *)&read_ours.buffer[11], "\n"); + cli_put_string(context, (char *)read_ours.buffer, "\n"); break; } } diff --git a/overlay_buffer.c b/overlay_buffer.c index a2dd6429..1b54e292 100644 --- a/overlay_buffer.c +++ b/overlay_buffer.c @@ -276,6 +276,40 @@ int ob_append_ui64(struct overlay_buffer *b, uint64_t v) return 0; } +int measure_packed_uint(uint64_t v){ + int ret=1; + while(v){ + v>>=7; + ret++; + } + return ret; +} + +int pack_uint(unsigned char *buffer, uint64_t v){ + int ret=0; + do{ + *buffer++=(v&0x7f) | (v>0x7f?0x80:0); + v>>=7; + ret++; + }while(v); + return ret; +} + +int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v){ + int i=0; + *v=0; + while(1){ + if (i>=buff_size) + return -1; + char byte = buffer[i]; + *v |= (byte&0x7f)<<(i*7); + i++; + if (!(byte&0x80)) + break; + } + return i; +} + int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v) { do{ diff --git a/serval.h b/serval.h index 0d2ddd71..49a2e4cd 100644 --- a/serval.h +++ b/serval.h @@ -805,6 +805,10 @@ uint64_t read_uint64(unsigned char *o); uint32_t read_uint32(unsigned char *o); uint16_t read_uint16(unsigned char *o); +int pack_uint(unsigned char *buffer, uint64_t v); +int measure_packed_uint(uint64_t v); +int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v); + int slip_encode(int format, unsigned char *src, int src_bytes, unsigned char *dst, int dst_len); int slip_decode(struct slip_decode_state *state); diff --git a/tests/meshms b/tests/meshms index 9b8432a6..44483f11 100755 --- a/tests/meshms +++ b/tests/meshms @@ -65,43 +65,63 @@ setup_AddMessages() { set_instance +A create_identities 2 executeOk_servald config \ - set debug.rhizome on \ set debug.meshms on \ set log.console.level debug } test_AddMessages() { - executeOk_servald meshms list messages $SIDA1 $SIDA2 - assertStdoutLineCount '==' 2 - executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" + # 1. empty list executeOk_servald meshms list messages $SIDA1 $SIDA2 assertStdoutIs --stdout --line=1 -e '5\n' assertStdoutIs --stdout --line=2 -e '_id:offset:sender:status:message\n' - assertStdoutGrep --stdout --matches=1 "^0:0:$SIDA1::Hi\$" + assertStdoutLineCount '==' 2 + # 2. create a manifest with a single message and list it back + executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi" + executeOk_servald meshms list messages $SIDA1 $SIDA2 + assertStdoutGrep --stdout --matches=1 "::Hi\$" assertStdoutLineCount '==' 3 + # 3. append a second message and list them both executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you" executeOk_servald meshms list messages $SIDA1 $SIDA2 tfw_cat --stdout - assertStdoutGrep --stdout --matches=1 "^0:16:$SIDA1::How are you\$" - assertStdoutGrep --stdout --matches=1 "^1:0:$SIDA1::Hi\$" + assertStdoutGrep --stdout --matches=1 "::How are you\$" + assertStdoutGrep --stdout --matches=1 "::Hi\$" assertStdoutLineCount '==' 4 + # 4. list the messages from the receivers point of view (which ACKs them) executeOk_servald meshms list messages $SIDA2 $SIDA1 tfw_cat --stdout - assertStdoutGrep --stdout --matches=1 "^0:16:$SIDA1:read:How are you\$" - assertStdoutGrep --stdout --matches=1 "^1:0:$SIDA1:read:Hi\$" + assertStdoutGrep --stdout --matches=1 ":How are you\$" + assertStdoutGrep --stdout --matches=1 ":Hi\$" assertStdoutLineCount '==' 4 - executeOk_servald meshms send message $SIDA2 $SIDA1 "Hello fine" - executeOk_servald meshms list messages $SIDA2 $SIDA1 - tfw_cat --stdout - assertStdoutGrep --stdout --matches=1 "^0:21:$SIDA2::Hello fine\$" - assertStdoutGrep --stdout --matches=1 "^1:16:$SIDA1:read:How are you\$" - assertStdoutGrep --stdout --matches=1 "^2:0:$SIDA1:read:Hi\$" - assertStdoutLineCount '==' 5 + # 5. list messages from the senders point of view after they have been delivered executeOk_servald meshms list messages $SIDA1 $SIDA2 - assertStdoutGrep --stdout --matches=1 "^0:21:$SIDA2:read:Hello fine\$" - assertStdoutGrep --stdout --matches=1 "^1:16:$SIDA1:delivered:How are you\$" - assertStdoutGrep --stdout --matches=1 "^2:0:$SIDA1:delivered:Hi\$" - assertStdoutLineCount '==' 5 tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 ":delivered:How are you\$" + assertStdoutGrep --stdout --matches=1 ":delivered:Hi\$" + assertStdoutLineCount '==' 4 + # 6. both parties add more messages without acking + executeOk_servald meshms send message $SIDA2 $SIDA1 "Help Im trapped in a test case factory" + executeOk_servald meshms send message $SIDA2 $SIDA1 "Never mind" + executeOk_servald meshms send message $SIDA1 $SIDA2 "Hello can you hear me" + executeOk_servald meshms send message $SIDA1 $SIDA2 "Still waiting" + # 7. each person see's the final threaded message list in a different order + executeOk_servald meshms list messages $SIDA2 $SIDA1 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:43:$SIDA1:read:Still waiting\$" + assertStdoutGrep --stdout --matches=1 "^1:19:$SIDA1:read:Hello can you hear me\$" + assertStdoutGrep --stdout --matches=1 "^2:44:$SIDA2::Never mind\$" + assertStdoutGrep --stdout --matches=1 "^3:3:$SIDA2::Help Im trapped in a test case factory\$" + assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:read:How are you\$" + assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:read:Hi\$" + assertStdoutLineCount '==' 8 + executeOk_servald meshms list messages $SIDA1 $SIDA2 + tfw_cat --stdout + assertStdoutGrep --stdout --matches=1 "^0:44:$SIDA2:read:Never mind\$" + assertStdoutGrep --stdout --matches=1 "^1:3:$SIDA2:read:Help Im trapped in a test case factory\$" + assertStdoutGrep --stdout --matches=1 "^2:43:$SIDA1:delivered:Still waiting\$" + assertStdoutGrep --stdout --matches=1 "^3:19:$SIDA1:delivered:Hello can you hear me\$" + assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:delivered:How are you\$" + assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:delivered:Hi\$" + assertStdoutLineCount '==' 8 } runTests "$@"