mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-02-20 09:26:37 +00:00
Add restful api for listing broadcast messages
This commit is contained in:
parent
c2956568d6
commit
09d04d7c8f
14
httpd.h
14
httpd.h
@ -193,7 +193,7 @@ typedef struct httpd_request
|
||||
}
|
||||
msglist;
|
||||
|
||||
/* For responses that send a MeshMS message.
|
||||
/* For responses that send a MeshMS / MeshMB message.
|
||||
*/
|
||||
struct {
|
||||
// Which part is currently being received
|
||||
@ -204,7 +204,17 @@ typedef struct httpd_request
|
||||
struct form_buf_malloc message;
|
||||
}
|
||||
sendmsg;
|
||||
|
||||
|
||||
struct{
|
||||
struct message_ply_read ply_reader;
|
||||
enum list_phase phase;
|
||||
uint64_t start_offset;
|
||||
uint64_t current_offset;
|
||||
uint64_t end_offset;
|
||||
size_t rowcount;
|
||||
time_ms_t end_time;
|
||||
time_s_t timestamp;
|
||||
} plylist;
|
||||
|
||||
struct {
|
||||
int fd;
|
||||
|
225
meshmb_restful.c
225
meshmb_restful.c
@ -1,4 +1,5 @@
|
||||
#include "serval.h"
|
||||
#include "dataformats.h"
|
||||
#include "conf.h"
|
||||
#include "httpd.h"
|
||||
#include "str.h"
|
||||
@ -111,6 +112,221 @@ static int restful_meshmb_send(httpd_request *r, const char *remainder)
|
||||
return 1;
|
||||
}
|
||||
|
||||
static strbuf position_token_to_str(strbuf b, uint64_t position)
|
||||
{
|
||||
uint8_t tmp[12];
|
||||
char tmp_str[BASE64_ENCODED_LEN(12)+1];
|
||||
|
||||
int len = pack_uint(tmp, position);
|
||||
assert(len <= (int)sizeof tmp);
|
||||
size_t n = base64url_encode(tmp_str, tmp, len);
|
||||
tmp_str[n] = '\0';
|
||||
return strbuf_puts(b, tmp_str);
|
||||
}
|
||||
|
||||
static int strn_to_position_token(const char *str, uint64_t *position, const char **afterp)
|
||||
{
|
||||
uint8_t token[12];
|
||||
size_t token_len = base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL);
|
||||
|
||||
int unpacked;
|
||||
if ((unpacked = unpack_uint(token, token_len, position))==-1){
|
||||
*position = 0;
|
||||
*afterp=str;
|
||||
}else
|
||||
(*afterp)++;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int next_ply_message(httpd_request *r){
|
||||
if (!message_ply_is_open(&r->u.plylist.ply_reader)){
|
||||
if (message_ply_read_open(&r->u.plylist.ply_reader, &r->bid)==-1)
|
||||
return -1;
|
||||
|
||||
// skip back to where we were
|
||||
if (r->u.plylist.current_offset)
|
||||
r->u.plylist.ply_reader.read.offset = r->u.plylist.current_offset;
|
||||
|
||||
DEBUGF(httpd, "Opened ply @%"PRIu64, r->u.plylist.ply_reader.read.offset);
|
||||
}
|
||||
|
||||
if (r->u.plylist.current_offset==0){
|
||||
// enumerate everything from the top
|
||||
DEBUGF(httpd, "Started reading @%"PRIu64, r->u.plylist.ply_reader.read.length);
|
||||
r->u.plylist.current_offset =
|
||||
r->u.plylist.start_offset =
|
||||
r->u.plylist.ply_reader.read.offset =
|
||||
r->u.plylist.ply_reader.read.length;
|
||||
}
|
||||
|
||||
while(message_ply_read_prev(&r->u.plylist.ply_reader) == 0){
|
||||
r->u.plylist.current_offset = r->u.plylist.ply_reader.record_end_offset;
|
||||
if (r->u.plylist.current_offset <= r->u.plylist.end_offset){
|
||||
DEBUGF(httpd, "Hit end %"PRIu64" @%"PRIu64,
|
||||
r->u.plylist.end_offset, r->u.plylist.current_offset);
|
||||
break;
|
||||
}
|
||||
|
||||
switch(r->u.plylist.ply_reader.type){
|
||||
case MESSAGE_BLOCK_TYPE_TIME:
|
||||
if (r->u.plylist.ply_reader.record_length<4){
|
||||
WARN("Malformed ply, expected 4 byte timestamp");
|
||||
continue;
|
||||
}
|
||||
r->u.plylist.timestamp = read_uint32(r->u.plylist.ply_reader.record);
|
||||
break;
|
||||
|
||||
case MESSAGE_BLOCK_TYPE_MESSAGE:
|
||||
return 1;
|
||||
|
||||
case MESSAGE_BLOCK_TYPE_ACK:
|
||||
// TODO, link to some other ply?
|
||||
break;
|
||||
|
||||
default:
|
||||
//ignore unknown types
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int restful_meshmb_list_json_content_chunk(struct http_request *hr, strbuf b)
|
||||
{
|
||||
httpd_request *r = (httpd_request *) hr;
|
||||
// The "my_sid" and "their_sid" per-conversation fields allow the same JSON structure to be used
|
||||
// in a future, non-SID-specific request, eg, to list all conversations for all currently open
|
||||
// identities.
|
||||
const char *headers[] = {
|
||||
"offset",
|
||||
"token",
|
||||
"text",
|
||||
"timestamp"
|
||||
};
|
||||
|
||||
DEBUGF(httpd, "Phase %d", r->u.plylist.phase);
|
||||
|
||||
switch (r->u.plylist.phase) {
|
||||
case LIST_HEADER:
|
||||
strbuf_puts(b, "{\n\"header\":[");
|
||||
unsigned i;
|
||||
for (i = 0; i != NELS(headers); ++i) {
|
||||
if (i)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_json_string(b, headers[i]);
|
||||
}
|
||||
strbuf_puts(b, "],\n\"rows\":[");
|
||||
if (!strbuf_overrun(b))
|
||||
r->u.plylist.phase = LIST_ROWS;
|
||||
return 1;
|
||||
|
||||
ROWS:
|
||||
case LIST_ROWS:
|
||||
case LIST_FIRST:
|
||||
|
||||
if (!message_ply_is_open(&r->u.plylist.ply_reader)){
|
||||
// re-load the current message text
|
||||
if (next_ply_message(r)!=1)
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (r->u.plylist.rowcount!=0)
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_puts(b, "\n[");
|
||||
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.plylist.current_offset);
|
||||
strbuf_puts(b, ",\"");
|
||||
position_token_to_str(b, r->u.plylist.current_offset);
|
||||
strbuf_puts(b, "\",");
|
||||
strbuf_json_string(b, (const char *)r->u.plylist.ply_reader.record);
|
||||
strbuf_putc(b, ',');
|
||||
strbuf_sprintf(b, "%d", r->u.plylist.timestamp);
|
||||
strbuf_puts(b, "]");
|
||||
|
||||
if (!strbuf_overrun(b)) {
|
||||
++r->u.plylist.rowcount;
|
||||
if (next_ply_message(r)!=1)
|
||||
r->u.plylist.phase = LIST_END;
|
||||
}
|
||||
return 1;
|
||||
|
||||
END:
|
||||
r->u.plylist.phase = LIST_END;
|
||||
case LIST_END:
|
||||
|
||||
{
|
||||
time_ms_t now;
|
||||
// during a new-since request, we don't really want to end until the time limit has elapsed
|
||||
if (r->u.plylist.end_time && (now = gettime_ms()) < r->u.plylist.end_time) {
|
||||
// where we started this time, will become where we end on the next pass;
|
||||
r->u.plylist.end_offset = r->u.plylist.start_offset;
|
||||
r->u.plylist.current_offset = 0;
|
||||
r->u.plylist.phase = LIST_ROWS;
|
||||
|
||||
if (r->u.plylist.ply_reader.read.length > r->u.plylist.start_offset && next_ply_message(r)==1)
|
||||
// new content arrived while we were iterating, we can resume immediately
|
||||
goto ROWS;
|
||||
|
||||
message_ply_read_close(&r->u.plylist.ply_reader);
|
||||
http_request_pause_response(&r->http, r->u.plylist.end_time);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
strbuf_puts(b, "\n]\n}\n");
|
||||
if (!strbuf_overrun(b))
|
||||
r->u.plylist.phase = LIST_DONE;
|
||||
// fall through...
|
||||
case LIST_DONE:
|
||||
return 0;
|
||||
}
|
||||
abort();
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int restful_meshmb_list_json_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result)
|
||||
{
|
||||
return generate_http_content_from_strbuf_chunks(hr, (char *)buf, bufsz, result, restful_meshmb_list_json_content_chunk);
|
||||
}
|
||||
|
||||
static void list_on_rhizome_add(httpd_request *r, rhizome_manifest *m)
|
||||
{
|
||||
if (strcmp(m->service, RHIZOME_SERVICE_MESHMB) == 0
|
||||
&& cmp_rhizome_bid_t(&m->keypair.public_key, &r->bid)==0) {
|
||||
message_ply_read_close(&r->u.plylist.ply_reader);
|
||||
http_request_resume_response(&r->http);
|
||||
}
|
||||
}
|
||||
|
||||
static void list_finalise(httpd_request *r)
|
||||
{
|
||||
message_ply_read_close(&r->u.plylist.ply_reader);
|
||||
}
|
||||
|
||||
static int restful_meshmb_list(httpd_request *r, const char *remainder)
|
||||
{
|
||||
if (*remainder)
|
||||
return 404;
|
||||
assert(r->finalise_union == NULL);
|
||||
r->finalise_union = list_finalise;
|
||||
r->trigger_rhizome_bundle_added = list_on_rhizome_add;
|
||||
r->u.plylist.phase = LIST_HEADER;
|
||||
r->u.plylist.rowcount = 0;
|
||||
r->u.plylist.end_offset = r->ui64;
|
||||
|
||||
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_meshmb_list_json_content);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int restful_meshmb_newsince_list(httpd_request *r, const char *remainder)
|
||||
{
|
||||
int ret;
|
||||
if ((ret = restful_meshmb_list(r, remainder))==1){
|
||||
r->u.plylist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
DECLARE_HANDLER("/restful/meshmb/", restful_meshmb_);
|
||||
static int restful_meshmb_(httpd_request *r, const char *remainder)
|
||||
{
|
||||
@ -131,6 +347,15 @@ static int restful_meshmb_(httpd_request *r, const char *remainder)
|
||||
handler = restful_meshmb_send;
|
||||
verb = HTTP_VERB_POST;
|
||||
remainder = "";
|
||||
} else if (strcmp(remainder, "/messagelist.json") == 0) {
|
||||
handler = restful_meshmb_list;
|
||||
remainder = "";
|
||||
r->ui64 = 0;
|
||||
} else if ( str_startswith(remainder, "/newsince/", &end)
|
||||
&& strn_to_position_token(end, &r->ui64, &end)
|
||||
&& strcmp(end, "messagelist.json") == 0) {
|
||||
handler = restful_meshmb_newsince_list;
|
||||
remainder = "";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,10 +291,8 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques
|
||||
r->u.mclist.phase = LIST_END;
|
||||
// fall through...
|
||||
} else {
|
||||
if (r->u.mclist.phase==LIST_ROWS)
|
||||
if (r->u.mclist.rowcount!=0)
|
||||
strbuf_putc(b, ',');
|
||||
else
|
||||
r->u.mclist.phase=LIST_ROWS;
|
||||
strbuf_puts(b, "\n[");
|
||||
strbuf_sprintf(b, "%zu", r->u.mclist.rowcount);
|
||||
strbuf_putc(b, ',');
|
||||
@ -309,6 +307,7 @@ static int restful_meshms_conversationlist_json_content_chunk(struct http_reques
|
||||
strbuf_sprintf(b, "%"PRIu64, r->u.mclist.iter.current->metadata.read_offset);
|
||||
strbuf_puts(b, "]");
|
||||
if (!strbuf_overrun(b)) {
|
||||
r->u.mclist.phase=LIST_ROWS;
|
||||
meshms_conversation_iterator_advance(&r->u.mclist.iter);
|
||||
++r->u.mclist.rowcount;
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ includeTests vomp
|
||||
includeTests keyringrestful
|
||||
includeTests rhizomerestful
|
||||
includeTests meshmsrestful
|
||||
includeTests meshmbrestful
|
||||
if type -p "$JAVAC" >/dev/null; then
|
||||
includeTests jni
|
||||
includeTests keyringjava
|
||||
|
@ -13,6 +13,7 @@ setup_instance() {
|
||||
set debug.http_server on \
|
||||
set debug.httpd on \
|
||||
set debug.meshmb on \
|
||||
set debug.meshms on \
|
||||
set debug.verbose on \
|
||||
set log.console.level debug
|
||||
set_extra_config
|
||||
@ -48,8 +49,8 @@ teardown() {
|
||||
report_all_servald_servers
|
||||
}
|
||||
|
||||
doc_MeshmbSend="Send a broadcast message"
|
||||
test_MeshmbSend() {
|
||||
doc_MeshMBRestSend="Restful send of a broadcast message"
|
||||
test_MeshMBRestSend() {
|
||||
executeOk curl \
|
||||
-H "Expect:" \
|
||||
--silent --fail --show-error \
|
||||
@ -62,5 +63,37 @@ test_MeshmbSend() {
|
||||
tfw_cat -h broadcast
|
||||
}
|
||||
|
||||
doc_MeshMBRestList="Restful list of meshmb messages"
|
||||
setup_MeshMBRestList() {
|
||||
setup
|
||||
executeOk_servald meshmb send $IDA1 "Message 1"
|
||||
executeOk_servald meshmb send $IDA1 "Message 2"
|
||||
}
|
||||
test_MeshMBRestList() {
|
||||
executeOk curl \
|
||||
-H "Expect:" \
|
||||
--silent --fail --show-error \
|
||||
--output listmessages.json \
|
||||
--dump-header http.headers \
|
||||
--basic --user harry:potter \
|
||||
"http://$addr_localhost:$PORTA/restful/meshmb/$IDA1/messagelist.json"
|
||||
tfw_cat http.headers listmessages.json
|
||||
tfw_preserve listmessages.json
|
||||
assert [ "$(jq '.rows | length' listmessages.json)" = 2 ]
|
||||
transform_list_json listmessages.json list.json
|
||||
tfw_preserve list.json
|
||||
assertJq list.json \
|
||||
"contains([
|
||||
{ offset: 12,
|
||||
text: \"Message 1\"
|
||||
}
|
||||
])"
|
||||
assertJq list.json \
|
||||
"contains([
|
||||
{ offset: 30,
|
||||
text: \"Message 2\"
|
||||
}
|
||||
])"
|
||||
}
|
||||
runTests "$@"
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user