meshms merged thread display substantial progress

This commit is contained in:
gardners 2013-05-09 20:21:21 +09:30
parent 9fcc23f43d
commit fb2bb43cad
5 changed files with 209 additions and 41 deletions

View File

@ -2363,8 +2363,10 @@ struct cli_schema command_line_options[]={
{app_meshms_read_messagelog,{"meshms","read", "messagelog" KEYRING_PIN_OPTIONS,
"[<manifestid>]",NULL},CLIFLAG_STANDALONE,
"List messages between a sender and a recipient"},
{app_meshms_add_message,{"meshms","add","message" KEYRING_PIN_OPTIONS, "[<sender_did>]","[<recipient_did>]","[<sender_sid>]","[<recipient_sid>]","[<payload>]",NULL},CLIFLAG_STANDALONE,
"Add a file to Rhizome and optionally write its manifest to the given path"},
{app_meshms_list_messages,{"meshms","list","messages" KEYRING_PIN_OPTIONS, "<sender_sid>","<recipient_sid>",NULL},CLIFLAG_STANDALONE,
"Create a MeshMS from <sender_sid> to <recipient_sid>"},
{app_meshms_add_message,{"meshms","add","message" KEYRING_PIN_OPTIONS, "[<sender_did>]","[<recipient_did>]","<sender_sid>","<recipient_sid>","<payload>",NULL},CLIFLAG_STANDALONE,
"Create a MeshMS from <sender_sid> to <recipient_sid>"},
{app_rhizome_append_manifest, {"rhizome", "append", "manifest", "<filepath>", "<manifestpath>", NULL}, CLIFLAG_STANDALONE,
"Append a manifest to the end of the file it belongs to."},
{app_rhizome_hash_file,{"rhizome","hash","file","<filepath>",NULL},CLIFLAG_STANDALONE,

161
meshms.c
View File

@ -27,13 +27,18 @@ int meshms_append_messageblock(const char *sender_sid,
const char *recipient_sid,
const unsigned char *buffer_serialize,
int length_int);
int decode_length_forwards(unsigned char *buffer,int *offset,int blength,
unsigned int *length);
int meshms_block_type(unsigned char *buffer,int offset, int blength);
int deserialize_ack(unsigned char *buffer,int *offset, int buffer_size,
int *ack_address);
rhizome_manifest *meshms_find_or_create_manifestid
(const char *sender_sid_hex,const char *recipient_sid_hex, int createP)
{
sid_t authorSid;
if (str_to_sid_t(&authorSid, sender_sid_hex)==-1)
{ WHYF("invalid sender_sid: %s", sender_sid_hex); return NULL; }
{ WHYF("invalid sender_sid: '%s'", sender_sid_hex); return NULL; }
// Get manifest structure to hold the manifest we find or create
rhizome_manifest *m = rhizome_new_manifest();
@ -290,7 +295,7 @@ int app_meshms_read_messagelog(const struct cli_parsed *parsed, void *context)
}
int app_meshms_list(const struct cli_parsed *parsed, void *context)
int app_meshms_list_messages(const struct cli_parsed *parsed, void *context)
{
if (create_serval_instance_dir() == -1)
return -1;
@ -301,49 +306,143 @@ int app_meshms_list(const struct cli_parsed *parsed, void *context)
if (config.debug.verbose)
DEBUG_cli_parsed(parsed);
//sender_sid = author_sid
const char *sender_sid, *recipient_sid;
//left_sid = author_sid
const char *left_sid, *right_sid;
// Parse mandatory arguments
cli_arg(parsed, "sender_sid", &sender_sid, cli_optional_sid, "");
cli_arg(parsed, "recipient_sid", &recipient_sid, cli_optional_sid, "");
cli_arg(parsed, "sender_sid", &left_sid, cli_optional_sid, "");
cli_arg(parsed, "recipient_sid", &right_sid, cli_optional_sid, "");
// Sanity check passed arguments
if ( (strcmp(sender_sid,"") == 0) || (strcmp(recipient_sid,"" ) == 0) )
if ( (strcmp(left_sid,"") == 0) || (strcmp(right_sid,"" ) == 0) )
{
cli_puts("One or more missing arguments"); cli_delim("\n");
}
sid_t aSid;
if (sender_sid[0] && str_to_sid_t(&aSid, sender_sid) == -1)
return WHYF("invalid sender_sid: %s", sender_sid);
if (recipient_sid[0] && str_to_sid_t(&aSid, recipient_sid) == -1)
return WHYF("invalid recipient_sid: %s", recipient_sid);
if (left_sid[0] && str_to_sid_t(&aSid, left_sid) == -1)
return WHYF("invalid left_sid: %s", left_sid);
if (right_sid[0] && str_to_sid_t(&aSid, right_sid) == -1)
return WHYF("invalid right_sid: %s", right_sid);
// Obtain message logs for both sides of the conversation, if available
rhizome_manifest *m_sender=NULL,*m_recipient=NULL;
m_sender=meshms_find_or_create_manifestid(sender_sid,recipient_sid,0);
m_recipient=meshms_find_or_create_manifestid(sender_sid,recipient_sid,0);
int sender_len=0, recipient_len=0;
unsigned char *sender_messages=NULL, *recipient_messages=NULL;
if (m_sender) {
sender_messages=malloc(m_sender->fileLength);
if (!sender_messages) {
WHYF("malloc(%d) failed while reading meshms logs",m_sender->fileLength);
rhizome_manifest *m_left=NULL,*m_right=NULL;
m_left=meshms_find_or_create_manifestid(left_sid,right_sid,0);
m_right=meshms_find_or_create_manifestid(left_sid,right_sid,0);
int left_len=0, right_len=0;
unsigned char *left_messages=NULL, *right_messages=NULL;
if (m_left) {
left_messages=malloc(m_left->fileLength);
if (!left_messages) {
WHYF("malloc(%d) failed while reading meshms logs",m_left->fileLength);
return -1;
}
if (!meshms_read_message(m_sender,sender_messages))
sender_len=m_sender->fileLength;
if (!meshms_read_message(m_left,left_messages))
left_len=m_left->fileLength;
}
if (m_recipient) {
recipient_messages=malloc(m_recipient->fileLength);
if (!recipient_messages) {
WHYF("malloc(%d) failed while reading meshms logs",m_recipient->fileLength);
if (m_right) {
right_messages=malloc(m_right->fileLength);
if (!right_messages) {
WHYF("malloc(%d) failed while reading meshms logs",m_right->fileLength);
return -1;
}
if (!meshms_read_message(m_recipient,recipient_messages))
recipient_len=m_recipient->fileLength;
if (!meshms_read_message(m_right,right_messages))
right_len=m_right->fileLength;
}
rhizome_manifest_free(m_sender); m_sender=NULL;
rhizome_manifest_free(m_recipient); m_recipient=NULL;
rhizome_manifest_free(m_left); m_left=NULL;
rhizome_manifest_free(m_right); m_right=NULL;
return -1;
DEBUGF("left_len=%d, right_len=%d",left_len,right_len);
#define MAX_MESSAGES_IN_THREAD 16384
int offsets[MAX_MESSAGES_IN_THREAD];
int sides[MAX_MESSAGES_IN_THREAD];
int message_count=0;
// Scan through messages and acks to generate forward-ordered list, and determine
// last message from left that has been ack'd by right. We will then traverse the
// list in reverse order to display the messages.
int right_ack_limit=0;
int left_ack=0, left_offset=0, right_offset=0;
for(left_offset=0;left_offset<left_len;)
{
for(;right_offset<=left_ack;)
{
unsigned int right_block_len;
int o=right_offset;
if (decode_length_forwards(right_messages,&o,right_len,
&right_block_len)) break;
int block_type=meshms_block_type(right_messages,right_offset,right_len);
switch(block_type) {
case RHIZOME_MESHMS_BLOCK_TYPE_BID_REFERENCE:
case RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE:
offsets[message_count]=right_offset;
sides[message_count++]=1;
break;
case RHIZOME_MESHMS_BLOCK_TYPE_ACK:
{
int o=right_offset;
deserialize_ack(right_messages,&o,right_len,&right_ack_limit);
}
break;
}
right_offset+=right_block_len;
if (message_count>=MAX_MESSAGES_IN_THREAD) break;
}
if (message_count>=MAX_MESSAGES_IN_THREAD) break;
int block_type=meshms_block_type(left_messages,left_offset,left_len);
switch(block_type) {
case RHIZOME_MESHMS_BLOCK_TYPE_BID_REFERENCE:
case RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE:
offsets[message_count]=right_offset;
sides[message_count++]=0;
break;
case RHIZOME_MESHMS_BLOCK_TYPE_ACK:
{
int o=right_offset;
deserialize_ack(right_messages,&o,right_len,&right_ack_limit);
}
break;
}
unsigned int left_block_len;
int o=left_offset;
if (decode_length_forwards(left_messages,&o,left_len,&left_block_len)) break;
left_offset+=left_block_len;
}
// Process any outstanding messages from the right side
for(;right_offset<=right_len;)
{
DEBUGF("right tail: right_offset=%d, right_len=%d",right_offset,right_len);
unsigned int right_block_len;
int o=right_offset;
if (decode_length_forwards(right_messages,&o,right_len,
&right_block_len)) {
break;
}
int block_type=meshms_block_type(right_messages,right_offset,right_len);
switch(block_type) {
case RHIZOME_MESHMS_BLOCK_TYPE_BID_REFERENCE:
case RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE:
offsets[message_count]=right_offset;
sides[message_count++]=1;
DEBUGF("Appended right tail message");
break;
case RHIZOME_MESHMS_BLOCK_TYPE_ACK:
{
int o=right_offset;
deserialize_ack(right_messages,&o,right_len,&right_ack_limit);
}
break;
}
right_offset+=right_block_len;
if (message_count>=MAX_MESSAGES_IN_THREAD) break;
}
// Display list of messages in reverse order
int i;
for(i=message_count-1;i>=0;i--)
{
DEBUGF("%s : 0x%08x",
sides[i]?"right":" left",offsets[i]);
}
return 0;
}

View File

@ -51,6 +51,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define RHIZOME_HTTP_PORT 4110
#define RHIZOME_HTTP_PORT_MAX 4150
#define RHIZOME_MESHMS_BLOCK_TYPE_ACK 0x01
#define RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE 0x02
#define RHIZOME_MESHMS_BLOCK_TYPE_BID_REFERENCE 0x03
typedef struct rhizome_bk_binary {
unsigned char binary[RHIZOME_BUNDLE_KEY_BYTES];
} rhizome_bk_t;

View File

@ -19,6 +19,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "serval.h"
#include "rhizome.h"
void hex_dump(unsigned char *data, int size)
{
@ -92,17 +93,24 @@ int encode_length_backwards(unsigned char *buffer,int *offset,
}
int decode_length_forwards(unsigned char *buffer,int *offset,
unsigned int buffer_length,
unsigned int *length)
{
*length=0;
if (*offset>=buffer_length) return -1;
switch(buffer[*offset]) {
case 0xff:
(*offset)++;
if (*offset>=buffer_length) return -1;
int i;
for(i=0;i<32;i+=8) (*length)|=buffer[(*offset)++]<<i;
for(i=0;i<32;i+=8) {
(*length)|=buffer[(*offset)++]<<i;
if (*offset>=buffer_length) return -1;
}
return 0;
default:
*length=buffer[(*offset)++];
if (*offset>=buffer_length) return -1;
return 0;
}
}
@ -112,6 +120,7 @@ int decode_length_backwards(unsigned char *buffer,int offset,
{
// it is assumed that offset points to the first byte after the end
// of the length field.
if (offset<=0) return -1;
offset--;
*length=0;
switch(buffer[offset]) {
@ -126,6 +135,20 @@ int decode_length_backwards(unsigned char *buffer,int offset,
}
}
int pack_int(unsigned char *buffer,int *offset,int32_t v)
{
int i;
for(i=0;i<32;i+=8) { buffer[(*offset)++]=(v>>i)&0xff; }
return 0;
}
int unpack_int(unsigned char *buffer,int *offset,int32_t *v)
{
int i;
*v=0;
for(i=0;i<32;i+=8) (*v)|=(((uint32_t)buffer[(*offset)++])<<i);
return 0;
}
int pack_time(unsigned char *buffer,int *offset,uint64_t time)
{
@ -142,7 +165,6 @@ int unpack_time(unsigned char *buffer,int *offset,uint64_t *time)
return 0;
}
unsigned char did_chars[16]="0123456789+#*abX";
int unpack_did(unsigned char *buffer,int *offset,char *did_out)
@ -186,11 +208,21 @@ int pack_did(unsigned char *buffer,int *offset,const char *did)
return -1; // number too long
}
int serialize_ack(unsigned char *buffer,int *offset,int ack_address)
{
encode_length_forwards(buffer,offset,1+1+4+1);
buffer[(*offset)++]=RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE;
pack_int(buffer,offset,ack_address);
encode_length_backwards(buffer,offset,1+1+4+1);
return 0;
}
int serialize_meshms(unsigned char *buffer,int *offset,unsigned int length,const char *sender_did,const char *recipient_did, unsigned long long time, const char *payload, int payload_length)
{
int ret = 0;
encode_length_forwards(buffer,offset,length);
buffer[(*offset)++]=RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE;
pack_did(buffer,offset,sender_did);
pack_did(buffer,offset,recipient_did);
pack_time(buffer,offset,time);
@ -207,6 +239,29 @@ int serialize_meshms(unsigned char *buffer,int *offset,unsigned int length,const
return ret;
}
int meshms_block_type(unsigned char *buffer,int offset, int blength)
{
unsigned int length=0;
if (offset>=blength) return -1;
decode_length_forwards(buffer,&offset,blength,&length);
if (offset>=blength) return -1;
unsigned char block_type=buffer[offset];
return block_type;
}
int deserialize_ack(unsigned char *buffer,int *offset, int buffer_size,
int *ack_address)
{
unsigned int length=0;
int length_length=*offset;
decode_length_forwards(buffer,offset,buffer_size,&length);
length_length=(*offset)-length_length;
unsigned char block_type=buffer[(*offset)++];
if (block_type!=RHIZOME_MESHMS_BLOCK_TYPE_ACK) return -1;
unpack_int(buffer,offset,ack_address);
(*offset)+=length_length;
return 0;
}
int deserialize_meshms(unsigned char *buffer,int *offset, int buffer_size)
{
@ -225,10 +280,17 @@ int deserialize_meshms(unsigned char *buffer,int *offset, int buffer_size)
cli_printf("%d",*offset);cli_puts("|");
int length_length=*offset;
decode_length_forwards(buffer,offset,&length);
decode_length_forwards(buffer,offset,buffer_size,&length);
length_length=(*offset)-length_length;
cli_printf("%d",length);cli_puts("|");
unsigned char block_type=buffer[(*offset)++];
if (block_type!=RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE) {
WHYF("Corrupt meshms message block: type=0x%02x instead of 0x%02x",
block_type,RHIZOME_MESHMS_BLOCK_TYPE_MESSAGE);
return -1;
}
char sender_did_out[64];
unpack_did(buffer,offset,sender_did_out);
cli_printf("%s",sender_did_out);cli_puts("|");
@ -293,21 +355,21 @@ int main(int argc,char **argv)
encode_length_backwards(buffer,&offset,123);
for(i=0;i<offset;i++) printf("%02x\n",buffer[i]);
unsigned int length;
decode_length_backwards(buffer,offset,&length);
decode_length_backwards(buffer,offset,buffer_size,&length);
printf("decoding reverse-encoded length=123 yields %d (offset was %d, should be 1)\n",
length,offset);
offset=0;
encode_length_backwards(buffer,&offset,0x1234567);
for(i=0;i<offset;i++) printf("%02x\n",buffer[i]);
decode_length_backwards(buffer,offset,&length);
decode_length_backwards(buffer,offset,buffer_size,&length);
printf("decoding reverse-encoded length=0x1234567 yields 0x%x (offset was %d, should be 5)\n",
length,offset);
offset=0;
encode_length_forwards(buffer,&offset,123);
for(i=0;i<offset;i++) printf("%02x\n",buffer[i]);
decode_length_forwards(buffer,&offset,&length);
decode_length_forwards(buffer,&offset,buffer_size,&length);
offset=0;
printf("decoding forward-encoded length=123 yields %d (offset is %d, should be 1)\n",
length,offset);
@ -316,7 +378,7 @@ int main(int argc,char **argv)
encode_length_forwards(buffer,&offset,0x1234567);
for(i=0;i<offset;i++) printf("%02x\n",buffer[i]);
offset=0;
decode_length_forwards(buffer,&offset,&length);
decode_length_forwards(buffer,&offset,buffer_size,&length);
printf("decoding reverse-encoded length=0x1234567 yields 0x%x (offset is %d, should be 5)\n",
length,offset);

View File

@ -719,6 +719,7 @@ int app_vomp_console(const struct cli_parsed *parsed, void *context);
int app_meshms_read_messagelog(const struct cli_parsed *parsed, void *context);
int app_meshms_add_message(const struct cli_parsed *parsed, void *context);
int app_meshms_list_messages(const struct cli_parsed *parsed, void *context);
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);