prevent creation of duplicate or retrograde acks: only acks that

advance the ack position are written.
This commit is contained in:
gardners 2013-05-10 12:03:37 +09:30
parent 197f3960dd
commit fad28fbe55

View File

@ -384,6 +384,51 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, void *context)
return 0; return 0;
} }
int meshms_get_last_ack_offset(const char *left_sid,const char *right_sid)
{
rhizome_manifest *m_left=NULL;
m_left=meshms_find_or_create_manifestid(left_sid,right_sid,0);
if (!m_left) {
DEBUGF("Couldn't find manifest for thread ply");
return 0;
}
unsigned char *left_messages=malloc(m_left->fileLength);
if (!left_messages) {
WHYF("malloc(%d) failed while reading meshms logs",m_left->fileLength);
return 0;
}
if (meshms_read_message(m_left,left_messages)) {
DEBUGF("Couldn't read message log for thread ply");
rhizome_manifest_free(m_left); return 0;
}
int left_len=m_left->fileLength;
rhizome_manifest_free(m_left); m_left=NULL;
// Scan through messages and look for acks
int left_ack_limit=0;
int left_offset=0;
for(left_offset=0;left_offset<left_len;)
{
int block_type=meshms_block_type(left_messages,left_offset,left_len);
switch(block_type) {
case RHIZOME_MESHMS_BLOCK_TYPE_ACK:
{
int o=left_offset;
deserialize_ack(left_messages,&o,left_len,&left_ack_limit);
}
break;
}
unsigned int left_block_len;
int o=left_offset;
if (decode_length_forwards(left_messages,&o,left_len,&left_block_len)) break;
left_offset+=left_block_len;
}
free(left_messages);
return left_ack_limit;
}
int app_meshms_ack_messages(const struct cli_parsed *parsed, void *context) int app_meshms_ack_messages(const struct cli_parsed *parsed, void *context)
{ {
int ret = 0; int ret = 0;
@ -419,6 +464,13 @@ int app_meshms_ack_messages(const struct cli_parsed *parsed, void *context)
if (recipient_sid[0] && str_to_sid_t(&aSid, recipient_sid) == -1) if (recipient_sid[0] && str_to_sid_t(&aSid, recipient_sid) == -1)
return WHYF("invalid recipient_sid: %s", recipient_sid); return WHYF("invalid recipient_sid: %s", recipient_sid);
DEBUGF("Message log previously acknowledged to %d",
meshms_get_last_ack_offset(sender_sid,recipient_sid));
if (meshms_get_last_ack_offset(sender_sid,recipient_sid)>=message_offset) {
INFO("Already acknowledged.");
return 0;
}
// Create serialised ack message for appending to the conversation ply // Create serialised ack message for appending to the conversation ply
int length_int = 0; int length_int = 0;
unsigned char buffer_serialize[100]; unsigned char buffer_serialize[100];
@ -429,5 +481,7 @@ int app_meshms_ack_messages(const struct cli_parsed *parsed, void *context)
ret|=meshms_append_messageblock(sender_sid,recipient_sid, ret|=meshms_append_messageblock(sender_sid,recipient_sid,
buffer_serialize,length_int); buffer_serialize,length_int);
if (!ret) INFO("Acknowledged.");
return ret; return ret;
} }