mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-01-29 15:43:56 +00:00
Implement HTTP GET /restful/meshms/<SID>/<SID>/newsince/<token>/messagelist.json
Two new test cases
This commit is contained in:
parent
ebe444ffe3
commit
f4249707a4
10
meshms.c
10
meshms.c
@ -723,6 +723,8 @@ int meshms_message_iterator_open(struct meshms_message_iterator *iter, const sid
|
||||
iter->_my_sid = *me;
|
||||
iter->my_sid = &iter->_my_sid;
|
||||
iter->their_sid = &iter->_conv->them;
|
||||
iter->my_ply_bid = &iter->_conv->my_ply.bundle_id;
|
||||
iter->their_ply_bid = &iter->_conv->their_ply.bundle_id;
|
||||
iter->read_offset = iter->_conv->read_offset;
|
||||
// If I have never sent a message (or acked any of theirs), there are no messages in the thread.
|
||||
if (iter->_conv->found_my_ply) {
|
||||
@ -759,6 +761,11 @@ fail:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int meshms_message_iterator_is_open(const struct meshms_message_iterator *iter)
|
||||
{
|
||||
return iter->_conv != NULL;
|
||||
}
|
||||
|
||||
void meshms_message_iterator_close(struct meshms_message_iterator *iter)
|
||||
{
|
||||
if (iter->_my_manifest) {
|
||||
@ -772,6 +779,7 @@ void meshms_message_iterator_close(struct meshms_message_iterator *iter)
|
||||
iter->_their_manifest = NULL;
|
||||
}
|
||||
meshms_free_conversations(iter->_conv);
|
||||
iter->_conv = NULL;
|
||||
}
|
||||
|
||||
int meshms_message_iterator_prev(struct meshms_message_iterator *iter)
|
||||
@ -789,6 +797,7 @@ int meshms_message_iterator_prev(struct meshms_message_iterator *iter)
|
||||
DEBUGF("Reading other log from %"PRId64", to %"PRId64, iter->_their_reader.read.offset, iter->_end_range);
|
||||
if ((ret = ply_read_prev(&iter->_their_reader)) == -1)
|
||||
break;
|
||||
iter->which_ply = THEIR_PLY;
|
||||
if (ret == 0 && iter->_their_reader.read.offset >= iter->_end_range) {
|
||||
switch (iter->_their_reader.type) {
|
||||
case MESHMS_BLOCK_TYPE_ACK:
|
||||
@ -813,6 +822,7 @@ int meshms_message_iterator_prev(struct meshms_message_iterator *iter)
|
||||
else if ((ret = ply_read_prev(&iter->_my_reader)) == 0) {
|
||||
if (config.debug.meshms)
|
||||
DEBUGF("Offset %"PRId64", type %d, read_offset %"PRId64, iter->_my_reader.read.offset, iter->_my_reader.type, iter->read_offset);
|
||||
iter->which_ply = MY_PLY;
|
||||
switch (iter->_my_reader.type) {
|
||||
case MESHMS_BLOCK_TYPE_ACK:
|
||||
// Read the received messages up to the ack'ed offset
|
||||
|
4
meshms.h
4
meshms.h
@ -112,10 +112,13 @@ struct meshms_message_iterator {
|
||||
// Public fields that remain fixed for the life of the iterator:
|
||||
const sid_t *my_sid;
|
||||
const sid_t *their_sid;
|
||||
const rhizome_bid_t *my_ply_bid;
|
||||
const rhizome_bid_t *their_ply_bid;
|
||||
uint64_t latest_ack_offset; // offset in remote (their) ply of most recent ACK
|
||||
uint64_t latest_ack_my_offset; // offset in my ply of most recent message ACKed by them
|
||||
uint64_t read_offset; // offset in remote (their) ply of most recent message read by me
|
||||
// The following public fields change per message:
|
||||
enum meshms_which_ply { MY_PLY, THEIR_PLY } which_ply;
|
||||
enum { MESSAGE_SENT, MESSAGE_RECEIVED, ACK_RECEIVED } type;
|
||||
// For MESSAGE_SENT 'offset' is the byte position within the local ply
|
||||
// (mine). For MESSAGE_RECEIVED and ACK_RECEIVED, it is the byte position
|
||||
@ -138,6 +141,7 @@ struct meshms_message_iterator {
|
||||
bool_t _in_ack;
|
||||
};
|
||||
int meshms_message_iterator_open(struct meshms_message_iterator *, const sid_t *me, const sid_t *them);
|
||||
int meshms_message_iterator_is_open(const struct meshms_message_iterator *);
|
||||
void meshms_message_iterator_close(struct meshms_message_iterator *);
|
||||
int meshms_message_iterator_prev(struct meshms_message_iterator *);
|
||||
|
||||
|
253
rhizome_http.c
253
rhizome_http.c
@ -1056,8 +1056,35 @@ static int restful_rhizome_bid_decrypted_bin(rhizome_http_request *r, const char
|
||||
return 1;
|
||||
}
|
||||
|
||||
#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t)))
|
||||
#define alloca_meshms_token(bid, offset) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (bid), (offset))
|
||||
|
||||
static char *meshms_token_to_str(char *buf, const rhizome_bid_t *bid, uint64_t offset)
|
||||
{
|
||||
struct iovec iov[2];
|
||||
iov[0].iov_base = (void *) bid->binary;
|
||||
iov[0].iov_len = sizeof bid->binary;
|
||||
iov[1].iov_base = &offset;
|
||||
iov[1].iov_len = sizeof offset;
|
||||
size_t n = base64url_encodev(buf, iov, 2);
|
||||
assert(n == MESHMS_TOKEN_STRLEN);
|
||||
buf[n] = '\0';
|
||||
return buf;
|
||||
}
|
||||
|
||||
static int strn_to_meshms_token(const char *str, rhizome_bid_t *bidp, uint64_t *offsetp, const char **afterp)
|
||||
{
|
||||
unsigned char token[sizeof bidp->binary + sizeof *offsetp];
|
||||
if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) != sizeof token)
|
||||
return 0;
|
||||
memcpy(bidp->binary, token, sizeof bidp->binary);
|
||||
memcpy(offsetp, token + sizeof bidp->binary, sizeof *offsetp);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static HTTP_HANDLER restful_meshms_conversationlist_json;
|
||||
static HTTP_HANDLER restful_meshms_messagelist_json;
|
||||
static HTTP_HANDLER restful_meshms_newsince_messagelist_json;
|
||||
|
||||
static int restful_meshms_(rhizome_http_request *r, const char *remainder)
|
||||
{
|
||||
@ -1077,6 +1104,13 @@ static int restful_meshms_(rhizome_http_request *r, const char *remainder)
|
||||
handler = restful_meshms_messagelist_json;
|
||||
remainder = "";
|
||||
}
|
||||
else if ( str_startswith(remainder, "/newsince/", &end)
|
||||
&& strn_to_meshms_token(end, &r->bid, &r->ui64, &end)
|
||||
&& strcmp(end, "/messagelist.json") == 0
|
||||
) {
|
||||
handler = restful_meshms_newsince_messagelist_json;
|
||||
remainder = "";
|
||||
}
|
||||
}
|
||||
}
|
||||
if (handler == NULL)
|
||||
@ -1181,24 +1215,23 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques
|
||||
return 0;
|
||||
}
|
||||
|
||||
#define MESHMS_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t)))
|
||||
#define alloca_meshms_token(bid, offset) meshms_token_to_str(alloca(MESHMS_TOKEN_STRLEN + 1), (bid), (offset))
|
||||
|
||||
static char *meshms_token_to_str(char *buf, const rhizome_bid_t *bid, uint64_t offset)
|
||||
{
|
||||
struct iovec iov[2];
|
||||
iov[0].iov_base = (void *) bid->binary;
|
||||
iov[0].iov_len = sizeof bid->binary;
|
||||
iov[1].iov_base = &offset;
|
||||
iov[1].iov_len = sizeof offset;
|
||||
size_t n = base64url_encodev(buf, iov, 2);
|
||||
assert(n == MESHMS_TOKEN_STRLEN);
|
||||
buf[n] = '\0';
|
||||
return buf;
|
||||
}
|
||||
|
||||
static HTTP_CONTENT_GENERATOR restful_meshms_messagelist_json_content;
|
||||
|
||||
static int reopen_meshms_message_iterator(rhizome_http_request *r)
|
||||
{
|
||||
if (!meshms_message_iterator_is_open(&r->u.msglist.iter)) {
|
||||
if ( meshms_message_iterator_open(&r->u.msglist.iter, &r->sid1, &r->sid2) == -1
|
||||
|| (r->u.msglist.finished = meshms_message_iterator_prev(&r->u.msglist.iter)) == -1
|
||||
)
|
||||
return -1;
|
||||
if (!r->u.msglist.finished) {
|
||||
r->u.msglist.latest_which_ply = r->u.msglist.iter.which_ply;
|
||||
r->u.msglist.latest_offset = r->u.msglist.iter.offset;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int restful_meshms_messagelist_json(rhizome_http_request *r, const char *remainder)
|
||||
{
|
||||
if (*remainder)
|
||||
@ -1206,10 +1239,33 @@ static int restful_meshms_messagelist_json(rhizome_http_request *r, const char *
|
||||
assert(r->finalise_union == NULL);
|
||||
r->finalise_union = finalise_union_meshms_messagelist;
|
||||
r->u.msglist.rowcount = 0;
|
||||
meshms_message_iterator_open(&r->u.msglist.iter, &r->sid1, &r->sid2);
|
||||
if ((r->u.msglist.finished = meshms_message_iterator_prev(&r->u.msglist.iter)) == -1)
|
||||
return -1;
|
||||
r->u.msglist.phase = LIST_HEADER;
|
||||
r->u.msglist.token_offset = 0;
|
||||
r->u.msglist.end_time = 0;
|
||||
http_request_response_generated(&r->http, 200, "application/json", restful_meshms_messagelist_json_content);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int restful_meshms_newsince_messagelist_json(rhizome_http_request *r, const char *remainder)
|
||||
{
|
||||
if (*remainder)
|
||||
return 404;
|
||||
assert(r->finalise_union == NULL);
|
||||
r->finalise_union = finalise_union_meshms_messagelist;
|
||||
r->u.msglist.rowcount = 0;
|
||||
r->u.msglist.phase = LIST_HEADER;
|
||||
if (reopen_meshms_message_iterator(r) == -1)
|
||||
return -1;
|
||||
if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.my_ply_bid) == 0)
|
||||
r->u.msglist.token_which_ply = MY_PLY;
|
||||
else if (cmp_rhizome_bid_t(&r->bid, r->u.msglist.iter.their_ply_bid) == 0)
|
||||
r->u.msglist.token_which_ply = THEIR_PLY;
|
||||
else {
|
||||
http_request_simple_response(&r->http, 404, "Invalid token");
|
||||
return 404;
|
||||
}
|
||||
r->u.msglist.token_offset = r->ui64;
|
||||
r->u.msglist.end_time = gettime_ms() + config.rhizome.api.restful.newsince_timeout * 1000;
|
||||
http_request_response_generated(&r->http, 200, "application/json", restful_meshms_messagelist_json_content);
|
||||
return 1;
|
||||
}
|
||||
@ -1218,6 +1274,9 @@ static HTTP_CONTENT_GENERATOR_STRBUF_CHUNKER restful_meshms_messagelist_json_con
|
||||
|
||||
static int restful_meshms_messagelist_json_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result)
|
||||
{
|
||||
rhizome_http_request *r = (rhizome_http_request *) hr;
|
||||
if (reopen_meshms_message_iterator(r) == -1)
|
||||
return -1;
|
||||
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshms_messagelist_json_content_chunk);
|
||||
}
|
||||
|
||||
@ -1239,10 +1298,14 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
};
|
||||
switch (r->u.msglist.phase) {
|
||||
case LIST_HEADER:
|
||||
strbuf_sprintf(b, "{\n\"read_offset\":%"PRIu64",\n\"latest_ack_my_offset\":%"PRIu64",\n\"header\":[",
|
||||
r->u.msglist.iter.read_offset,
|
||||
r->u.msglist.iter.latest_ack_my_offset
|
||||
);
|
||||
strbuf_puts(b, "{\n");
|
||||
if (!r->u.msglist.end_time) {
|
||||
strbuf_sprintf(b, "\"read_offset\":%"PRIu64",\n\"latest_ack_offset\":%"PRIu64",\n",
|
||||
r->u.msglist.iter.read_offset,
|
||||
r->u.msglist.iter.latest_ack_my_offset
|
||||
);
|
||||
}
|
||||
strbuf_puts(b, "\"header\":[");
|
||||
unsigned i;
|
||||
for (i = 0; i != NELS(headers); ++i) {
|
||||
if (i)
|
||||
@ -1254,61 +1317,51 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
r->u.msglist.phase = r->u.msglist.finished ? LIST_END : LIST_ROWS;
|
||||
return 1;
|
||||
case LIST_ROWS:
|
||||
if (!r->u.msglist.finished) {
|
||||
switch (r->u.msglist.iter.type) {
|
||||
case MESSAGE_SENT:
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, ">");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->my_ply.bundle_id, r->u.msglist.iter.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, r->u.msglist.iter.text);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.delivered);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 0);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_null(b);
|
||||
strbuf_puts(b, "]");
|
||||
break;
|
||||
case MESSAGE_RECEIVED:
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "<");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, r->u.msglist.iter.text);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 1);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.read);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_null(b);
|
||||
strbuf_puts(b, "]");
|
||||
break;
|
||||
case ACK_RECEIVED:
|
||||
// Don't send old (irrelevant) ACKs.
|
||||
if (r->u.msglist.iter.ack_offset > r->u.msglist.highest_ack_offset) {
|
||||
{
|
||||
if ( r->u.msglist.finished
|
||||
|| (r->u.msglist.token_which_ply == r->u.msglist.iter.which_ply && r->u.msglist.iter.offset <= r->u.msglist.token_offset)
|
||||
) {
|
||||
time_ms_t now;
|
||||
if (r->u.msglist.end_time && (now = gettime_ms()) < r->u.msglist.end_time) {
|
||||
r->u.msglist.token_which_ply = r->u.msglist.latest_which_ply;
|
||||
r->u.msglist.token_offset = r->u.msglist.latest_offset;
|
||||
meshms_message_iterator_close(&r->u.msglist.iter);
|
||||
time_ms_t wake_at = now + config.rhizome.api.restful.newsince_poll_ms;
|
||||
if (wake_at > r->u.msglist.end_time)
|
||||
wake_at = r->u.msglist.end_time;
|
||||
http_request_pause_response(&r->http, wake_at);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
switch (r->u.msglist.iter.type) {
|
||||
case MESSAGE_SENT:
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "ACK");
|
||||
strbuf_json_string(b, ">");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->my_ply.bundle_id, r->u.msglist.iter.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, r->u.msglist.iter.text);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.delivered);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 0);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_null(b);
|
||||
strbuf_puts(b, "]");
|
||||
break;
|
||||
case MESSAGE_RECEIVED:
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "<");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
@ -1322,22 +1375,48 @@ static int restful_meshms_messagelist_json_content_chunk(struct http_request *hr
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 1);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 0);
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.read);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.ack_offset);
|
||||
strbuf_json_null(b);
|
||||
strbuf_puts(b, "]");
|
||||
r->u.msglist.highest_ack_offset = r->u.msglist.iter.ack_offset;
|
||||
}
|
||||
break;
|
||||
break;
|
||||
case ACK_RECEIVED:
|
||||
// Don't send old (irrelevant) ACKs.
|
||||
if (r->u.msglist.iter.ack_offset > r->u.msglist.highest_ack_offset) {
|
||||
if (r->u.msglist.rowcount != 0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_json_string(b, "ACK");
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.my_sid->binary, sizeof r->u.msglist.iter.my_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_hex(b, r->u.msglist.iter.their_sid->binary, sizeof r->u.msglist.iter.their_sid->binary);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.offset);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, alloca_meshms_token(&r->u.msglist.iter._conv->their_ply.bundle_id, r->u.msglist.iter.offset));
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, r->u.msglist.iter.text);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, 1);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_boolean(b, r->u.msglist.iter.read);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.msglist.iter.ack_offset);
|
||||
strbuf_puts(b, "]");
|
||||
r->u.msglist.highest_ack_offset = r->u.msglist.iter.ack_offset;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (!strbuf_overrun(b)) {
|
||||
++r->u.msglist.rowcount;
|
||||
if ((r->u.msglist.finished = meshms_message_iterator_prev(&r->u.msglist.iter)) == -1)
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
if (!strbuf_overrun(b)) {
|
||||
++r->u.msglist.rowcount;
|
||||
if ((r->u.msglist.finished = meshms_message_iterator_prev(&r->u.msglist.iter)) == -1)
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
r->u.msglist.phase = LIST_END;
|
||||
}
|
||||
r->u.msglist.phase = LIST_END;
|
||||
// fall through...
|
||||
case LIST_END:
|
||||
strbuf_puts(b, "\n]\n}\n");
|
||||
|
@ -47,6 +47,14 @@ typedef struct rhizome_http_request
|
||||
sid_t sid1;
|
||||
sid_t sid2;
|
||||
|
||||
/* For requests/responses that contain a Rhizome Bundle ID.
|
||||
*/
|
||||
rhizome_bid_t bid;
|
||||
|
||||
/* For requests/responses that contain a 64-bit unsigned integer (eg, SQLite ROWID, byte offset).
|
||||
*/
|
||||
uint64_t ui64;
|
||||
|
||||
/* Finaliser for union contents (below).
|
||||
*/
|
||||
void (*finalise_union)(struct rhizome_http_request *);
|
||||
@ -132,11 +140,16 @@ typedef struct rhizome_http_request
|
||||
/* For responses that list MeshMS messages in a single conversation.
|
||||
*/
|
||||
struct {
|
||||
enum meshms_which_ply token_which_ply;
|
||||
uint64_t token_offset;
|
||||
enum meshms_which_ply latest_which_ply;
|
||||
uint64_t latest_offset;
|
||||
time_ms_t end_time;
|
||||
uint64_t highest_ack_offset;
|
||||
enum list_phase phase;
|
||||
size_t rowcount;
|
||||
struct meshms_message_iterator iter;
|
||||
int finished;
|
||||
uint64_t highest_ack_offset;
|
||||
}
|
||||
msglist;
|
||||
|
||||
|
@ -892,7 +892,7 @@ ssize_t rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, siz
|
||||
}
|
||||
|
||||
if (read_state->crypt && buffer && bytes_read>0){
|
||||
dump("before decrypt", buffer, bytes_read);
|
||||
//dump("before decrypt", buffer, bytes_read);
|
||||
if(rhizome_crypt_xor_block(
|
||||
buffer, bytes_read,
|
||||
read_state->offset + read_state->tail,
|
||||
|
@ -76,6 +76,7 @@ setup() {
|
||||
set rhizome.api.restful.users.harry.password potter \
|
||||
set rhizome.api.restful.users.ron.password weasley \
|
||||
set rhizome.api.restful.users.hermione.password grainger
|
||||
set_extra_config
|
||||
if [ -z "$IDENTITY_COUNT" ]; then
|
||||
create_single_identity
|
||||
else
|
||||
@ -96,6 +97,10 @@ teardown() {
|
||||
report_all_servald_servers
|
||||
}
|
||||
|
||||
set_extra_config() {
|
||||
:
|
||||
}
|
||||
|
||||
set_rhizome_config() {
|
||||
executeOk_servald config \
|
||||
set debug.httpd on \
|
||||
@ -280,20 +285,13 @@ test_RhizomeList() {
|
||||
done
|
||||
}
|
||||
|
||||
curl_newsince() {
|
||||
curl \
|
||||
--silent --fail --show-error \
|
||||
--no-buffer \
|
||||
--output "$1" \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/rhizome/newsince/$token/bundlelist.json"
|
||||
}
|
||||
|
||||
doc_RhizomeNewSince="HTTP RESTful list Rhizome bundles since token as JSON"
|
||||
setup_RhizomeNewSince() {
|
||||
set_extra_config() {
|
||||
executeOk_servald config set rhizome.api.restful.newsince_timeout 60s \
|
||||
set rhizome.api.restful.newsince_poll_ms 500
|
||||
}
|
||||
setup
|
||||
executeOk_servald config set rhizome.api.restful.newsince_timeout 60s
|
||||
executeOk_servald config set rhizome.api.restful.newsince_poll_ms 500
|
||||
add_bundles 0 5
|
||||
executeOk curl \
|
||||
--silent --fail --show-error \
|
||||
@ -307,14 +305,19 @@ setup_RhizomeNewSince() {
|
||||
assert [ -n "$token" ]
|
||||
}
|
||||
test_RhizomeNewSince() {
|
||||
fork %curl1 curl_newsince newsince1.json
|
||||
fork %curl2 curl_newsince newsince2.json
|
||||
fork %curl3 curl_newsince newsince3.json
|
||||
for i in 1 2 3; do
|
||||
fork %curl$i curl \
|
||||
--silent --fail --show-error \
|
||||
--no-buffer \
|
||||
--output newsince$i.json \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/rhizome/newsince/$token/bundlelist.json"
|
||||
done
|
||||
wait_until [ -e newsince1.json -a -e newsince2.json -a -e newsince3.json ]
|
||||
add_bundles 6 10
|
||||
wait_until --timeout=10 grep "${BID[10]}" newsince1.json
|
||||
wait_until --timeout=10 grep "${BID[10]}" newsince2.json
|
||||
wait_until --timeout=10 grep "${BID[10]}" newsince3.json
|
||||
for i in 1 2 3; do
|
||||
wait_until --timeout=10 grep "${BID[10]}" newsince$i.json
|
||||
done
|
||||
fork_terminate_all
|
||||
fork_wait_all
|
||||
for i in 1 2 3; do
|
||||
@ -993,36 +996,43 @@ create_message_file() {
|
||||
|
||||
# Add a sequence of messages of varying sizes up to 1 KiB.
|
||||
add_messages() {
|
||||
NMESSAGE=0
|
||||
NSENT=0
|
||||
NRECV=0
|
||||
NACK=0
|
||||
local symbols="$1"
|
||||
shift
|
||||
local texts=("$@")
|
||||
local sent_since_ack=0
|
||||
local i n
|
||||
local i n size msize
|
||||
local size=0
|
||||
for ((i = 0; i < ${#1}; ++i)); do
|
||||
local sym="${1:$i:1}"
|
||||
for ((i = 0; i < ${#symbols}; ++i)); do
|
||||
local sym="${symbols:$i:1}"
|
||||
let size+=379
|
||||
let msize=size%1021
|
||||
let n=NMESSAGE
|
||||
let n=NMESSAGE++
|
||||
local text="${texts[$i]}"
|
||||
case $sym in
|
||||
'>'|'<')
|
||||
if [ -n "$text" ]; then
|
||||
echo "$text" >text$n
|
||||
else
|
||||
create_message_file text$n $msize
|
||||
text="$(<text$n)"
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
case $sym in
|
||||
'>')
|
||||
MESSAGE[$n]=">"
|
||||
create_message_file send$n $msize
|
||||
executeOk_servald meshms send message $SIDA1 $SIDA2 "$(<send$n)"
|
||||
executeOk_servald meshms send message $SIDA1 $SIDA2 "$text"
|
||||
let ++sent_since_ack
|
||||
let ++NSENT
|
||||
;;
|
||||
'<')
|
||||
MESSAGE[$n]="<"
|
||||
create_message_file recv$n $msize
|
||||
executeOk_servald meshms send message $SIDA2 $SIDA1 "$(<recv$n)"
|
||||
executeOk_servald meshms send message $SIDA2 $SIDA1 "$text"
|
||||
let ++NRECV
|
||||
;;
|
||||
'A')
|
||||
MESSAGE[$n]=ACK
|
||||
>ack$n
|
||||
[ $sent_since_ack -gt 0 ] || error "two ACKs in a row (at position $i)"
|
||||
[ $i -ne 0 -a $sent_since_ack -eq 0 ] && error "two ACKs in a row (at position $i)"
|
||||
executeOk_servald meshms list messages $SIDA2 $SIDA1
|
||||
let ++NACK
|
||||
;;
|
||||
@ -1030,7 +1040,6 @@ add_messages() {
|
||||
error "invalid message symbol '$sym' (at position $i)"
|
||||
;;
|
||||
esac
|
||||
let NMESSAGE=n+1
|
||||
done
|
||||
}
|
||||
|
||||
@ -1039,8 +1048,8 @@ setup_MeshmsListMessages() {
|
||||
IDENTITY_COUNT=2
|
||||
setup
|
||||
add_messages '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
|
||||
let NROWS=NSENT+NRECV+(NACK?1:0)
|
||||
executeOk_servald meshms list messages $SIDA1 $SIDA2
|
||||
tfw_cat --stdout
|
||||
delivered_offset=$(sed -n -e '/^[0-9]\+:[0-9]\+:ACK:delivered$/{n;s/^[0-9]\+:\([0-9]\+\):>:.*/\1/p;q}' "$TFWSTDOUT")
|
||||
[ -z "$delivered_offset" ] && delivered_offset=0
|
||||
read_offset=$(sed -n -e 's/^[0-9]\+:\([0-9]\+\):MARK:read$/\1/p' "$TFWSTDOUT")
|
||||
@ -1055,7 +1064,7 @@ test_MeshmsListMessages() {
|
||||
"http://$addr_localhost:$PORTA/restful/meshms/$SIDA1/$SIDA2/messagelist.json"
|
||||
tfw_cat http.headers messagelist.json
|
||||
tfw_preserve messagelist.json
|
||||
assert [ "$(jq '.rows | length' messagelist.json)" = $((NSENT + NRECV + (NACK?1:0) )) ]
|
||||
assert [ "$(jq '.rows | length' messagelist.json)" = $NROWS ]
|
||||
transform_list_json messagelist.json messages.json
|
||||
tfw_preserve messages.json
|
||||
seen_ack=false
|
||||
@ -1070,13 +1079,13 @@ test_MeshmsListMessages() {
|
||||
case ${MESSAGE[$j]} in
|
||||
'>')
|
||||
assertJq messages.json '.['$i'].type == ">"'
|
||||
assertJqCmp messages.json '.['$i'].text' send$j
|
||||
assertJqCmp messages.json '.['$i'].text' text$j
|
||||
assertJq messages.json '.['$i'].delivered == (.['$i'].offset <= '$delivered_offset')'
|
||||
let ++i
|
||||
;;
|
||||
'<')
|
||||
assertJq messages.json '.['$i'].type == "<"'
|
||||
assertJqCmp messages.json '.['$i'].text' recv$j
|
||||
assertJqCmp messages.json '.['$i'].text' text$j
|
||||
assertJq messages.json '.['$i'].read == (.['$i'].offset <= '$read_offset')'
|
||||
let ++i
|
||||
;;
|
||||
@ -1091,9 +1100,99 @@ test_MeshmsListMessages() {
|
||||
done
|
||||
}
|
||||
|
||||
doc_MeshmsListMessagesSince="HTTP RESTful list MeshMS messages in one conversation since token as JSON"
|
||||
Xtest_MeshmsListMessagesSince() {
|
||||
:
|
||||
doc_MeshmsListMessagesNewSince="HTTP RESTful list MeshMS messages in one conversation since token as JSON"
|
||||
setup_MeshmsListMessagesNewSince() {
|
||||
IDENTITY_COUNT=2
|
||||
set_extra_config() {
|
||||
executeOk_servald config set rhizome.api.restful.newsince_timeout 1s \
|
||||
set rhizome.api.restful.newsince_poll_ms 500
|
||||
}
|
||||
setup
|
||||
add_messages '><>>A>A<>><><><>>>A>A><<<<A<>><>>A<<>'
|
||||
let NROWS=NSENT+NRECV+(NACK?1:0)
|
||||
executeOk curl \
|
||||
--silent --fail --show-error \
|
||||
--output messagelist.json \
|
||||
--dump-header http.headers \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/meshms/$SIDA1/$SIDA2/messagelist.json"
|
||||
assert [ "$(jq '.rows | length' messagelist.json)" = $NROWS ]
|
||||
transform_list_json messagelist.json messages.json
|
||||
tfw_preserve messages.json
|
||||
for ((i = 0; i < NROWS; i += 3)); do
|
||||
token[$i]=$(jq --raw-output '.['$i'].token' messages.json)
|
||||
done
|
||||
}
|
||||
test_MeshmsListMessagesNewSince() {
|
||||
for ((i = 0; i < NROWS; i += 3)); do
|
||||
# At most five requests going at once
|
||||
[ $i -ge 15 ] && fork_wait %curl$((i-15))
|
||||
fork %curl$i executeOk curl \
|
||||
--silent --fail --show-error \
|
||||
--output messagelist$i.json \
|
||||
--dump-header http.headers$i \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/meshms/$SIDA1/$SIDA2/newsince/${token[$i]}/messagelist.json"
|
||||
done
|
||||
fork_wait_all
|
||||
for ((i = 0; i < NROWS; i += 3)); do
|
||||
transform_list_json messagelist$i.json messages$i.json
|
||||
tfw_preserve messages$i.json
|
||||
{ echo '{"a":'; cat messages.json; echo ',"b":'; cat messages$i.json; echo '}'; } >tmp.json
|
||||
assertJq tmp.json '.a[:'$i'] == .b'
|
||||
done
|
||||
}
|
||||
|
||||
grepall() {
|
||||
local pattern="$1"
|
||||
shift
|
||||
for file; do
|
||||
grep "$pattern" "$file" || return $?
|
||||
done
|
||||
return 0
|
||||
}
|
||||
|
||||
doc_MeshmsListMessagesNewSinceArrival="HTTP RESTful list newly arriving MeshMS messages in one conversation as JSON"
|
||||
setup_MeshmsListMessagesNewSinceArrival() {
|
||||
IDENTITY_COUNT=2
|
||||
set_extra_config() {
|
||||
executeOk_servald config set rhizome.api.restful.newsince_timeout 60s \
|
||||
set rhizome.api.restful.newsince_poll_ms 500
|
||||
}
|
||||
setup
|
||||
add_messages '><>A>'
|
||||
let NROWS=NSENT+NRECV+(NACK?1:0)
|
||||
executeOk curl \
|
||||
--silent --fail --show-error \
|
||||
--output messagelist.json \
|
||||
--dump-header http.headers \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/meshms/$SIDA1/$SIDA2/messagelist.json"
|
||||
assert [ "$(jq '.rows | length' messagelist.json)" = $NROWS ]
|
||||
transform_list_json messagelist.json messages.json
|
||||
tfw_preserve messages.json
|
||||
token=$(jq --raw-output '.[0].token' messages.json)
|
||||
assert [ -n "$token" ]
|
||||
}
|
||||
test_MeshmsListMessagesNewSinceArrival() {
|
||||
for i in 1 2 3; do
|
||||
fork %curl$i executeOk curl \
|
||||
--silent --fail --show-error \
|
||||
--no-buffer \
|
||||
--output newsince$i.json \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/meshms/$SIDA1/$SIDA2/newsince/$token/messagelist.json"
|
||||
done
|
||||
wait_until [ -e newsince1.json -a -e newsince2.json -a -e newsince3.json ]
|
||||
for message in '>Rumplestiltskin' 'A' '<Howdydoody' '>Eulenspiegel'; do
|
||||
add_messages "${message:0:1}" "${message:1}"
|
||||
wait_until --timeout=60 grepall "${message:1}" newsince{1,2,3}.json
|
||||
done
|
||||
fork_terminate_all
|
||||
fork_wait_all
|
||||
}
|
||||
teardown_MeshmsListMessagesNewSinceArrival() {
|
||||
tfw_preserve newsince{1,2,3}.json
|
||||
}
|
||||
|
||||
doc_MeshmsSend="HTTP RESTful send MeshMS message"
|
||||
|
Loading…
x
Reference in New Issue
Block a user