Add timestamps to meshms messages and acks

This commit is contained in:
Jeremy Lakeman 2014-07-14 11:29:49 +09:30
parent 89623015fc
commit bc87f8c7c0
6 changed files with 85 additions and 43 deletions

View File

@ -31,6 +31,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MESHMS_BLOCK_TYPE_ACK 0x01
#define MESHMS_BLOCK_TYPE_MESSAGE 0x02 // NUL-terminated UTF8 string
#define MESHMS_BLOCK_TYPE_TIME 0x03 // local timestamp record
static unsigned mark_read(struct meshms_conversations *conv, const sid_t *their_sid, const uint64_t offset);
@ -212,7 +213,7 @@ static int create_ply(const sid_t *my_sid, struct meshms_conversations *conv, rh
return 0;
}
static int append_footer(unsigned char *buffer, char type, size_t message_len)
static size_t append_footer(unsigned char *buffer, char type, size_t message_len)
{
assert(message_len <= MESHMS_MESSAGE_MAX_LEN);
message_len = (message_len << 4) | (type&0xF);
@ -220,6 +221,14 @@ static int append_footer(unsigned char *buffer, char type, size_t message_len)
return 2;
}
// append a timestamp as a uint32_t with 1s precision
static size_t append_timestamp(uint8_t *buffer)
{
write_uint32(buffer, gettime());
size_t ofs=4;
return ofs+append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_TIME, ofs);
}
static enum meshms_status ply_read_open(struct meshms_ply_read *ply, const rhizome_bid_t *bid, rhizome_manifest *m)
{
if (config.debug.meshms)
@ -470,12 +479,13 @@ static enum meshms_status update_conversation(const sid_t *my_sid, struct meshms
// append an ack for their message
if (config.debug.meshms)
DEBUGF("Creating ACK for %"PRId64" - %"PRId64, previous_ack, conv->their_last_message);
unsigned char buffer[24];
unsigned char buffer[30];
int ofs=0;
ofs+=pack_uint(&buffer[ofs], conv->their_last_message);
if (previous_ack)
ofs+=pack_uint(&buffer[ofs], conv->their_last_message - previous_ack);
ofs+=append_footer(buffer+ofs, MESHMS_BLOCK_TYPE_ACK, ofs);
ofs+=append_timestamp(buffer+ofs);
status = append_meshms_buffer(my_sid, conv, buffer, ofs);
if (config.debug.meshms)
DEBUGF("status=%d", status);
@ -787,6 +797,7 @@ enum meshms_status meshms_message_iterator_open(struct meshms_message_iterator *
iter->my_ply_bid = &iter->_conv->my_ply.bundle_id;
iter->their_ply_bid = &iter->_conv->their_ply.bundle_id;
iter->read_offset = iter->_conv->read_offset;
iter->timestamp = 0;
// If I have never sent a message (or acked any of theirs), there are no messages in the thread.
if (iter->_conv->found_my_ply) {
if ((iter->_my_manifest = rhizome_new_manifest()) == NULL)
@ -896,6 +907,14 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset);
iter->which_ply = MY_PLY;
switch (iter->_my_reader.type) {
case MESHMS_BLOCK_TYPE_TIME:
if (iter->_my_reader.record_length<4){
WARN("Malformed MeshMS2 ply journal, expected 4 byte timestamp");
return MESHMS_STATUS_PROTOCOL_FAULT;
}
iter->timestamp = read_uint32(iter->_my_reader.record);
DEBUGF("Parsed timestamp %ds old", gettime() - iter->timestamp);
break;
case MESHMS_BLOCK_TYPE_ACK:
// Read the received messages up to the ack'ed offset
if (iter->_conv->found_their_ply) {
@ -943,12 +962,13 @@ enum meshms_status meshms_send_message(const sid_t *sender, const sid_t *recipie
assert(conv != NULL);
// construct a message payload
// TODO, new format here.
unsigned char buffer[message_len + 4];
unsigned char buffer[message_len + 4 + 6];
strncpy((char*)buffer, message, message_len);
// ensure message is NUL terminated
if (message[message_len - 1] != '\0')
buffer[message_len++] = '\0';
message_len += append_footer(buffer + message_len, MESHMS_BLOCK_TYPE_MESSAGE, message_len);
message_len+=append_timestamp(buffer + message_len);
status = append_meshms_buffer(sender, conv, buffer, message_len);
}
meshms_free_conversations(conv);
@ -1125,11 +1145,12 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
return status;
}
const char *names[]={
"_id","offset","type","message"
"_id","offset","age","type","message"
};
cli_columns(context, 4, names);
cli_columns(context, 5, names);
bool_t marked_delivered = 0;
bool_t marked_read = 0;
time_s_t now = gettime();
int id = 0;
while ((status = meshms_message_iterator_prev(&iter)) == MESHMS_STATUS_UPDATED) {
switch (iter.type) {
@ -1137,6 +1158,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (iter.delivered && !marked_delivered){
cli_put_long(context, id++, ":");
cli_put_long(context, iter.latest_ack_offset, ":");
cli_put_long(context, iter.timestamp?(int)(now - iter.timestamp):-1, ":");
cli_put_string(context, "ACK", ":");
cli_put_string(context, "delivered", "\n");
marked_delivered = 1;
@ -1144,6 +1166,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, iter.offset, ":");
cli_put_long(context, iter.timestamp?(int)(now - iter.timestamp):-1, ":");
cli_put_string(context, ">", ":");
cli_put_string(context, iter.text, "\n");
break;
@ -1153,6 +1176,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
if (iter.read && !marked_read) {
cli_put_long(context, id++, ":");
cli_put_long(context, iter.read_offset, ":");
cli_put_long(context, iter.timestamp?(int)(now - iter.timestamp):-1, ":");
cli_put_string(context, "MARK", ":");
cli_put_string(context, "read", "\n");
marked_read = 1;
@ -1160,6 +1184,7 @@ int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context
// TODO new message format here
cli_put_long(context, id++, ":");
cli_put_long(context, iter.offset, ":");
cli_put_long(context, iter.timestamp?(int)(now - iter.timestamp):-1, ":");
cli_put_string(context, "<", ":");
cli_put_string(context, iter.text, "\n");
break;

View File

@ -152,6 +152,7 @@ struct meshms_message_iterator {
// For MESSAGE_SENT 'offset' is the byte position within the local ply
// (mine). For MESSAGE_RECEIVED and ACK_RECEIVED, it is the byte position
// within the remote ply (theirs).
time_s_t timestamp;
uint64_t offset;
const char *text; // text of UTF8 message (NUL terminated)
size_t text_length; // excluding terminating NUL

11
os.c
View File

@ -155,6 +155,17 @@ time_ms_t gettime_ms()
return nowtv.tv_sec * 1000LL + nowtv.tv_usec / 1000;
}
time_s_t gettime()
{
struct timeval nowtv;
// If gettimeofday() fails or returns an invalid value, all else is lost!
if (gettimeofday(&nowtv, NULL) == -1)
FATAL_perror("gettimeofday");
if (nowtv.tv_sec < 0 || nowtv.tv_usec < 0 || nowtv.tv_usec >= 1000000)
FATALF("gettimeofday returned tv_sec=%ld tv_usec=%ld", (long)nowtv.tv_sec, (long)nowtv.tv_usec);
return nowtv.tv_sec;
}
// Returns sleep time remaining.
time_ms_t sleep_ms(time_ms_t milliseconds)
{

2
os.h
View File

@ -59,11 +59,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* }
*/
typedef int64_t time_ms_t;
typedef uint32_t time_s_t;
#define PRItime_ms_t PRId64
#define TIME_MS_NEVER_WILL INT64_MAX
#define TIME_MS_NEVER_HAS INT64_MIN
time_ms_t gettime_ms();
time_s_t gettime();
time_ms_t sleep_ms(time_ms_t milliseconds);
struct timeval time_ms_to_timeval(time_ms_t);

View File

@ -22,6 +22,8 @@ source "${0%/*}/../testframework.sh"
source "${0%/*}/../testdefs.sh"
source "${0%/*}/../testdefs_rhizome.sh"
rexp_age="[0-9]\+"
teardown() {
stop_all_servald_servers
kill_all_servald_processes
@ -48,8 +50,8 @@ setup_MessageDelivery() {
test_MessageDelivery() {
# 1. empty list
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutIs --stdout --line=1 -e '4\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:type:message\n'
assertStdoutIs --stdout --line=1 -e '5\n'
assertStdoutIs --stdout --line=2 -e '_id:offset:age:type:message\n'
assertStdoutLineCount '==' 2
# 2. create a manifest with a single message and list it back
executeOk_servald meshms send message $SIDA1 $SIDA2 "Hi"
@ -64,8 +66,8 @@ test_MessageDelivery() {
assertStdoutLineCount '==' 4
# 4. list the messages from the receivers point of view (which ACKs them)
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:<:Hi\$"
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 4
CONV_BID=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*bid=\([0-9A-F]*\).*/\1/p')
CONV_SECRET=$(replayStderr | sed -n -e '/MESHMS CONVERSATION BUNDLE/s/.*secret=\([0-9A-F]*\).*/\1/p')
@ -75,23 +77,23 @@ test_MessageDelivery() {
tfw_cat --stderr
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^1:5:$rexp_age:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 5
# 6. mark all messages as read
executeOk_servald meshms read messages $SIDA2
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:19:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:<:Hi\$"
assertStdoutGrep --stdout --matches=1 "^0:25:$rexp_age:MARK:read\$"
assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:<:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:<:Hi\$"
assertStdoutLineCount '==' 5
# 7. list messages from the senders point of view after they have been delivered
executeOk_servald meshms list messages $SIDA1 $SIDA2
assertStdoutGrep --stdout --matches=1 "^0:3:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:19:>:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:>:Hi\$"
assertStdoutGrep --stdout --matches=1 "^0:3:$rexp_age:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^1:25:$rexp_age:>:How are you\$"
assertStdoutGrep --stdout --matches=1 "^2:5:$rexp_age:>:Hi\$"
assertStdoutLineCount '==' 5
}
@ -146,20 +148,20 @@ test_MessageThreading() {
set_instance +B
wait_until has_unread_messages $SIDB
executeOk_servald meshms list messages $SIDB $SIDA
assertStdoutGrep --stdout --matches=1 "^0:40:<:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:24:<:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:54:>:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:41:>:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^0:46:$rexp_age:<:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^1:24:$rexp_age:<:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^2:60:$rexp_age:>:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^3:41:$rexp_age:>:Help Im trapped in a test case factory\$"
assertStdoutLineCount '==' 6
set_instance +A
wait_until has_unread_messages $SIDA
wait_until messages_delivered $SIDA $SIDB
executeOk_servald meshms list messages $SIDA $SIDB
assertStdoutGrep --stdout --matches=1 "^0:54:<:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:41:<:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:57:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^3:40:>:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^4:24:>:Hello can you hear me\$"
assertStdoutGrep --stdout --matches=1 "^0:60:$rexp_age:<:Never mind\$"
assertStdoutGrep --stdout --matches=1 "^1:41:$rexp_age:<:Help Im trapped in a test case factory\$"
assertStdoutGrep --stdout --matches=1 "^2:69:$rexp_age:ACK:delivered\$"
assertStdoutGrep --stdout --matches=1 "^3:46:$rexp_age:>:Still waiting\$"
assertStdoutGrep --stdout --matches=1 "^4:24:$rexp_age:>:Hello can you hear me\$"
assertStdoutLineCount '==' 7
}
@ -182,7 +184,7 @@ test_listConversations() {
assertStdoutIs --stderr --line=2 -e '_id:recipient:read:last_message:read_offset\n'
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list conversations $SIDA1 1
tfw_cat --stderr
@ -196,7 +198,7 @@ test_listConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
assertStdoutLineCount '==' 5
executeOk_servald meshms list messages $SIDA1 $SIDA2
executeOk_servald meshms list messages $SIDA1 $SIDA4

View File

@ -160,7 +160,7 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: false,
last_message: 14,
last_message: 20,
read_offset: 0
}
])"
@ -201,8 +201,8 @@ test_MeshmsListConversations() {
{ my_sid: \"$SIDA1\",
their_sid: \"$SIDA4\",
read: true,
last_message: 14,
read_offset: 14
last_message: 20,
read_offset: 20
}
])"
}
@ -214,10 +214,11 @@ setup_MeshmsListMessages() {
meshms_add_messages $SIDA1 $SIDA2 '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
let NROWS=NSENT+NRECV+(NACK?1:0)
executeOk_servald meshms list messages $SIDA1 $SIDA2
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):>:.*/\1/p;q}' "$TFWSTDOUT")
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):[0-9]\+:>:.*/\1/p;q}' "$TFWSTDOUT")
[ -z "$delivered_offset" ] && delivered_offset=0
read_offset=$(sed -n -e 's/^[0-9]\+:\([0-9]\+\):MARK:read$/\1/p' "$TFWSTDOUT")
read_offset=$(sed -n -e 's/^[0-9]\+:\([0-9]\+\):[0-9]\+:MARK:read$/\1/p' "$TFWSTDOUT")
[ -z "$read_offset" ] && read_offset=0
tfw_log "delivered: $delivered_offset; read: $read_offset"
}
test_MeshmsListMessages() {
executeOk curl \
@ -573,7 +574,7 @@ setup_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3:unread:11:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:14:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4:unread:20:0\$"
}
test_MeshmsReadAllConversations() {
executeOk curl \
@ -588,7 +589,7 @@ test_MeshmsReadAllConversations() {
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2::0:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA3::11:11\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::14:14\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA4::20:20\$"
}
doc_MeshmsPostSpuriousContent="HTTP RESTful MeshMS rejects unwanted content in POST request"
@ -601,7 +602,7 @@ setup_MeshmsPostSpuriousContent() {
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message3"
executeOk_servald meshms send message $SIDA2 $SIDA1 "Message4"
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:29:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
}
test_MeshmsPostSpuriousContent() {
executeOk curl \
@ -617,7 +618,7 @@ test_MeshmsPostSpuriousContent() {
assertJq http_body 'contains({"http_status_code": 400})'
assertJqGrep --ignore-case http_body '.http_status_message' 'content length'
executeOk_servald meshms list conversations $SIDA1
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:29:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA2:unread:47:0\$"
}
doc_MeshmsReadAllMessages="HTTP RESTful MeshMS mark all conversations read"
@ -632,7 +633,7 @@ setup_MeshmsReadAllMessages() {
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message5"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message6"
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:33:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:0\$"
}
test_MeshmsReadAllMessages() {
executeOk curl \
@ -645,7 +646,7 @@ test_MeshmsReadAllMessages() {
assertExitStatus == 0
assertStdoutIs 201
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1::33:33\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA1::45:45\$"
}
doc_MeshmsReadMessage="HTTP RESTful MeshMS mark a message as read"
@ -660,7 +661,7 @@ setup_MeshmsReadMessage() {
executeOk_servald meshms send message $SIDA4 $SIDA1 "Message5"
executeOk_servald meshms send message $SIDA1 $SIDA2 "Message6"
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:33:0\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:0\$"
}
test_MeshmsReadMessage() {
executeOk curl \
@ -673,7 +674,7 @@ test_MeshmsReadMessage() {
assertExitStatus == 0
assertStdoutIs 201
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:33:22\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:22\$"
executeOk curl \
--silent --show-error --write-out '%{http_code}' \
--output read.json \
@ -684,7 +685,7 @@ test_MeshmsReadMessage() {
assertExitStatus == 0
assertStdoutIs 200
executeOk_servald meshms list conversations $SIDA2
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:33:22\$"
assertStdoutGrep --stderr --matches=1 ":$SIDA1:unread:45:22\$"
}
runTests "$@"