Fix RESTful MeshMS newsince ACK logic

/restful/meshms/newsince/SID1/SID2/newsince/TOKEN/messagelist.json now
always lists an ACK as soon as it is received, instead of waiting for a
message to arrive as well.
This commit is contained in:
Andrew Bettison 2014-11-11 17:11:43 +10:30
parent 3c4703feb4
commit a4be1b0b0c
8 changed files with 79 additions and 48 deletions

11
httpd.h
View File

@ -167,10 +167,13 @@ typedef struct httpd_request
/* For responses that list MeshMS messages in a single conversation.
*/
struct {
enum meshms_which_ply token_which_ply;
uint64_t token_offset;
enum meshms_which_ply latest_which_ply;
uint64_t latest_offset;
struct newsince_position {
enum meshms_which_ply which_ply;
uint64_t offset;
}
token,
current,
latest;
time_ms_t end_time;
uint64_t highest_ack_offset;
enum list_phase phase;

View File

@ -40,7 +40,7 @@ public class MeshMSMessage {
public final String text;
public final boolean isDelivered;
public final boolean isRead;
public final long timestamp;
public final Long timestamp;
public final Long ackOffset;
protected MeshMSMessage(int rowNumber,
@ -52,7 +52,7 @@ public class MeshMSMessage {
String text,
boolean delivered,
boolean read,
long timestamp,
Long timestamp,
Long ack_offset) throws ServalDInterfaceException
{
if (my_sid == null)
@ -65,6 +65,8 @@ public class MeshMSMessage {
throw new ServalDInterfaceException("token is null");
if (type == Type.ACK_RECEIVED && ack_offset == null)
throw new ServalDInterfaceException("ack_offset is null");
if (type != Type.ACK_RECEIVED && timestamp == null)
throw new ServalDInterfaceException("timestamp is null");
this._rowNumber = rowNumber;
this.type = type;
this.mySid = my_sid;

View File

@ -66,7 +66,7 @@ public class MeshMSMessageList {
.addColumn("text", String.class, JSONTokeniser.Narrow.ALLOW_NULL)
.addColumn("delivered", Boolean.class)
.addColumn("read", Boolean.class)
.addColumn("timestamp", Long.class)
.addColumn("timestamp", Long.class, JSONTokeniser.Narrow.ALLOW_NULL)
.addColumn("ack_offset", Long.class, JSONTokeniser.Narrow.ALLOW_NULL);
}

View File

@ -880,6 +880,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
iter->text_length = 0;
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
iter->ack_offset = 0;
iter->read = 0;
return status;
case MESHMS_BLOCK_TYPE_MESSAGE:
iter->type = MESSAGE_RECEIVED;

View File

@ -142,7 +142,7 @@ struct meshms_message_iterator {
uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them
uint64_t read_offset; // offset in remote (their) ply of most recent message read by me
// The following public fields change per message:
enum meshms_which_ply { MY_PLY, THEIR_PLY } which_ply;
enum meshms_which_ply { NEITHER_PLY, MY_PLY, THEIR_PLY } which_ply;
enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type;
// For MESSAGE_SENT 'offset' is the byte position within the local ply
// (mine). For MESSAGE_RECEIVED and ACK_RECEIVED, it is the byte position

View File

@ -296,8 +296,8 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
return status;
r->u.msglist.finished = status != MESHMS_STATUS_UPDATED;
if (!r->u.msglist.finished) {
r->u.msglist.latest_which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.latest_offset = r->u.msglist.iter.offset;
r->u.msglist.latest.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.latest.offset = r->u.msglist.iter.offset;
}
}
return MESHMS_STATUS_OK;
@ -311,7 +311,8 @@ static int restful_meshms_messagelist_json(httpd_request *r, const char *remaind
r->finalise_union = finalise_union_meshms_messagelist;
r->u.msglist.rowcount = 0;
r->u.msglist.phase = LIST_HEADER;
r->u.msglist.token_offset = 0;
r->u.msglist.token.which_ply = NEITHER_PLY;
r->u.msglist.token.offset = 0;
r->u.msglist.end_time = 0;
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
@ -332,15 +333,16 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);
if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0)
r->u.msglist.token_which_ply = MY_PLY;
r->u.msglist.token.which_ply = MY_PLY;
else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0)
r->u.msglist.token_which_ply = THEIR_PLY;
r->u.msglist.token.which_ply = THEIR_PLY;
else {
http_request_simple_response(&r->http, 404, "Unmatched token");
return 404;
}
r->u.msglist.token_offset = r->ui64;
r->u.msglist.token.offset = r->ui64;
r->u.msglist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
r->u.msglist.current = r->u.msglist.token;
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshms_messagelist_json_content);
return 1;
}
@ -355,6 +357,8 @@ static int restful_meshms_messagelist_json_content(struct http_request *hr, unsi
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk);
}
static void _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos);
static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr, strbuf b)
{
httpd_request *r = (httpd_request *) hr;
@ -396,12 +400,12 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
case LIST_ROWS:
{
if ( r->u.msglist.finished
|| (r->u.msglist.token_which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token_offset)
|| (r->u.msglist.token.which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token.offset)
) {
time_ms_t now;
if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) {
r->u.msglist.token_which_ply = r->u.msglist.latest_which_ply;
r->u.msglist.token_offset = r->u.msglist.latest_offset;
_messagelist_json_ack(r, b, r->u.msglist.current);
r->u.msglist.token = r->u.msglist.latest;
meshms_message_iterator_close(&r->u.msglist.iter);
time_ms_t wake_at = now + config.api.restful.newsince_poll_ms;
if (wake_at > r->u.msglist.end_time)
@ -411,6 +415,8 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
}
r->u.msglist.phase = LIST_END;
} else {
r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
r->u.msglist.current.offset = r->u.msglist.iter.offset;
switch (r->u.msglist.iter.type) {
case MESSAGE_SENT:
if (r->u.msglist.rowcount != 0)
@ -463,33 +469,7 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
strbuf_puts(b, "]");
break;
case ACK_RECEIVED:
// Don't send old (irrelevant) ACKs.
if (r->u.msglist.iter.ack_offset > r->u.msglist.highest_ack_offset) {
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, "ACK");
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
strbuf_putc(b, ',');
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
strbuf_putc(b, ',');
strbuf_json_string(b, r->u.msglist.iter.text);
strbuf_putc(b, ',');
strbuf_json_boolean(b, 1);
strbuf_putc(b, ',');
strbuf_json_boolean(b, r->u.msglist.iter.read);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%d", r->u.msglist.iter.timestamp);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.ack_offset);
strbuf_puts(b, "]");
r->u.msglist.highest_ack_offset = r->u.msglist.iter.ack_offset;
}
_messagelist_json_ack(r, b, r->u.msglist.current);
break;
}
if (!strbuf_overrun(b)) {
@ -516,6 +496,40 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
return 0;
}
static void _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos)
{
// Don't send old (irrelevant) ACKs.
if (r->u.msglist.iter.latest_ack_my_offset > r->u.msglist.highest_ack_offset) {
if (r->u.msglist.rowcount != 0)
strbuf_putc(b, ',');
strbuf_puts(b, "\n[");
strbuf_json_string(b, "ACK");
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_offset);
strbuf_putc(b, ',');
// Same token as the message row just sent.
strbuf_json_string(b, alloca_meshms_token(
pos.which_ply == MY_PLY ? r->u.msglist.iter.my_ply_bid : r->u.msglist.iter.their_ply_bid,
pos.offset));
strbuf_putc(b, ',');
strbuf_json_null(b);
strbuf_putc(b, ',');
strbuf_json_boolean(b, 1);
strbuf_putc(b, ',');
strbuf_json_boolean(b, 0);
strbuf_putc(b, ',');
strbuf_json_null(b); // no timestamp on ACKs
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_my_offset);
strbuf_puts(b, "]");
r->u.msglist.highest_ack_offset = r->u.msglist.iter.latest_ack_my_offset;
}
}
static HTTP_REQUEST_PARSER restful_meshms_sendmessage_end;
static int send_mime_part_start(struct http_request *);
static int send_mime_part_end(struct http_request *);

View File

@ -179,6 +179,9 @@ test_MeshmsListMessagesNewSince() {
>tmp
[ $i -gt 0 ] && sed -n -e "1,${i}p" messages >tmp
tfw_preserve tmp
# If tmp contains an ACK_RECEIVED, then so should messages$i. Otherwise,
# ignore any ACK_RECEIVED in messages$i.
grep -q 'type=ACK_RECEIVED' tmp || sed -i -e '/type=ACK_RECEIVED/d' messages$i
assert cmp tmp messages$i
done
}
@ -217,7 +220,12 @@ test_MeshmsListMessagesNewSinceArrival() {
wait_until [ -e messages1 -a -e messages2 -a -e messages3 ]
for message in '>Rumplestiltskin' 'A' '<Howdydoody' '>Eulenspiegel'; do
meshms_add_messages $SIDA1 $SIDA2 "${message:0:1}" "${message:1}"
wait_until grepall "${message:1}" messages{1,2,3}
case ${message:0:1} in
'<'|'>') waitfor="${message:1}";;
'A') waitfor="ACK";;
*) error "message=${message}";;
esac
wait_until grepall "$waitfor" messages{1,2,3}
done
fork_terminate_all
fork_wait_all

View File

@ -325,8 +325,11 @@ test_MeshmsListMessagesNewSince() {
for ((i = 0; i < NROWS; i += 3)); do
transform_list_json messagelist$i.json messages$i.json
tfw_preserve messages$i.json
{ echo '{"a":'; cat messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
assertJq tmp.json '.a[:'$i'] == .b'
{ echo '{"a":'; jq '.[:'$i']' messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
# If an ACK is new since the token, then it will be in its correct place
# in messages$i.json, otherwise the ACK will appear in messages$i.json
# anyway.
assertJq tmp.json '.a == (if .a | contains([{type:"ACK"}]) then .b else .b | map(select(.type != "ACK")) end)'
done
}
@ -363,7 +366,7 @@ setup_MeshmsListMessagesNewSinceArrival() {
}
test_MeshmsListMessagesNewSinceArrival() {
for i in 1 2 3; do
fork %curl$i executeOk curl \
fork %curl$i executeOk --timeout=360 curl \
--silent --fail --show-error \
--no-buffer \
--output newsince$i.json \