Support rhizome newsince request on an empty store

This commit is contained in:
Jeremy Lakeman 2016-07-18 16:19:16 +09:30
parent 94cf79f0f3
commit a854105598
6 changed files with 55 additions and 53 deletions

View File

@ -79,6 +79,8 @@ public class RhizomeBundleList {
rowCount = 0;
if (this.sinceToken == null)
httpConnection = httpConnector.newServalDHttpConnection("/restful/rhizome/bundlelist.json");
else if(this.sinceToken.equals(""))
httpConnection = httpConnector.newServalDHttpConnection("/restful/rhizome/newsince/bundlelist.json");
else
httpConnection = httpConnector.newServalDHttpConnection("/restful/rhizome/newsince/" + this.sinceToken + "/bundlelist.json");
httpConnection.connect();

View File

@ -20,31 +20,30 @@
package org.servalproject.servaldna.rhizome;
import java.lang.StringBuilder;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.List;
import org.servalproject.json.JSONInputException;
import org.servalproject.json.JSONTokeniser;
import org.servalproject.servaldna.BundleId;
import org.servalproject.servaldna.BundleKey;
import org.servalproject.servaldna.BundleSecret;
import org.servalproject.servaldna.FileHash;
import org.servalproject.servaldna.ServalDFailureException;
import org.servalproject.servaldna.ServalDHttpConnectionFactory;
import org.servalproject.servaldna.ServalDInterfaceException;
import org.servalproject.servaldna.ServalDNotImplementedException;
import org.servalproject.servaldna.SubscriberId;
import java.io.IOException;
import java.io.PrintStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URL;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.HttpURLConnection;
import org.servalproject.json.JSONTokeniser;
import org.servalproject.json.JSONInputException;
import org.servalproject.servaldna.BundleId;
import org.servalproject.servaldna.FileHash;
import org.servalproject.servaldna.BundleKey;
import org.servalproject.servaldna.SubscriberId;
import org.servalproject.servaldna.BundleSecret;
import org.servalproject.servaldna.ServalDHttpConnectionFactory;
import org.servalproject.servaldna.ServalDInterfaceException;
import org.servalproject.servaldna.ServalDFailureException;
import org.servalproject.servaldna.ServalDNotImplementedException;
import java.util.List;
import java.util.Map;
public class RhizomeCommon
{
@ -130,6 +129,8 @@ public class RhizomeCommon
Status status = receiveResponse(conn, expected_response_codes);
if (!conn.getContentType().equals("application/json"))
throw new ServalDInterfaceException("unexpected HTTP Content-Type: " + conn.getContentType());
if (status.input_stream == null)
throw new ServalDInterfaceException("unexpected HTTP response: " + status.http_status_code + " " + status.http_status_message);
return new JSONTokeniser(new InputStreamReader(status.input_stream, "UTF-8"));
}

View File

@ -344,6 +344,7 @@ static int restful_meshms_newsince_messagelist_json(httpd_request *r, const char
r->trigger_rhizome_bundle_added = on_rhizome_bundle_added;
r->u.msglist.rowcount = 0;
r->u.msglist.phase = LIST_HEADER;
r->u.msglist.dirty = 1;
enum meshms_status status;
if (meshms_failed(status = reopen_meshms_message_iterator(r)))
return http_request_meshms_response(r, 0, NULL, status);

View File

@ -662,6 +662,7 @@ struct rhizome_list_cursor {
const char *name;
bool_t is_sender_set;
bool_t is_recipient_set;
bool_t oldest_first;
sid_t sender;
sid_t recipient;
// If set, then the cursor moves in ascending (chronological) order starting

View File

@ -1418,13 +1418,14 @@ DEFINE_TRIGGER(bundle_add, trigger_rhizome_bundle_added_debug);
*/
int rhizome_list_open(struct rhizome_list_cursor *c)
{
DEBUGF(rhizome, "c=%p c->service=%s c->name=%s c->sender=%s c->recipient=%s c->rowid_since=%"PRIu64" c->_rowid_last=%"PRIu64,
DEBUGF(rhizome, "c=%p c->service=%s c->name=%s c->sender=%s c->recipient=%s c->rowid_since=%"PRIu64" c->oldest_first=%d c->_rowid_last=%"PRIu64,
c,
alloca_str_toprint(c->service),
alloca_str_toprint(c->name),
c->is_sender_set ? alloca_tohex_sid_t(c->sender) : "UNSET",
c->is_recipient_set ? alloca_tohex_sid_t(c->recipient) : "UNSET",
c->rowid_since,
c->oldest_first,
c->_rowid_last
);
IN();
@ -1438,14 +1439,16 @@ int rhizome_list_open(struct rhizome_list_cursor *c)
strbuf_puts(b, " AND sender = @sender");
if (c->is_recipient_set)
strbuf_puts(b, " AND recipient = @recipient");
if (c->rowid_since) {
strbuf_puts(b, " AND rowid > @last ORDER BY rowid ASC"); // oldest first
if (c->_rowid_last < c->rowid_since)
c->_rowid_last = c->rowid_since;
} else {
if (c->rowid_since)
strbuf_puts(b, " AND rowid > @since");
if (c->oldest_first){
if (c->_rowid_last)
strbuf_puts(b, " AND rowid > @last");
strbuf_puts(b, " ORDER BY rowid ASC");
}else{
if (c->_rowid_last)
strbuf_puts(b, " AND rowid < @last");
strbuf_puts(b, " ORDER BY rowid DESC"); // most recent first
strbuf_puts(b, " ORDER BY rowid DESC");
}
if (strbuf_overrun(b))
RETURN(WHYF("SQL command too long: %s", strbuf_str(b)));
@ -1461,6 +1464,8 @@ int rhizome_list_open(struct rhizome_list_cursor *c)
goto failure;
if (c->is_recipient_set && sqlite_bind(&c->_retry, c->_statement, NAMED|SID_T, "@recipient", &c->recipient, END) == -1)
goto failure;
if (c->rowid_since && sqlite_bind(&c->_retry, c->_statement, NAMED|INT64, "@since", c->rowid_since, END) == -1)
goto failure;
if (c->_rowid_last && sqlite_bind(&c->_retry, c->_statement, NAMED|INT64, "@last", c->_rowid_last, END) == -1)
goto failure;
c->manifest = NULL;
@ -1512,16 +1517,8 @@ int rhizome_list_next(struct rhizome_list_cursor *c)
assert(sqlite3_column_type(c->_statement, 3) == SQLITE_INTEGER);
assert(sqlite3_column_type(c->_statement, 4) == SQLITE_TEXT || sqlite3_column_type(c->_statement, 4) == SQLITE_NULL);
assert(sqlite3_column_type(c->_statement, 5) == SQLITE_INTEGER);
uint64_t q_rowid = sqlite3_column_int64(c->_statement, 5);
if (c->_rowid_current && (c->rowid_since ? q_rowid >= c->_rowid_current : q_rowid <= c->_rowid_current)) {
WHYF("Query returned rowid=%"PRIu64" out of order (last was %"PRIu64") -- skipped", q_rowid, c->_rowid_current);
continue;
}
c->_rowid_current = q_rowid;
if (q_rowid <= c->rowid_since) {
WHYF("Query returned rowid=%"PRIu64" <= rowid_since=%"PRIu64" -- skipped", q_rowid, c->rowid_since);
continue;
}
uint64_t q_rowid = c->_rowid_current = sqlite3_column_int64(c->_statement, 5);
const char *q_manifestid = (const char *) sqlite3_column_text(c->_statement, 0);
const char *manifestblob = (char *) sqlite3_column_blob(c->_statement, 1);
size_t manifestblobsize = sqlite3_column_bytes(c->_statement, 1); // must call after sqlite3_column_blob()
@ -1555,12 +1552,6 @@ int rhizome_list_next(struct rhizome_list_cursor *c)
rhizome_manifest_set_author(m, &author);
rhizome_manifest_set_rowid(m, q_rowid);
rhizome_manifest_set_inserttime(m, q_inserttime);
if (c->service && !(m->service && strcasecmp(c->service, m->service) == 0))
continue;
if (c->is_sender_set && !(m->has_sender && cmp_sid_t(&c->sender, &m->sender) == 0))
continue;
if (c->is_recipient_set && !(m->has_recipient && cmp_sid_t(&c->recipient, &m->recipient) == 0))
continue;
assert(c->_rowid_current != 0);
// Don't do rhizome_verify_author(m); too CPU expensive for a listing. Save that for when
// the bundle is extracted or exported.
@ -1573,10 +1564,10 @@ int rhizome_list_next(struct rhizome_list_cursor *c)
void rhizome_list_commit(struct rhizome_list_cursor *c)
{
DEBUGF(rhizome, "c=%p c->rowid_since=%"PRIu64" c->_rowid_current=%"PRIu64" c->_rowid_last=%"PRIu64,
c, c->rowid_since, c->_rowid_current, c->_rowid_last);
DEBUGF(rhizome, "c=%p c->oldest_first=%d c->_rowid_current=%"PRIu64" c->_rowid_last=%"PRIu64,
c, c->oldest_first, c->_rowid_current, c->_rowid_last);
assert(c->_rowid_current != 0);
if (c->_rowid_last == 0 || (c->rowid_since ? c->_rowid_current > c->_rowid_last : c->_rowid_current < c->_rowid_last))
if (c->_rowid_last == 0 || (c->oldest_first ? c->_rowid_current > c->_rowid_last : c->_rowid_current < c->_rowid_last))
c->_rowid_last = c->_rowid_current;
}

View File

@ -62,11 +62,16 @@ static char *list_token_to_str(char *buf, uint64_t rowid)
static int strn_to_list_token(const char *str, uint64_t *rowidp, const char **afterp)
{
unsigned char token[sizeof rhizome_db_uuid.u.binary + sizeof *rowidp];
if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) != sizeof token)
return 0;
if (cmp_uuid_t(&rhizome_db_uuid, (serval_uuid_t *) &token) != 0)
return 0;
memcpy(rowidp, token + sizeof rhizome_db_uuid.u.binary, sizeof *rowidp);
if (base64url_decode(token, sizeof token, str, 0, afterp, 0, NULL) == sizeof token
&& cmp_uuid_t(&rhizome_db_uuid, (serval_uuid_t *) &token) == 0
&& **afterp=='/'){
memcpy(rowidp, token + sizeof rhizome_db_uuid.u.binary, sizeof *rowidp);
(*afterp)++;
}else{
// don't skip the token
*afterp=str;
*rowidp=1;
}
return 1;
}
@ -210,7 +215,7 @@ static int restful_rhizome_newsince(httpd_request *r, const char *remainder)
return ret;
uint64_t rowid;
const char *end = NULL;
if (!strn_to_list_token(remainder, &rowid, &end) || strcmp(end, "/bundlelist.json") != 0)
if (!strn_to_list_token(remainder, &rowid, &end) || strcmp(end, "bundlelist.json") != 0)
return 404;
if (r->http.verb != HTTP_VERB_GET)
return 405;
@ -218,6 +223,7 @@ static int restful_rhizome_newsince(httpd_request *r, const char *remainder)
r->u.rhlist.rowcount = 0;
bzero(&r->u.rhlist.cursor, sizeof r->u.rhlist.cursor);
r->u.rhlist.cursor.rowid_since = rowid;
r->u.rhlist.cursor.oldest_first = 1;
r->u.rhlist.end_time = gettime_ms() + config.api.restful.newsince_timeout * 1000;
r->trigger_rhizome_bundle_added = on_rhizome_bundle_added;
http_request_response_generated(&r->http, 200, CONTENT_TYPE_JSON, restful_rhizome_bundlelist_json_content);
@ -269,7 +275,7 @@ static int restful_rhizome_bundlelist_json_content_chunk(struct http_request *hr
return -1;
if (ret == 0) {
time_ms_t now;
if (r->u.rhlist.cursor.rowid_since == 0 || (now = gettime_ms()) >= r->u.rhlist.end_time) {
if (r->u.rhlist.cursor.oldest_first == 0 || (now = gettime_ms()) >= r->u.rhlist.end_time) {
r->u.rhlist.phase = LIST_END;
return 1;
}