Improve Rhizome list cursor

Remove 'rowcount' element

Order by descending ROWID, which is functionally the same as descending
inserttime but more reliable

Replace '_offset' cursor element with rowid first-last range to record
the expanding window of rows already returned, which allows release and
re-open of cursor mid-listing without missing rows or producing duplicates
This commit is contained in:
Andrew Bettison 2013-11-07 17:22:06 +10:30
parent 47988bfe0f
commit 051eca4775
3 changed files with 99 additions and 80 deletions

View File

@ -1871,40 +1871,45 @@ int app_rhizome_list(const struct cli_parsed *parsed, struct cli_context *contex
"name"
};
cli_columns(context, NELS(headers), headers);
while (rhizome_list_next(&retry, &cursor) == 1) {
rhizome_manifest *m = cursor.manifest;
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (cursor.rowcount < rowoffset)
size_t rowcount = 0;
int n;
while ((n = rhizome_list_next(&retry, &cursor)) == 1) {
++rowcount;
if (rowcount <= rowoffset)
continue;
if (rowlimit != 0 && cursor.rowcount > rowlimit)
break;
rhizome_lookup_author(m);
cli_put_long(context, cursor.rowid, ":");
cli_put_string(context, m->service, ":");
cli_put_hexvalue(context, m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, ":");
cli_put_long(context, m->version, ":");
cli_put_long(context, m->has_date ? m->date : 0, ":");
cli_put_long(context, m->inserttime, ":");
switch (m->authorship) {
case AUTHOR_LOCAL:
case AUTHOR_AUTHENTIC:
cli_put_hexvalue(context, m->author.binary, sizeof m->author.binary, ":");
cli_put_long(context, 1, ":");
break;
default:
cli_put_string(context, NULL, ":");
cli_put_long(context, 0, ":");
break;
if (rowlimit == 0 || rowcount <= rowoffset + rowlimit) {
rhizome_manifest *m = cursor.manifest;
assert(m->filesize != RHIZOME_SIZE_UNSET);
rhizome_lookup_author(m);
cli_put_long(context, cursor.rowid, ":");
cli_put_string(context, m->service, ":");
cli_put_hexvalue(context, m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, ":");
cli_put_long(context, m->version, ":");
cli_put_long(context, m->has_date ? m->date : 0, ":");
cli_put_long(context, m->inserttime, ":");
switch (m->authorship) {
case AUTHOR_LOCAL:
case AUTHOR_AUTHENTIC:
cli_put_hexvalue(context, m->author.binary, sizeof m->author.binary, ":");
cli_put_long(context, 1, ":");
break;
default:
cli_put_string(context, NULL, ":");
cli_put_long(context, 0, ":");
break;
}
cli_put_long(context, m->filesize, ":");
cli_put_hexvalue(context, m->filesize ? m->filehash.binary : NULL, sizeof m->filehash.binary, ":");
cli_put_hexvalue(context, m->has_sender ? m->sender.binary : NULL, sizeof m->sender.binary, ":");
cli_put_hexvalue(context, m->has_recipient ? m->recipient.binary : NULL, sizeof m->recipient.binary, ":");
cli_put_string(context, m->name, "\n");
}
cli_put_long(context, m->filesize, ":");
cli_put_hexvalue(context, m->filesize ? m->filehash.binary : NULL, sizeof m->filehash.binary, ":");
cli_put_hexvalue(context, m->has_sender ? m->sender.binary : NULL, sizeof m->sender.binary, ":");
cli_put_hexvalue(context, m->has_recipient ? m->recipient.binary : NULL, sizeof m->recipient.binary, ":");
cli_put_string(context, m->name, "\n");
}
rhizome_list_release(&cursor);
cli_row_count(context, cursor.rowcount);
keyring_free(keyring);
if (n == -1)
return -1;
cli_row_count(context, rowcount);
return 0;
}

View File

@ -623,10 +623,10 @@ struct rhizome_list_cursor {
// Set by calling the next() function.
int64_t rowid;
rhizome_manifest *manifest;
size_t rowcount;
// Private state.
sqlite3_stmt *_statement;
unsigned _offset;
int64_t _rowid_first;
int64_t _rowid_last;
};
int rhizome_list_open(sqlite_retry_state *, struct rhizome_list_cursor *);

View File

@ -1298,6 +1298,10 @@ int rhizome_store_bundle(rhizome_manifest *m)
time_ms_t now = gettime_ms();
// The INSERT OR REPLACE statement will delete a row with the same ID (primary key) if it exists,
// so a new autoincremented ROWID will be allocated whether or not the manifest with this ID is
// already in the table. Other code depends on this property: that ROWID is monotonically
// increasing with time and unique.
sqlite3_stmt *stmt;
if ((stmt = sqlite_prepare_bind(&retry,
"INSERT OR REPLACE INTO MANIFESTS("
@ -1412,70 +1416,81 @@ rollback:
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_list_open(sqlite_retry_state *retry, struct rhizome_list_cursor *cursor)
int rhizome_list_open(sqlite_retry_state *retry, struct rhizome_list_cursor *c)
{
IN();
strbuf b = strbuf_alloca(1024);
strbuf_sprintf(b, "SELECT id, manifest, version, inserttime, author, rowid FROM manifests WHERE 1=1");
if (cursor->service)
if (c->service)
strbuf_puts(b, " AND service = @service");
if (cursor->name)
if (c->name)
strbuf_puts(b, " AND name like @name");
if (cursor->is_sender_set)
if (c->is_sender_set)
strbuf_puts(b, " AND sender = @sender");
if (cursor->is_recipient_set)
if (c->is_recipient_set)
strbuf_puts(b, " AND recipient = @recipient");
strbuf_puts(b, " ORDER BY inserttime DESC LIMIT -1 OFFSET @offset");
if (c->_rowid_first) {
assert(c->_rowid_last);
assert(c->_rowid_last <= c->_rowid_first);
strbuf_puts(b, " AND (rowid > @first OR rowid < @last)");
}
strbuf_puts(b, " ORDER BY rowid DESC"); // most recent first
if (strbuf_overrun(b))
RETURN(WHYF("SQL command too long: %s", strbuf_str(b)));
cursor->_statement = sqlite_prepare(retry, strbuf_str(b));
if (cursor->_statement == NULL)
c->_statement = sqlite_prepare(retry, strbuf_str(b));
if (c->_statement == NULL)
RETURN(-1);
if (sqlite_bind(retry, cursor->_statement, NAMED|INT, "@offset", cursor->_offset, END) == -1)
if (c->service && sqlite_bind(retry, c->_statement, NAMED|STATIC_TEXT, "@service", c->service, END) == -1)
goto failure;
if (cursor->service && sqlite_bind(retry, cursor->_statement, NAMED|STATIC_TEXT, "@service", cursor->service, END) == -1)
if (c->name && sqlite_bind(retry, c->_statement, NAMED|STATIC_TEXT, "@name", c->name, END) == -1)
goto failure;
if (cursor->name && sqlite_bind(retry, cursor->_statement, NAMED|STATIC_TEXT, "@name", cursor->name, END) == -1)
if (c->is_sender_set && sqlite_bind(retry, c->_statement, NAMED|SID_T, "@sender", &c->sender, END) == -1)
goto failure;
if (cursor->is_sender_set && sqlite_bind(retry, cursor->_statement, NAMED|SID_T, "@sender", &cursor->sender, END) == -1)
if (c->is_recipient_set && sqlite_bind(retry, c->_statement, NAMED|SID_T, "@recipient", &c->recipient, END) == -1)
goto failure;
if (cursor->is_recipient_set && sqlite_bind(retry, cursor->_statement, NAMED|SID_T, "@recipient", &cursor->recipient, END) == -1)
if ( c->_rowid_first
&& sqlite_bind(retry, c->_statement, NAMED|INT64, "@first", c->_rowid_first,
NAMED|INT64, "@last", c->_rowid_last, END) == -1
)
goto failure;
cursor->manifest = NULL;
c->manifest = NULL;
RETURN(0);
OUT();
failure:
sqlite3_finalize(cursor->_statement);
cursor->_statement = NULL;
sqlite3_finalize(c->_statement);
c->_statement = NULL;
RETURN(-1);
OUT();
}
int rhizome_list_next(sqlite_retry_state *retry, struct rhizome_list_cursor *cursor)
int rhizome_list_next(sqlite_retry_state *retry, struct rhizome_list_cursor *c)
{
IN();
if (cursor->_statement == NULL && rhizome_list_open(retry, cursor) == -1)
if (c->_statement == NULL && rhizome_list_open(retry, c) == -1)
RETURN(-1);
while (sqlite_step_retry(retry, cursor->_statement) == SQLITE_ROW) {
++cursor->_offset;
if (cursor->manifest) {
rhizome_manifest_free(cursor->manifest);
cursor->manifest = NULL;
while (sqlite_step_retry(retry, c->_statement) == SQLITE_ROW) {
if (c->manifest) {
rhizome_manifest_free(c->manifest);
c->manifest = NULL;
}
assert(sqlite3_column_count(cursor->_statement) == 6);
assert(sqlite3_column_type(cursor->_statement, 0) == SQLITE_TEXT);
assert(sqlite3_column_type(cursor->_statement, 1) == SQLITE_BLOB);
assert(sqlite3_column_type(cursor->_statement, 2) == SQLITE_INTEGER);
assert(sqlite3_column_type(cursor->_statement, 3) == SQLITE_INTEGER);
assert(sqlite3_column_type(cursor->_statement, 4) == SQLITE_TEXT || sqlite3_column_type(cursor->_statement, 4) == SQLITE_NULL);
assert(sqlite3_column_type(cursor->_statement, 5) == SQLITE_INTEGER);
const char *q_manifestid = (const char *) sqlite3_column_text(cursor->_statement, 0);
const char *manifestblob = (char *) sqlite3_column_blob(cursor->_statement, 1);
size_t manifestblobsize = sqlite3_column_bytes(cursor->_statement, 1); // must call after sqlite3_column_blob()
int64_t q_version = sqlite3_column_int64(cursor->_statement, 2);
int64_t q_inserttime = sqlite3_column_int64(cursor->_statement, 3);
const char *q_author = (const char *) sqlite3_column_text(cursor->_statement, 4);
cursor->rowid = sqlite3_column_int64(cursor->_statement, 5);
assert(sqlite3_column_count(c->_statement) == 6);
assert(sqlite3_column_type(c->_statement, 0) == SQLITE_TEXT);
assert(sqlite3_column_type(c->_statement, 1) == SQLITE_BLOB);
assert(sqlite3_column_type(c->_statement, 2) == SQLITE_INTEGER);
assert(sqlite3_column_type(c->_statement, 3) == SQLITE_INTEGER);
assert(sqlite3_column_type(c->_statement, 4) == SQLITE_TEXT || sqlite3_column_type(c->_statement, 4) == SQLITE_NULL);
assert(sqlite3_column_type(c->_statement, 5) == SQLITE_INTEGER);
const char *q_manifestid = (const char *) sqlite3_column_text(c->_statement, 0);
const char *manifestblob = (char *) sqlite3_column_blob(c->_statement, 1);
size_t manifestblobsize = sqlite3_column_bytes(c->_statement, 1); // must call after sqlite3_column_blob()
int64_t q_version = sqlite3_column_int64(c->_statement, 2);
int64_t q_inserttime = sqlite3_column_int64(c->_statement, 3);
const char *q_author = (const char *) sqlite3_column_text(c->_statement, 4);
c->rowid = sqlite3_column_int64(c->_statement, 5);
if (c->rowid > c->_rowid_first)
c->_rowid_first = c->rowid;
if (c->_rowid_last == 0 || c->rowid < c->_rowid_last)
c->_rowid_last = c->rowid;
sid_t *author = NULL;
if (q_author) {
author = alloca(sizeof *author);
@ -1484,7 +1499,7 @@ int rhizome_list_next(sqlite_retry_state *retry, struct rhizome_list_cursor *cur
continue;
}
}
rhizome_manifest *m = cursor->manifest = rhizome_new_manifest();
rhizome_manifest *m = c->manifest = rhizome_new_manifest();
if (m == NULL)
RETURN(-1);
if (rhizome_read_manifest_file(m, manifestblob, manifestblobsize) == -1) {
@ -1499,30 +1514,29 @@ int rhizome_list_next(sqlite_retry_state *retry, struct rhizome_list_cursor *cur
if (author)
rhizome_manifest_set_author(m, author);
rhizome_manifest_set_inserttime(m, q_inserttime);
if (cursor->service && !(m->service && strcasecmp(cursor->service, m->service) == 0))
if (c->service && !(m->service && strcasecmp(c->service, m->service) == 0))
continue;
if (cursor->is_sender_set && !(m->has_sender && cmp_sid_t(&cursor->sender, &m->sender) == 0))
if (c->is_sender_set && !(m->has_sender && cmp_sid_t(&c->sender, &m->sender) == 0))
continue;
if (cursor->is_recipient_set && !(m->has_recipient && cmp_sid_t(&cursor->recipient, &m->recipient) == 0))
if (c->is_recipient_set && !(m->has_recipient && cmp_sid_t(&c->recipient, &m->recipient) == 0))
continue;
// Don't do rhizome_verify_author(m); too CPU expensive for a listing. Save that for when
// the bundle is extracted or exported.
++cursor->rowcount;
RETURN(1);
}
RETURN(0);
OUT();
}
void rhizome_list_release(struct rhizome_list_cursor *cursor)
void rhizome_list_release(struct rhizome_list_cursor *c)
{
if (cursor->manifest) {
rhizome_manifest_free(cursor->manifest);
cursor->manifest = NULL;
if (c->manifest) {
rhizome_manifest_free(c->manifest);
c->manifest = NULL;
}
if (cursor->_statement) {
sqlite3_finalize(cursor->_statement);
cursor->_statement = NULL;
if (c->_statement) {
sqlite3_finalize(c->_statement);
c->_statement = NULL;
}
}