mirror of
https://github.com/servalproject/serval-dna.git
synced 2024-12-24 07:16:43 +00:00
Fix RESTful MeshMS newsince ACK logic
/restful/meshms/newsince/SID1/SID2/newsince/TOKEN/messagelist.json now always lists an ACK as soon as it is received, instead of waiting for a message to arrive as well.
This commit is contained in:
parent
3c4703feb4
commit
a4be1b0b0c
11
httpd.h
11
httpd.h
@ -167,10 +167,13 @@ typedef struct httpd_request
|
||||
/* For responses that list MeshMS messages in a single conversation.
|
||||
*/
|
||||
struct {
|
||||
enum meshms_which_ply token_which_ply;
|
||||
uint64_t token_offset;
|
||||
enum meshms_which_ply latest_which_ply;
|
||||
uint64_t latest_offset;
|
||||
struct newsince_position {
|
||||
enum meshms_which_ply which_ply;
|
||||
uint64_t offset;
|
||||
}
|
||||
token,
|
||||
current,
|
||||
latest;
|
||||
time_ms_t end_time;
|
||||
uint64_t highest_ack_offset;
|
||||
enum list_phase phase;
|
||||
|
@ -40,7 +40,7 @@ public class MeshMSMessage {
|
||||
public final String text;
|
||||
public final boolean isDelivered;
|
||||
public final boolean isRead;
|
||||
public final long timestamp;
|
||||
public final Long timestamp;
|
||||
public final Long ackOffset;
|
||||
|
||||
protected MeshMSMessage(int rowNumber,
|
||||
@ -52,7 +52,7 @@ public class MeshMSMessage {
|
||||
String text,
|
||||
boolean delivered,
|
||||
boolean read,
|
||||
long timestamp,
|
||||
Long timestamp,
|
||||
Long ack_offset) throws ServalDInterfaceException
|
||||
{
|
||||
if (my_sid == null)
|
||||
@ -65,6 +65,8 @@ public class MeshMSMessage {
|
||||
throw new ServalDInterfaceException("token is null");
|
||||
if (type == Type.ACK_RECEIVED && ack_offset == null)
|
||||
throw new ServalDInterfaceException("ack_offset is null");
|
||||
if (type != Type.ACK_RECEIVED && timestamp == null)
|
||||
throw new ServalDInterfaceException("timestamp is null");
|
||||
this._rowNumber = rowNumber;
|
||||
this.type = type;
|
||||
this.mySid = my_sid;
|
||||
|
@ -66,7 +66,7 @@ public class MeshMSMessageList {
|
||||
.addColumn("text", String.class, JSONTokeniser.Narrow.ALLOW_NULL)
|
||||
.addColumn("delivered", Boolean.class)
|
||||
.addColumn("read", Boolean.class)
|
||||
.addColumn("timestamp", Long.class)
|
||||
.addColumn("timestamp", Long.class, JSONTokeniser.Narrow.ALLOW_NULL)
|
||||
.addColumn("ack_offset", Long.class, JSONTokeniser.Narrow.ALLOW_NULL);
|
||||
}
|
||||
|
||||
|
1
meshms.c
1
meshms.c
@ -880,6 +880,7 @@ enum meshms_status meshms_message_iterator_prev(struct meshms_message_iterator *
|
||||
iter->text_length = 0;
|
||||
if (unpack_uint(iter->_their_reader.record, iter->_their_reader.record_length, &iter->ack_offset) == -1)
|
||||
iter->ack_offset = 0;
|
||||
iter->read = 0;
|
||||
return status;
|
||||
case MESHMS_BLOCK_TYPE_MESSAGE:
|
||||
iter->type = MESSAGE_RECEIVED;
|
||||
|
2
meshms.h
2
meshms.h
@ -142,7 +142,7 @@ struct meshms_message_iterator {
|
||||
uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them
|
||||
uint64_t read_offset; // offset in remote (their) ply of most recent message read by me
|
||||
// The following public fields change per message:
|
||||
enum meshms_which_ply { MY_PLY, THEIR_PLY } which_ply;
|
||||
enum meshms_which_ply { NEITHER_PLY, MY_PLY, THEIR_PLY } which_ply;
|
||||
enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type;
|
||||
// For MESSAGE_SENT 'offset' is the byte position within the local ply
|
||||
// (mine). For MESSAGE_RECEIVED and ACK_RECEIVED, it is the byte position
|
||||
|
@ -296,8 +296,8 @@ static enum meshms_status reopen_meshms_message_iterator(httpd_request *r)
|
||||
return status;
|
||||
r->u.msglist.finished = status != MESHMS_STATUS_UPDATED;
|
||||
if (!r->u.msglist.finished) {
|
||||
r->u.msglist.latest_which_ply = r->u.msglist.iter.which_ply;
|
||||
r->u.msglist.latest_offset = r->u.msglist.iter.offset;
|
||||
r->u.msglist.latest.which_ply = r->u.msglist.iter.which_ply;
|
||||
r->u.msglist.latest.offset = r->u.msglist.iter.offset;
|
||||
}
|
||||
}
|
||||
return MESHMS_STATUS_OK;
|
||||
@ -311,7 +311,8 @@ static int restful_meshms_messagelist_json(httpd_request *r, const char *remaind
|
||||
r->finalise_union = finalise_union_meshms_messagelist;
|
||||
r->u.msglist.rowcount = 0;
|
||||
r->u.msglist.phase = LIST_HEADER;
|
||||
r->u.msglist.token_offset = 0;
|
||||
r->u.msglist.token.which_ply = NEITHER_PLY;
|
||||
r->u.msglist.token.offset = 0;
|
||||
r->u.msglist.end_time = 0;
|
||||
enum meshms_status status;
|
||||
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
|
||||
@ -332,15 +333,16 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char
|
||||
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
|
||||
return http_request_meshms_response(r, 0, NULL, status);
|
||||
if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0)
|
||||
r->u.msglist.token_which_ply = MY_PLY;
|
||||
r->u.msglist.token.which_ply = MY_PLY;
|
||||
else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0)
|
||||
r->u.msglist.token_which_ply = THEIR_PLY;
|
||||
r->u.msglist.token.which_ply = THEIR_PLY;
|
||||
else {
|
||||
http_request_simple_response(&r->http, 404, "Unmatched token");
|
||||
return 404;
|
||||
}
|
||||
r->u.msglist.token_offset = r->ui64;
|
||||
r->u.msglist.token.offset = r->ui64;
|
||||
r->u.msglist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
|
||||
r->u.msglist.current = r->u.msglist.token;
|
||||
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshms_messagelist_json_content);
|
||||
return 1;
|
||||
}
|
||||
@ -355,6 +357,8 @@ static int restful_meshms_messagelist_json_content(struct http_request *hr, unsi
|
||||
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk);
|
||||
}
|
||||
|
||||
static void _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos);
|
||||
|
||||
static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr, strbuf b)
|
||||
{
|
||||
httpd_request *r = (httpd_request *) hr;
|
||||
@ -396,12 +400,12 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
case LIST_ROWS:
|
||||
{
|
||||
if ( r->u.msglist.finished
|
||||
|| (r->u.msglist.token_which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token_offset)
|
||||
|| (r->u.msglist.token.which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token.offset)
|
||||
) {
|
||||
time_ms_t now;
|
||||
if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) {
|
||||
r->u.msglist.token_which_ply = r->u.msglist.latest_which_ply;
|
||||
r->u.msglist.token_offset = r->u.msglist.latest_offset;
|
||||
_messagelist_json_ack(r, b, r->u.msglist.current);
|
||||
r->u.msglist.token = r->u.msglist.latest;
|
||||
meshms_message_iterator_close(&r->u.msglist.iter);
|
||||
time_ms_t wake_at = now + config.api.restful.newsince_poll_ms;
|
||||
if (wake_at > r->u.msglist.end_time)
|
||||
@ -411,6 +415,8 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
}
|
||||
r->u.msglist.phase = LIST_END;
|
||||
} else {
|
||||
r->u.msglist.current.which_ply = r->u.msglist.iter.which_ply;
|
||||
r->u.msglist.current.offset = r->u.msglist.iter.offset;
|
||||
switch (r->u.msglist.iter.type) {
|
||||
case MESSAGE_SENT:
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
@ -463,33 +469,7 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
strbuf_puts(b, "]");
|
||||
break;
|
||||
case ACK_RECEIVED:
|
||||
// Don't send old (irrelevant) ACKs.
|
||||
if (r->u.msglist.iter.ack_offset > r->u.msglist.highest_ack_offset) {
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "ACK");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, r->u.msglist.iter.text);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 1);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.read);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%d", r->u.msglist.iter.timestamp);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.ack_offset);
|
||||
strbuf_puts(b, "]");
|
||||
r->u.msglist.highest_ack_offset = r->u.msglist.iter.ack_offset;
|
||||
}
|
||||
_messagelist_json_ack(r, b, r->u.msglist.current);
|
||||
break;
|
||||
}
|
||||
if (!strbuf_overrun(b)) {
|
||||
@ -516,6 +496,40 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void _messagelist_json_ack(struct httpd_request *r, strbuf b, struct newsince_position pos)
|
||||
{
|
||||
// Don't send old (irrelevant) ACKs.
|
||||
if (r->u.msglist.iter.latest_ack_my_offset > r->u.msglist.highest_ack_offset) {
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "ACK");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_offset);
|
||||
strbuf_putc(b, ',');
|
||||
// Same token as the message row just sent.
|
||||
strbuf_json_string(b, alloca_meshms_token(
|
||||
pos.which_ply == MY_PLY ? r->u.msglist.iter.my_ply_bid : r->u.msglist.iter.their_ply_bid,
|
||||
pos.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_null(b);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 1);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 0);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_null(b); // no timestamp on ACKs
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.latest_ack_my_offset);
|
||||
strbuf_puts(b, "]");
|
||||
r->u.msglist.highest_ack_offset = r->u.msglist.iter.latest_ack_my_offset;
|
||||
}
|
||||
}
|
||||
|
||||
static HTTP_REQUEST_PARSER restful_meshms_sendmessage_end;
|
||||
static int send_mime_part_start(struct http_request *);
|
||||
static int send_mime_part_end(struct http_request *);
|
||||
|
@ -179,6 +179,9 @@ test_MeshmsListMessagesNewSince() {
|
||||
>tmp
|
||||
[ $i -gt 0 ] && sed -n -e "1,${i}p" messages >tmp
|
||||
tfw_preserve tmp
|
||||
# If tmp contains an ACK_RECEIVED, then so should messages$i. Otherwise,
|
||||
# ignore any ACK_RECEIVED in messages$i.
|
||||
grep -q 'type=ACK_RECEIVED' tmp || sed -i -e '/type=ACK_RECEIVED/d' messages$i
|
||||
assert cmp tmp messages$i
|
||||
done
|
||||
}
|
||||
@ -217,7 +220,12 @@ test_MeshmsListMessagesNewSinceArrival() {
|
||||
wait_until [ -e messages1 -a -e messages2 -a -e messages3 ]
|
||||
for message in '>Rumplestiltskin' 'A' '<Howdydoody' '>Eulenspiegel'; do
|
||||
meshms_add_messages $SIDA1 $SIDA2 "${message:0:1}" "${message:1}"
|
||||
wait_until grepall "${message:1}" messages{1,2,3}
|
||||
case ${message:0:1} in
|
||||
'<'|'>') waitfor="${message:1}";;
|
||||
'A') waitfor="ACK";;
|
||||
*) error "message=${message}";;
|
||||
esac
|
||||
wait_until grepall "$waitfor" messages{1,2,3}
|
||||
done
|
||||
fork_terminate_all
|
||||
fork_wait_all
|
||||
|
@ -325,8 +325,11 @@ test_MeshmsListMessagesNewSince() {
|
||||
for ((i = 0; i < NROWS; i += 3)); do
|
||||
transform_list_json messagelist$i.json messages$i.json
|
||||
tfw_preserve messages$i.json
|
||||
{ echo '{"a":'; cat messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
|
||||
assertJq tmp.json '.a[:'$i'] == .b'
|
||||
{ echo '{"a":'; jq '.[:'$i']' messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
|
||||
# If an ACK is new since the token, then it will be in its correct place
|
||||
# in messages$i.json, otherwise the ACK will appear in messages$i.json
|
||||
# anyway.
|
||||
assertJq tmp.json '.a == (if .a | contains([{type:"ACK"}]) then .b else .b | map(select(.type != "ACK")) end)'
|
||||
done
|
||||
}
|
||||
|
||||
@ -363,7 +366,7 @@ setup_MeshmsListMessagesNewSinceArrival() {
|
||||
}
|
||||
test_MeshmsListMessagesNewSinceArrival() {
|
||||
for i in 1 2 3; do
|
||||
fork %curl$i executeOk curl \
|
||||
fork %curl$i executeOk --timeout=360 curl \
|
||||
--silent --fail --show-error \
|
||||
--no-buffer \
|
||||
--output newsince$i.json \
|
||||
|
Loading…
Reference in New Issue
Block a user