Reduce meshms entry encoding size

This commit is contained in:
Jeremy Lakeman 2013-07-23 14:21:46 +09:30
parent ddfb7d9417
commit f1139d4c0e
4 changed files with 156 additions and 78 deletions

134
meshms.c
View File

@ -33,6 +33,7 @@ struct ply_read{
uint64_t record_end_offset;
uint16_t record_length;
int buffer_size;
char type;
unsigned char *buffer;
};
@ -150,7 +151,15 @@ static int create_ply(const char *my_sidhex, struct conversations *conv, rhizome
return 0;
}
static int append_footer(unsigned char *buffer, char type, int payload_len){
payload_len = (payload_len << 4) | (type&0xF);
write_uint16(buffer, payload_len);
return 2;
}
static int ply_read_open(struct ply_read *ply, const char *id, rhizome_manifest *m){
if (config.debug.meshms)
DEBUGF("Opening ply %s", id);
if (rhizome_retrieve_manifest(id, m))
return -1;
if (rhizome_open_decrypt_read(m, NULL, &ply->read, 0))
@ -164,6 +173,7 @@ static int ply_read_close(struct ply_read *ply){
free(ply->buffer);
ply->buffer=NULL;
}
ply->buffer_size=0;
return rhizome_read_close(&ply->read);
}
@ -174,38 +184,45 @@ static int ply_read_next(struct ply_read *ply){
if (config.debug.meshms)
DEBUGF("Attempting to read next record ending @%"PRId64,ply->read.offset);
ply->record_end_offset=ply->read.offset;
ply->read.offset-=2;
if (ply->read.offset<=0)
unsigned char footer[2];
ply->read.offset-=sizeof(footer);
if (ply->read.offset<=0){
if (config.debug.meshms)
DEBUGF("EOF");
return 1;
unsigned char offset[2];
if (rhizome_read(&ply->read, offset, sizeof(offset))!=2)
}
if (rhizome_read(&ply->read, footer, sizeof(footer))!=sizeof(footer))
return -1;
// (rhizome_read automatically advances the offset by the number of bytes read)
ply->record_length=read_uint16(footer);
ply->type = ply->record_length & 0xF;
ply->record_length = ply->record_length>>4;
ply->record_length=read_uint16(offset);
if (config.debug.meshms)
DEBUGF("Found record length %d", ply->record_length);
DEBUGF("Found record %d, length %d", ply->type, ply->record_length);
// need to allow for advancing the tail and cutting a message in half.
if (ply->record_length > ply->read.offset-2)
if (ply->record_length + sizeof(footer) > ply->read.offset){
if (config.debug.meshms)
DEBUGF("EOF");
return 1;
}
uint64_t record_start = ply->read.offset -= ply->record_length + 5;
if (ply->buffer_size < ply->record_length +3){
ply->buffer_size = ply->record_length +3;
ply->read.offset -= ply->record_length + sizeof(footer);
uint64_t record_start = ply->read.offset;
if (ply->buffer_size < ply->record_length){
ply->buffer_size = ply->record_length;
unsigned char *b=realloc(ply->buffer, ply->buffer_size);
if (!b)
return WHY("realloc() failed");
ply->buffer = b;
}
if (rhizome_read(&ply->read, ply->buffer, ply->record_length +3)!=ply->record_length +3)
return -1;
int read = rhizome_read(&ply->read, ply->buffer, ply->record_length);
if (read!=ply->record_length)
return WHYF("Expected %d bytes read, got %d", ply->record_length, read);
uint16_t length_check = read_uint16(ply->buffer);
if (length_check != ply->record_length)
return WHYF("Length check failed, expected %u found %u @%"PRId64,
ply->record_length, length_check, record_start);
ply->read.offset = record_start;
return 0;
}
@ -213,10 +230,8 @@ static int ply_read_next(struct ply_read *ply){
static int ply_find_next(struct ply_read *ply, char type){
while(1){
int ret = ply_read_next(ply);
if (ret)
if (ret || ply->type==type)
return ret;
if (ply->buffer[2]==type)
return 0;
}
}
@ -290,12 +305,12 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv
DEBUGF("Found last message @%"PRId64, last_message_offset);
ply_read_close(&ply);
// find our last ack
uint64_t last_ack = 0;
// find our previous ack
uint64_t previous_ack = 0;
if (conv->found_my_ply){
if (config.debug.meshms)
DEBUG("Locating our last ack");
DEBUG("Locating our previous ack");
m_ours = rhizome_new_manifest();
if (!m_ours)
@ -309,18 +324,20 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv
ret = ply_find_next(&ply, MESHMS_BLOCK_TYPE_ACK);
if (ret<0)
goto end;
if (ret==0){
last_ack = read_uint64(&ply.buffer[3]);
if (config.debug.meshms)
DEBUGF("Found last ack for %"PRId64, last_ack);
if (unpack_uint(ply.buffer, ply.record_length, &previous_ack)<0)
previous_ack=0;
}
if (config.debug.meshms)
DEBUGF("Previous ack is %"PRId64, previous_ack);
ply_read_close(&ply);
}else{
if (config.debug.meshms)
DEBUGF("No outgoing ply");
}
if (last_ack >= last_message_offset){
if (previous_ack >= last_message_offset){
// their last message has already been acked
ret=0;
goto end;
@ -329,17 +346,14 @@ static int update_conversation(const char *my_sidhex, struct conversations *conv
// append an ack for their message
// TODO shorter format here?
if (config.debug.meshms)
DEBUGF("Creating ACK for %"PRId64" - %"PRId64, last_ack, last_message_offset);
unsigned char buffer[5+8+8];
int ofs=2;
buffer[ofs++]=MESHMS_BLOCK_TYPE_ACK;
write_uint64(&buffer[ofs], last_message_offset);
ofs+=8;
write_uint64(&buffer[ofs], last_ack);
ofs+=8;
write_uint16(&buffer[0], ofs - 3);
write_uint16(&buffer[ofs], ofs - 3);
ofs+=2;
DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, last_message_offset);
unsigned char buffer[24];
int ofs=0;
ofs+=pack_uint(&buffer[ofs], last_message_offset);
if (previous_ack)
ofs+=pack_uint(&buffer[ofs], last_message_offset - previous_ack);
ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs);
ret = append_meshms_buffer(my_sidhex, conv, buffer, ofs);
end:
@ -439,17 +453,10 @@ int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context
int message_len = strlen(message)+1;
// TODO, new format here.
unsigned char buffer[message_len+13];
int ofs=2;
buffer[ofs++]=MESHMS_BLOCK_TYPE_MESSAGE;
write_uint64(&buffer[ofs], 0);//timestamp
ofs+=8;
strcpy((char*)&buffer[ofs], message); // message
ofs+=message_len;
write_uint16(&buffer[0], ofs - 3);
write_uint16(&buffer[ofs], ofs - 3);
ofs+=2;
int ret = append_meshms_buffer(my_sidhex, conv, buffer, ofs);
unsigned char buffer[message_len+3];
strcpy((char*)buffer, message); // message
message_len+=append_footer(buffer+message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len);
int ret = append_meshms_buffer(my_sidhex, conv, buffer, message_len);
free_conversations(conv);
return ret;
@ -513,25 +520,38 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
// find their last ACK so we know if messages have been received
int r = ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_ACK);
if (r==0)
their_last_ack = read_uint64(&read_theirs.buffer[3]);
if (r==0){
if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack)<0)
their_last_ack=0;
if (config.debug.meshms)
DEBUGF("Found their last ack @%"PRId64, their_last_ack);
}
}
int id=0;
while(ply_read_next(&read_ours)==0){
char type = read_ours.buffer[2];
if (config.debug.meshms)
DEBUGF("%"PRId64", found %d", read_ours.read.offset, type);
switch(type){
DEBUGF("%"PRId64", found %d", read_ours.read.offset, read_ours.type);
switch(read_ours.type){
case MESHMS_BLOCK_TYPE_ACK:
// read their message list, and insert all messages that are included in the ack range
if (conv->found_their_ply){
read_theirs.read.offset = read_uint64(&read_ours.buffer[3]);
int ofs=unpack_uint(read_ours.buffer, read_ours.record_length, (uint64_t*)&read_theirs.read.offset);
if (ofs<0)
break;
uint64_t end_range;
int x = unpack_uint(read_ours.buffer+ofs, read_ours.record_length - ofs, &end_range);
if (x<0)
end_range=0;
else
end_range = read_theirs.read.offset - end_range;
// TODO tail
// just incase we don't have the full bundle anymore
if (read_theirs.read.offset > read_theirs.read.length)
read_theirs.read.offset = read_theirs.read.length;
uint64_t end_range = read_uint64(&read_ours.buffer[3+8]);
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range);
while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){
if (read_theirs.read.offset < end_range)
break;
@ -539,7 +559,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
cli_put_long(context, read_theirs.read.offset, ":");
cli_put_string(context, their_sidhex, ":");
cli_put_string(context, "read", ":");
cli_put_string(context, (char *)&read_theirs.buffer[11], "\n");
cli_put_string(context, (char *)read_theirs.buffer, "\n");
}
}
break;
@ -549,7 +569,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
cli_put_long(context, read_ours.read.offset, ":");
cli_put_string(context, my_sidhex, ":");
cli_put_string(context, their_last_ack >= read_ours.record_end_offset ? "delivered":"", ":");
cli_put_string(context, (char *)&read_ours.buffer[11], "\n");
cli_put_string(context, (char *)read_ours.buffer, "\n");
break;
}
}

View File

@ -276,6 +276,40 @@ int ob_append_ui64(struct overlay_buffer *b, uint64_t v)
return 0;
}
int measure_packed_uint(uint64_t v){
int ret=1;
while(v){
v>>=7;
ret++;
}
return ret;
}
int pack_uint(unsigned char *buffer, uint64_t v){
int ret=0;
do{
*buffer++=(v&0x7f) | (v>0x7f?0x80:0);
v>>=7;
ret++;
}while(v);
return ret;
}
int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v){
int i=0;
*v=0;
while(1){
if (i>=buff_size)
return -1;
char byte = buffer[i];
*v |= (byte&0x7f)<<(i*7);
i++;
if (!(byte&0x80))
break;
}
return i;
}
int ob_append_packed_ui32(struct overlay_buffer *b, uint32_t v)
{
do{

View File

@ -805,6 +805,10 @@ uint64_t read_uint64(unsigned char *o);
uint32_t read_uint32(unsigned char *o);
uint16_t read_uint16(unsigned char *o);
int pack_uint(unsigned char *buffer, uint64_t v);
int measure_packed_uint(uint64_t v);
int unpack_uint(unsigned char *buffer, int buff_size, uint64_t *v);
int slip_encode(int format,
unsigned char *src, int src_bytes, unsigned char *dst, int dst_len);
int slip_decode(struct slip_decode_state *state);

View File

@ -65,43 +65,63 @@ setup_AddMessages() {
set_instance +A
create_identities 2
executeOk_servald config \
set debug.rhizome on \
set debug.meshms on \
set log.console.level debug
}
test_AddMessages() {
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutLineCount '==' 2
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
# 1. empty list
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutIs --stdout --line=1 -e '5\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:sender:status:message\n'
assertStdoutGrep --stdout --matches=1 "^0:0:$SIDA1::Hi\$"
assertStdoutLineCount '==' 2
# 2. create a manifest with a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutLineCount '==' 3
# 3. append a second message and list them both
executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:16:$SIDA1::How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:0:$SIDA1::Hi\$"
assertStdoutGrep --stdout --matches=1 "::How are you\$"
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutLineCount '==' 4
# 4. list the messages from the receivers point of view (which ACKs them)
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:16:$SIDA1:read:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:0:$SIDA1:read:Hi\$"
assertStdoutGrep --stdout --matches=1 ":How are you\$"
assertStdoutGrep --stdout --matches=1 ":Hi\$"
assertStdoutLineCount '==' 4
executeOk_servald meshms send message $SIDA2 $SIDA1 "Hello fine"
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:21:$SIDA2::Hello fine\$"
assertStdoutGrep --stdout --matches=1 "^1:16:$SIDA1:read:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:0:$SIDA1:read:Hi\$"
assertStdoutLineCount '==' 5
# 5. list messages from the senders point of view after they have been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "^0:21:$SIDA2:read:Hello fine\$"
assertStdoutGrep --stdout --matches=1 "^1:16:$SIDA1:delivered:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:0:$SIDA1:delivered:Hi\$"
assertStdoutLineCount '==' 5
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 ":delivered:How are you\$"
assertStdoutGrep --stdout --matches=1 ":delivered:Hi\$"
assertStdoutLineCount '==' 4
# 6. both parties add more messages without acking
executeOk_servald meshms send message $SIDA2 $SIDA1 "Help Im trapped in a test case factory"
executeOk_servald meshms send message $SIDA2 $SIDA1 "Never mind"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hello can you hear me"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Still waiting"
# 7. each person see's the final threaded message list in a different order
executeOk_servald meshms list messages $SIDA2 $SIDA1
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:43:$SIDA1:read:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:19:$SIDA1:read:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:44:$SIDA2::Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:3:$SIDA2::Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:read:How are you\$"
assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:read:Hi\$"
assertStdoutLineCount '==' 8
executeOk_servald meshms list messages $SIDA1 $SIDA2
tfw_cat --stdout
assertStdoutGrep --stdout --matches=1 "^0:44:$SIDA2:read:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:3:$SIDA2:read:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:43:$SIDA1:delivered:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^3:19:$SIDA1:delivered:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^4:5:$SIDA1:delivered:How are you\$"
assertStdoutGrep --stdout --matches=1 "^5:0:$SIDA1:delivered:Hi\$"
assertStdoutLineCount '==' 8
}
runTests "$@"