Add separate rows for delivered and read status

- add explicit transaction around blob writes so the commit can be retried
- remove status columns
- remove SID's from message log output to reduce size and complexity
This commit is contained in:
Jeremy Lakeman 2013-08-02 14:02:56 +09:30
parent 68bf04f69d
commit 91a600fba7
3 changed files with 92 additions and 44 deletions

View File

@ -744,10 +744,10 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
int ret=-1;
const char *names[]={
"_id","offset","sender","status","message"
"_id","offset","type","message"
};
cli_columns(context, 5, names);
cli_columns(context, 4, names);
rhizome_manifest *m_ours=NULL, *m_theirs=NULL;
struct ply_read read_ours, read_theirs;
@ -772,6 +772,8 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
goto end;
uint64_t their_last_ack=0;
uint64_t their_ack_offset=0;
int64_t unread_mark=conv->read_offset;
if (conv->found_their_ply){
rhizome_manifest *m_theirs = rhizome_new_manifest();
@ -785,14 +787,26 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (r==0){
if (unpack_uint(read_theirs.buffer, read_theirs.record_length, &their_last_ack)<0)
their_last_ack=0;
else
their_ack_offset = read_theirs.record_end_offset;
if (config.debug.meshms)
DEBUGF("Found their last ack @%"PRId64, their_last_ack);
}
}
int id=0;
while(ply_read_next(&read_ours)==0){
if (config.debug.meshms)
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, read_ours.read.offset, read_ours.type, conv->read_offset);
if (their_last_ack && their_last_ack >= read_ours.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, their_ack_offset, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
their_last_ack = 0;
}
switch(read_ours.type){
case MESHMS_BLOCK_TYPE_ACK:
// read their message list, and insert all messages that are included in the ack range
@ -811,15 +825,24 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
// just incase we don't have the full bundle anymore
if (read_theirs.read.offset > read_theirs.read.length)
read_theirs.read.offset = read_theirs.read.length;
if (config.debug.meshms)
DEBUGF("Reading other log from %"PRId64", to %"PRId64, read_theirs.read.offset, end_range);
while(ply_find_next(&read_theirs, MESHMS_BLOCK_TYPE_MESSAGE)==0){
if (read_theirs.read.offset < end_range)
break;
if (unread_mark >= (int64_t)read_theirs.record_end_offset){
cli_put_long(context, id++, ":");
cli_put_long(context, unread_mark, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
unread_mark = -1;
}
cli_put_long(context, id++, ":");
cli_put_long(context, read_theirs.record_end_offset, ":");
cli_put_hexvalue(context, their_sid.binary, sizeof(their_sid), ":");
cli_put_string(context, read_theirs.record_end_offset <= conv->read_offset?"":"unread", ":");
cli_put_string(context, "<", ":");
cli_put_string(context, (char *)read_theirs.buffer, "\n");
}
}
@ -828,8 +851,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, read_ours.record_end_offset, ":");
cli_put_hexvalue(context, my_sid.binary, sizeof(my_sid), ":");
cli_put_string(context, their_last_ack >= read_ours.record_end_offset ? "delivered":"", ":");
cli_put_string(context, ">", ":");
cli_put_string(context, (char *)read_ours.buffer, "\n");
break;
}

View File

@ -163,16 +163,23 @@ static int write_get_lock(struct rhizome_write *write_state){
if (write_state->blob_fd>=0 || write_state->sql_blob)
return 0;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") == -1)
return -1;
while(1){
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data",
write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob);
if (ret==SQLITE_OK)
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_open");
return 0;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return -1;
return WHYF("Giving up");
}
}
@ -202,13 +209,15 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
while(1){
int ret=sqlite3_blob_write(write_state->sql_blob, buffer, data_size, file_offset);
if (ret==SQLITE_OK)
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_write");
break;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_write")==0)
return -1;
return WHY("Giving up");
}
}
@ -221,13 +230,22 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
// close database locks
static int write_release_lock(struct rhizome_write *write_state){
int ret=0;
if (write_state->blob_fd>=0)
return 0;
if (write_state->sql_blob)
sqlite3_blob_close(write_state->sql_blob);
if (write_state->sql_blob){
ret = sqlite3_blob_close(write_state->sql_blob);
if (ret)
WHYF("sqlite3_blob_close() failed: %s",
sqlite3_errmsg(rhizome_db));
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
ret=-1;
}
write_state->sql_blob=NULL;
return 0;
return ret;
}
// Write data buffers in any order, the data will be cached and streamed into the database in file order.
@ -349,7 +367,8 @@ int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsi
last_offset = (*ptr)->offset + (*ptr)->data_size;
ptr = &((*ptr)->_next);
}
write_release_lock(write_state);
if (write_release_lock(write_state))
ret=-1;
return ret;
}
@ -383,7 +402,8 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){
}
}
end:
write_release_lock(write);
if (write_release_lock(write))
ret=-1;
fclose(f);
return ret;
}
@ -448,7 +468,8 @@ int rhizome_finish_write(struct rhizome_write *write){
close(fd);
write->blob_fd=-1;
}
write_release_lock(write);
if (write_release_lock(write))
goto failure;
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&write->sha512_context, hash_out);
@ -723,11 +744,12 @@ int rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, int buf
if (read_state->hash){
if (buffer && bytes_read>0)
SHA512_Update(&read_state->sha512_context, buffer, bytes_read);
if (read_state->offset + bytes_read>=read_state->length){
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&read_state->sha512_context, hash_out);
if (strcasecmp(read_state->id, hash_out)){
WHYF("Expected hash=%s, got %s", read_state->id, hash_out);
RETURN(WHYF("Expected hash=%s, got %s", read_state->id, hash_out));
}
read_state->hash=0;
}
@ -960,7 +982,7 @@ int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m,
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
// don't bother to decrypt the existing journal payload
ret = rhizome_open_read(&read_state, m->fileHexHash, 1);
ret = rhizome_open_read(&read_state, m->fileHexHash, advance_by>0?0:1);
if (ret)
goto failure;

View File

@ -41,42 +41,45 @@ setup_MessageDelivery() {
test_MessageDelivery() {
# 1. empty list
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutIs --stdout --line=1 -e '5\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:sender:status:message\n'
assertStdoutIs --stdout --line=1 -e '4\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:type:message\n'
assertStdoutLineCount '==' 2
# 2. create a manifest with a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 3
# 3. append a second message and list them both
executeOk_servald meshms send message $SIDA1 $SIDA2 "How are you"
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "::How are you\$"
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutGrep --stdout --matches=1 ":>:How are you\$"
assertStdoutGrep --stdout --matches=1 ":>:Hi\$"
assertStdoutLineCount '==' 4
# 4. list the messages from the receivers point of view (which ACKs them)
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:$SIDA1:unread:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$SIDA1:unread:Hi\$"
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:<:Hi\$"
assertStdoutLineCount '==' 4
# 5. mark the first message as read
executeOk_servald meshms read messages $SIDA2 $SIDA1 5
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 ":unread:How are you\$"
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutLineCount '==' 4
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutLineCount '==' 5
# 6. mark all messages as read
executeOk_servald meshms read messages $SIDA2
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "::How are you\$"
assertStdoutGrep --stdout --matches=1 "::Hi\$"
assertStdoutLineCount '==' 4
assertStdoutGrep --stdout --matches=1 "^0:19:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutLineCount '==' 5
# 7. list messages from the senders point of view after they have been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 ":delivered:How are you\$"
assertStdoutGrep --stdout --matches=1 ":delivered:Hi\$"
assertStdoutLineCount '==' 4
assertStdoutGrep --stdout --matches=1 "^0:3:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:19:>:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:>:Hi\$"
assertStdoutLineCount '==' 5
}
doc_MessageThreading="Messages sent at the same time, thread differently"
@ -101,7 +104,7 @@ has_unread_messages() {
messages_delivered() {
executeOk_servald meshms list messages $1 $2
if ! grep ":delivered:" $_tfw_tmp/stdout; then
if ! grep ":ACK:" $_tfw_tmp/stdout; then
return 1
fi
}
@ -110,21 +113,22 @@ test_MessageThreading() {
set_instance +B
wait_until has_unread_messages $SIDB
executeOk_servald meshms list messages $SIDB $SIDA
assertStdoutGrep --stdout --matches=1 "^0:40:$SIDA:unread:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:24:$SIDA:unread:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:54:$SIDB::Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:41:$SIDB::Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^0:40:<:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:24:<:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:54:>:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:41:>:Help Im trapped in a test case factory\$"
assertStdoutLineCount '==' 6
set_instance +A
wait_until has_unread_messages $SIDA
wait_until messages_delivered $SIDA $SIDB
executeOk_servald meshms list messages $SIDA $SIDB
assertStdoutGrep --stdout --matches=1 "^0:54:$SIDB:unread:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:41:$SIDB:unread:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:40:$SIDA:delivered:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^3:24:$SIDA:delivered:Hello can you hear me\$"
assertStdoutLineCount '==' 6
assertStdoutGrep --stdout --matches=1 "^0:54:<:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:41:<:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:57:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^3:40:>:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^4:24:>:Hello can you hear me\$"
assertStdoutLineCount '==' 7
}
doc_listConversations="List multiple conversations, with different numbers of messages"