Refactor rhizome storage methods to return database busy status, and pause rhizome sync

This commit is contained in:
Jeremy Lakeman 2017-03-06 15:56:58 +10:30
parent 2540c9e633
commit 585e573ecf
9 changed files with 374 additions and 282 deletions

View File

@ -371,8 +371,7 @@ error:
}
/* Import a bundle from a pair of files, one containing the manifest and the optional other
* containing the payload. The work is all done by rhizome_bundle_import() and
* rhizome_store_manifest().
* containing the payload.
*/
enum rhizome_bundle_status rhizome_bundle_import_files(rhizome_manifest *m, rhizome_manifest **mout, const char *manifest_path, const char *filepath, int zip_comment)
{
@ -532,8 +531,7 @@ enum rhizome_bundle_status rhizome_bundle_import_files(rhizome_manifest *m, rhiz
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
case RHIZOME_PAYLOAD_STATUS_NEW:
if (rhizome_store_manifest(m) == -1)
ret = -1;
ret = rhizome_add_manifest_to_store(m, mout);
break;
case RHIZOME_PAYLOAD_STATUS_TOO_BIG:
case RHIZOME_PAYLOAD_STATUS_EVICTED:
@ -613,49 +611,6 @@ enum rhizome_bundle_status rhizome_manifest_check_stored(rhizome_manifest *m, rh
return result;
}
/* Insert the manifest 'm' into the Rhizome store. This function encapsulates all the invariants
* that a manifest must satisfy before it is allowed into the store, so it is used by both the sync
* protocol and the application layer.
*
* - If the manifest is not valid then returns RHIZOME_BUNDLE_STATUS_INVALID. A valid manifest is
* one with all the core (transport) fields present and consistent ('id', 'version', 'filesize',
* 'filehash', 'tail'), all mandatory application fields present and consistent ('service',
* 'date') and any other service-dependent mandatory fields present (eg, 'sender', 'recipient').
*
* - If the manifest's signature does not verify, then returns RHIZOME_BUNDLE_STATUS_FAKE.
*
* - If the manifest has a payload (filesize != 0) but the payload is not present in the store
* (filehash), then returns an internal error RHIZOME_BUNDLE_STATUS_ERROR (-1).
*
* - If the store will not accept the manifest because there is already the same or a newer
* manifest in the store, then returns RHIZOME_BUNDLE_STATUS_SAME or RHIZOME_BUNDLE_STATUS_OLD.
*
* This function then attempts to store the manifest. If this fails due to an internal error,
* then returns RHIZOME_BUNDLE_STATUS_ERROR (-1), otherwise returns RHIZOME_BUNDLE_STATUS_NEW to
* indicate that the manifest was successfully stored.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
enum rhizome_bundle_status rhizome_add_manifest_to_store(rhizome_manifest *m, rhizome_manifest **mout)
{
if (mout == NULL)
DEBUGF(rhizome, "%s(m=manifest %p, mout=NULL)", __func__, m);
else
DEBUGF(rhizome, "%s(m=manifest %p, *mout=manifest %p)", __func__, m, *mout);
if (!m->finalised && !rhizome_manifest_validate(m))
return RHIZOME_BUNDLE_STATUS_INVALID;
assert(m->finalised);
if (!m->selfSigned && !rhizome_manifest_verify(m))
return RHIZOME_BUNDLE_STATUS_FAKE;
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize > 0 && !rhizome_exists(&m->filehash))
return WHY("Payload has not been stored");
enum rhizome_bundle_status status = rhizome_manifest_check_stored(m, mout);
if (status == RHIZOME_BUNDLE_STATUS_NEW && rhizome_store_manifest(m) == -1)
status = RHIZOME_BUNDLE_STATUS_ERROR;
return status;
}
/* When voice traffic is being carried, we need to throttle Rhizome down
to a more sensible level. Or possibly even supress it entirely.
*/
@ -679,7 +634,7 @@ const char *rhizome_bundle_status_message(enum rhizome_bundle_status status)
case RHIZOME_BUNDLE_STATUS_INCONSISTENT: return "Manifest inconsistent with supplied payload";
case RHIZOME_BUNDLE_STATUS_NO_ROOM: return "No room in store for bundle";
case RHIZOME_BUNDLE_STATUS_READONLY: return "Bundle is read-only";
case RHIZOME_BUNDLE_STATUS_BUSY: return "Internal error";
case RHIZOME_BUNDLE_STATUS_BUSY: return "Database is busy";
case RHIZOME_BUNDLE_STATUS_ERROR: return "Internal error";
case RHIZOME_BUNDLE_STATUS_MANIFEST_TOO_BIG: return "Manifest too big";
}

View File

@ -470,7 +470,6 @@ void _rhizome_manifest_free(struct __sourceloc, rhizome_manifest *m);
rhizome_manifest *_rhizome_new_manifest(struct __sourceloc);
#define rhizome_new_manifest() _rhizome_new_manifest(__WHENCE__)
int rhizome_store_manifest(rhizome_manifest *m);
int rhizome_store_file(rhizome_manifest *m,const unsigned char *key);
int rhizome_parse_field_assignments(struct rhizome_manifest_field_assignment *fields, unsigned argc, const char *const *args);
@ -552,11 +551,11 @@ sqlite3_stmt *_sqlite_prepare_bind(struct __sourceloc, int log_level, sqlite_ret
int _sqlite_retry(struct __sourceloc, sqlite_retry_state *retry, const char *action);
void _sqlite_retry_done(struct __sourceloc, sqlite_retry_state *retry, const char *action);
int _sqlite_step(struct __sourceloc, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement);
int _sqlite_exec_code(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement, int *rowcount);
int _sqlite_exec(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement);
int _sqlite_exec_void(struct __sourceloc, int log_level, const char *sqltext, ...);
int _sqlite_exec_void_retry(struct __sourceloc, int log_level, sqlite_retry_state *retry, const char *sqltext, ...);
int _sqlite_exec_changes_retry(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, int *rowcount, int *changes, const char *sqltext, ...);
int _sqlite_exec_uint64(struct __sourceloc, uint64_t *result, const char *sqltext, ...);
int _sqlite_exec_uint64_retry(struct __sourceloc, sqlite_retry_state *retry, uint64_t *result, const char *sqltext, ...);
int _sqlite_exec_strbuf(struct __sourceloc, strbuf sb, const char *sqltext, ...);
int _sqlite_exec_strbuf_retry(struct __sourceloc, sqlite_retry_state *retry, strbuf sb, const char *sqltext, ...);
@ -598,6 +597,8 @@ int _sqlite_blob_close(struct __sourceloc, int log_level, sqlite3_blob *blob);
#define sqlite_exec(stmt) _sqlite_exec(__WHENCE__, LOG_LEVEL_ERROR, NULL, (stmt))
#define sqlite_exec_retry(rs,stmt) _sqlite_exec(__WHENCE__, LOG_LEVEL_ERROR, (rs), (stmt))
#define sqlite_exec_retry_loglevel(ll,rs,stmt) _sqlite_exec(__WHENCE__, (ll), (rs), (stmt))
#define sqlite_exec_code(stmt, rowcount) _sqlite_exec_code(__WHENCE__, LOG_LEVEL_ERROR, NULL, (stmt), (rowcount))
#define sqlite_exec_code_retry(rs, stmt, rowcount) _sqlite_exec_code(__WHENCE__, LOG_LEVEL_ERROR, (rs), (stmt), (rowcount))
#define sqlite_step(stmt) _sqlite_step(__WHENCE__, LOG_LEVEL_ERROR, NULL, (stmt))
#define sqlite_step_retry(rs,stmt) _sqlite_step(__WHENCE__, LOG_LEVEL_ERROR, (rs), (stmt))
#define sqlite_exec_void(sql,arg,...) _sqlite_exec_void(__WHENCE__, LOG_LEVEL_ERROR, (sql), arg, ##__VA_ARGS__)
@ -606,7 +607,6 @@ int _sqlite_blob_close(struct __sourceloc, int log_level, sqlite3_blob *blob);
#define sqlite_exec_changes_retry(rs,ROWS,CHANGES,sql,arg,...) _sqlite_exec_changes_retry(__WHENCE__, LOG_LEVEL_ERROR, (rs), (ROWS), (CHANGES), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_changes_retry_loglevel(ll,rs,ROWS,CHANGES,sql,arg,...) _sqlite_exec_changes_retry(__WHENCE__, (ll), (rs), (ROWS), (CHANGES), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_void_retry_loglevel(ll,rs,sql,arg,...) _sqlite_exec_void_retry(__WHENCE__, (ll), (rs), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_uint64(res,sql,arg,...) _sqlite_exec_uint64(__WHENCE__, (res), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_uint64_retry(rs,res,sql,arg,...) _sqlite_exec_uint64_retry(__WHENCE__, (rs), (res), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_strbuf(sb,sql,arg,...) _sqlite_exec_strbuf(__WHENCE__, (sb), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_strbuf_retry(rs,sb,sql,arg,...) _sqlite_exec_strbuf_retry(__WHENCE__, (rs), (sb), (sql), arg, ##__VA_ARGS__)
@ -619,8 +619,8 @@ double rhizome_manifest_get_double(rhizome_manifest *m,char *var,double default_
int rhizome_manifest_extract_signature(rhizome_manifest *m, unsigned *ofs);
enum rhizome_bundle_status rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found);
int rhizome_manifest_to_bar(rhizome_manifest *m, rhizome_bar_t *bar);
int rhizome_is_bar_interesting(const rhizome_bar_t *bar);
int rhizome_is_manifest_interesting(rhizome_manifest *m);
enum rhizome_bundle_status rhizome_is_bar_interesting(const rhizome_bar_t *bar);
enum rhizome_bundle_status rhizome_is_manifest_interesting(rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest(const rhizome_bid_t *bid, rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest_by_prefix(const unsigned char *prefix, unsigned prefix_len, rhizome_manifest *m);
enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_t *prefix, unsigned prefix_len, rhizome_manifest *m);
@ -865,7 +865,7 @@ int rhizome_fetch_has_queue_space(unsigned char log2_size);
/* Rhizome storage methods */
int rhizome_exists(const rhizome_filehash_t *hashp);
enum rhizome_payload_status rhizome_exists(const rhizome_filehash_t *hashp);
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length);
int rhizome_write_buffer(struct rhizome_write *write_state, uint8_t *buffer, size_t data_size);
int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uint8_t *buffer, size_t data_size);

View File

@ -648,8 +648,6 @@ static int app_rhizome_export_file(const struct cli_parsed *parsed, struct cli_c
return -1;
if (rhizome_opendb() == -1)
return -1;
if (!rhizome_exists(&hash))
return 1;
uint64_t length;
enum rhizome_payload_status pstatus = rhizome_dump_file(&hash, filepath, &length);
switch (pstatus) {
@ -658,6 +656,7 @@ static int app_rhizome_export_file(const struct cli_parsed *parsed, struct cli_c
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
return 1; // payload not found
case RHIZOME_PAYLOAD_STATUS_BUSY:
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:

View File

@ -134,10 +134,8 @@ void sqlite_log(void *UNUSED(ignored), int result, const char *msg)
void verify_bundles()
{
// assume that only the manifest itself can be trusted
// fetch all manifests and reinsert them.
// fetch all manifests, parse and update or delete them.
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
// This cursor must be ordered descending as re-inserting the manifests will give them a new higher manifest id.
// If we didn't, we'd get stuck in an infinite loop.
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT ROWID, MANIFEST FROM MANIFESTS ORDER BY ROWID DESC;");
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
sqlite3_int64 rowid = sqlite3_column_int64(statement, 0);
@ -153,9 +151,46 @@ void verify_bundles()
&& rhizome_manifest_verify(m)
) {
assert(m->finalised);
// Store it again, to ensure that MANIFESTS columns are up to date.
ret = rhizome_store_manifest(m);
if (m->filesize == 0 || rhizome_exists(&m->filehash) == RHIZOME_PAYLOAD_STATUS_STORED){
// Attempt to update the manifest
rhizome_bar_t bar;
rhizome_manifest_to_bar(m, &bar);
rhizome_authenticate_author(m);
if (sqlite_exec_void("UPDATE MANIFESTS SET"
"id = ?,"
"version = ?,"
"bar = ?,"
"filesize = ?,"
"filehash = ?,"
"author = ?,"
"service = ?,"
"name = ?,"
"sender = ?,"
"recipient = ?,"
"tail = ?,"
"manifest_hash = ?"
"WHERE ROWID = ?;",
RHIZOME_BID_T, &m->keypair.public_key,
INT64, m->version,
RHIZOME_BAR_T, &bar,
INT64, m->filesize,
RHIZOME_FILEHASH_T|NUL, m->filesize > 0 ? &m->filehash : NULL,
SID_T|NUL, m->authorship == AUTHOR_AUTHENTIC ? &m->author : NULL,
STATIC_TEXT, m->service,
STATIC_TEXT|NUL, m->name,
SID_T|NUL, m->has_sender ? &m->sender : NULL,
SID_T|NUL, m->has_recipient ? &m->recipient : NULL,
INT64, m->tail,
RHIZOME_FILEHASH_T, &m->manifesthash,
INT64, rowid,
END
)!=-1)
ret = 0;
}
}
if (ret) {
DEBUGF(rhizome, "Removing invalid manifest entry @%lld", rowid);
sqlite_exec_void_retry(&retry, "DELETE FROM MANIFESTS WHERE ROWID = ?;", INT64, rowid, END);
@ -249,7 +284,7 @@ int rhizome_opendb()
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
uint64_t version;
if (sqlite_exec_uint64_retry(&retry, &version, "PRAGMA user_version;", END) == -1)
if (sqlite_exec_uint64_retry(&retry, &version, "PRAGMA user_version;", END) != SQLITE_ROW)
RETURN(-1);
if (version<1){
@ -1040,47 +1075,34 @@ static int _sqlite_vexec_uint64(struct __sourceloc __whence, sqlite_retry_state
return -1;
if (_sqlite_vbind(__whence, LOG_LEVEL_ERROR, retry, statement, ap) == -1)
return -1;
int ret = 0;
int rowcount = 0;
int rows = 0;
int stepcode;
while ((stepcode = _sqlite_step(__whence, LOG_LEVEL_ERROR, retry, statement)) == SQLITE_ROW) {
int columncount = sqlite3_column_count(statement);
if (columncount != 1)
ret = WHYF("incorrect column count %d (should be 1): %s", columncount, sqlite3_sql(statement));
else if (++rowcount == 1)
FATALF("incorrect column count %d (should be 1): %s", columncount, sqlite3_sql(statement));
else if (++rows == 1)
*result = sqlite3_column_int64(statement, 0);
}
if (rowcount > 1)
WARNF("query unexpectedly returned %d rows, ignored all but first", rowcount);
if (rows > 1)
FATALF("query unexpectedly returned %d rows", rows);
sqlite3_finalize(statement);
if (!sqlite_code_ok(stepcode) || ret == -1)
return -1;
if (sqlite_trace_func())
_DEBUGF("rowcount=%d changes=%d result=%"PRIu64, rowcount, sqlite3_changes(rhizome_db), *result);
return rowcount;
_DEBUGF("rowcount=%d changes=%d result=%"PRIu64, rows, sqlite3_changes(rhizome_db), *result);
if (sqlite_code_ok(stepcode) && rows>0)
return SQLITE_ROW;
return stepcode;
}
/*
* Convenience wrapper for executing an SQL command that returns a single int64 value.
* Logs an error and returns -1 if an error occurs.
* If no row is found, then returns 0 and does not alter *result.
* If exactly one row is found, the assigns its value to *result and returns 1.
* If more than one row is found, then logs a warning, assigns the value of the first row to *result
* and returns the number of rows.
* If no row is found, then returns SQLITE_OK / SQLITE_DONE and does not alter *result.
* If exactly one row is found, the assigns its value to *result and returns SQLITE_ROW.
* Otherwise an SQLITE_ stepcode will be returned
* If more than one row is found, or the query returns more that one column then this function asserts
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int _sqlite_exec_uint64(struct __sourceloc __whence, uint64_t *result, const char *sqlformat,...)
{
va_list ap;
va_start(ap, sqlformat);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
int ret = _sqlite_vexec_uint64(__whence, &retry, result, sqlformat, ap);
va_end(ap);
return ret;
}
/* Same as sqlite_exec_uint64() but if the statement cannot be executed because the database is
* if the statement cannot be executed because the database is
* currently locked for updates, then will call sqlite_retry() on the supplied retry state variable
* instead of its own, internal one. If 'retry' is passed as NULL, then will not sleep and retry at
* all in the event of a busy condition, but will log it as an error and return immediately.
@ -1355,40 +1377,63 @@ end:
mdp_close(mdpsock);
}
/*
Store the specified manifest into the sqlite database.
We assume that sufficient space has been made for us.
The manifest should be finalised, and so we don't need to
look at the underlying manifest file, but can just write m->manifest_data
as a blob.
associated_filename needs to be read in and stored as a blob. Hopefully that
can be done in pieces so that we don't have memory exhaustion issues on small
architectures. However, we do know it's hash apriori from m, and so we can
skip loading the file in if it is already stored. mmap() apparently works on
Linux FAT file systems, and is probably the best choice since it doesn't need
all pages to be in RAM at the same time.
SQLite does allow modifying of blobs once stored in the database.
The trick is to insert the blob as all zeroes using a special function, and then
substitute bytes in the blog progressively.
We need to also need to create the appropriate row(s) in the MANIFESTS, FILES tables.
/* Insert the manifest 'm' into the Rhizome store. This function encapsulates all the invariants
* that a manifest must satisfy before it is allowed into the store, so it is used by both the sync
* protocol and the application layer.
*
* - If the manifest is not valid then returns RHIZOME_BUNDLE_STATUS_INVALID. A valid manifest is
* one with all the core (transport) fields present and consistent ('id', 'version', 'filesize',
* 'filehash', 'tail'), all mandatory application fields present and consistent ('service',
* 'date') and any other service-dependent mandatory fields present (eg, 'sender', 'recipient').
*
* - If the manifest's signature does not verify, then returns RHIZOME_BUNDLE_STATUS_FAKE.
*
* - If the manifest has a payload (filesize != 0) but the payload is not present in the store
* (filehash), then returns an internal error RHIZOME_BUNDLE_STATUS_ERROR (-1).
*
* - If the store will not accept the manifest because there is already the same or a newer
* manifest in the store, then returns RHIZOME_BUNDLE_STATUS_SAME or RHIZOME_BUNDLE_STATUS_OLD.
*
* This function then attempts to store the manifest. If this fails due to an internal error,
* then returns RHIZOME_BUNDLE_STATUS_ERROR (-1), otherwise returns RHIZOME_BUNDLE_STATUS_NEW to
* indicate that the manifest was successfully stored.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_store_manifest(rhizome_manifest *m)
enum rhizome_bundle_status rhizome_add_manifest_to_store(rhizome_manifest *m, rhizome_manifest **mout)
{
if (mout == NULL)
DEBUGF(rhizome, "%s(m=manifest %p, mout=NULL)", __func__, m);
else
DEBUGF(rhizome, "%s(m=manifest %p, *mout=manifest %p)", __func__, m, *mout);
if (!m->finalised && !rhizome_manifest_validate(m))
return RHIZOME_BUNDLE_STATUS_INVALID;
assert(m->finalised);
assert(m->haveSecret || m->selfSigned); // should not store an invalid or fake manifest
if (!m->selfSigned && !rhizome_manifest_verify(m))
return RHIZOME_BUNDLE_STATUS_FAKE;
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize > 0){
switch (rhizome_exists(&m->filehash)){
case RHIZOME_PAYLOAD_STATUS_BUSY:
return RHIZOME_BUNDLE_STATUS_BUSY;
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
default:
return WHY("Payload has not been stored");
}
}
enum rhizome_bundle_status status = rhizome_manifest_check_stored(m, mout);
if (status != RHIZOME_BUNDLE_STATUS_NEW)
return status;
// manifest is complete, and not already stored
/* Bind BAR to data field */
rhizome_bar_t bar;
rhizome_manifest_to_bar(m, &bar);
/* Store the file (but not if it is already in the database) */
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize > 0 && !rhizome_exists(&m->filehash))
return WHY("File should already be stored by now");
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
return WHY("Failed to begin transaction");
@ -1464,14 +1509,15 @@ int rhizome_store_manifest(rhizome_manifest *m)
}else{
sync_rhizome();
}
return 0;
return RHIZOME_BUNDLE_STATUS_NEW;
}
rollback:
if (stmt)
sqlite3_finalize(stmt);
WHYF("Failed to store bundle bid=%s", alloca_tohex_rhizome_bid_t(m->keypair.public_key));
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
return -1;
return RHIZOME_BUNDLE_STATUS_ERROR;
}
static void trigger_rhizome_bundle_added_debug(rhizome_manifest *m)
@ -1988,7 +2034,7 @@ int rhizome_delete_manifest(const rhizome_bid_t *bidp)
return rhizome_delete_manifest_retry(&retry, bidp);
}
static int is_interesting(const char *id_hex, uint64_t version)
static enum rhizome_bundle_status is_interesting(const char *id_hex, uint64_t version)
{
IN();
@ -2000,29 +2046,50 @@ static int is_interesting(const char *id_hex, uint64_t version)
INT64, version,
END);
if (!statement)
RETURN(-1);
int ret=1;
int r = sqlite_step_retry(&retry, statement);
if (r == SQLITE_ROW){
RETURN(RHIZOME_BUNDLE_STATUS_ERROR);
enum rhizome_bundle_status status = RHIZOME_BUNDLE_STATUS_ERROR;
int stepcode;
if ((stepcode = sqlite_step_retry(&retry, statement)) == SQLITE_ROW){
const char *q_filehash = (const char *) sqlite3_column_text(statement, 0);
if (q_filehash && *q_filehash) {
rhizome_filehash_t hash;
if (str_to_rhizome_filehash_t(&hash, q_filehash) == -1) {
WARNF("invalid field MANIFESTS.filehash=%s -- ignored", alloca_str_toprint(q_filehash));
} else if (rhizome_exists(&hash))
ret=0;
}else
ret=0;
}else if(sqlite_code_ok(r))
ret=1;
else
ret=-1;
WHYF("Malformed filehash %s", q_filehash);
status = RHIZOME_BUNDLE_STATUS_ERROR;
}else{
enum rhizome_payload_status pstatus;
switch((pstatus = rhizome_exists(&hash))){
case RHIZOME_PAYLOAD_STATUS_NEW:
status = RHIZOME_BUNDLE_STATUS_NEW;
break;
case RHIZOME_PAYLOAD_STATUS_STORED:
status = RHIZOME_BUNDLE_STATUS_SAME;
break;
case RHIZOME_PAYLOAD_STATUS_BUSY:
status = RHIZOME_BUNDLE_STATUS_BUSY;
break;
default:
status = RHIZOME_BUNDLE_STATUS_ERROR;
break;
}
}
}else{
status = RHIZOME_BUNDLE_STATUS_SAME;
}
}else if (sqlite_code_busy(stepcode)){
status = RHIZOME_BUNDLE_STATUS_BUSY;
}else if (!sqlite_code_ok(stepcode)){
status = RHIZOME_BUNDLE_STATUS_ERROR;
}else{
status = RHIZOME_BUNDLE_STATUS_NEW;
}
sqlite3_finalize(statement);
RETURN(ret);
RETURN(status);
OUT();
}
int rhizome_is_bar_interesting(const rhizome_bar_t *bar)
enum rhizome_bundle_status rhizome_is_bar_interesting(const rhizome_bar_t *bar)
{
char id_hex[RHIZOME_BAR_PREFIX_BYTES *2 + 2];
tohex(id_hex, RHIZOME_BAR_PREFIX_BYTES * 2, rhizome_bar_prefix(bar));
@ -2030,7 +2097,7 @@ int rhizome_is_bar_interesting(const rhizome_bar_t *bar)
return is_interesting(id_hex, rhizome_bar_version(bar));
}
int rhizome_is_manifest_interesting(rhizome_manifest *m)
enum rhizome_bundle_status rhizome_is_manifest_interesting(rhizome_manifest *m)
{
return is_interesting(alloca_tohex_rhizome_bid_t(m->keypair.public_key), m->version);
}

View File

@ -171,8 +171,8 @@ void rhizome_fetch_status(struct sched_ent *alarm)
candidate_size += q->candidate_queue[j].manifest->filesize;
}
}
// if (candidates == 0 && q->active.state==RHIZOME_FETCH_FREE)
// continue;
if (candidates == 0 && q->active.state==RHIZOME_FETCH_FREE)
continue;
DEBUGF(rhizome_rx, "Fetch slot %d, candidates %u of %u %"PRIu64" bytes, %s %"PRIu64" of %"PRIu64,
i, candidates, q->candidate_queue_size, candidate_size,
fetch_state(q->active.state),
@ -729,7 +729,7 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m,
}
// If we already have this version or newer, do not fetch.
if (!rhizome_is_manifest_interesting(m)) {
if (rhizome_is_manifest_interesting(m) == RHIZOME_BUNDLE_STATUS_SAME) {
DEBUG(rhizome_rx, " fetch not started -- already have that version or newer");
RETURN(SUPERSEDED);
}
@ -878,7 +878,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
DEBUGF(rhizome_rx, "Considering import bid=%s version=%"PRIu64" size=%"PRIu64,
alloca_tohex_rhizome_bid_t(m->keypair.public_key), m->version, m->filesize);
if (!rhizome_is_manifest_interesting(m)) {
if (rhizome_is_manifest_interesting(m) == RHIZOME_BUNDLE_STATUS_SAME) {
DEBUG(rhizome_rx, " already stored that version or newer");
rhizome_manifest_free(m);
RETURN(-1);

View File

@ -328,7 +328,7 @@ next:
if (bar_count > max_tests && random()%bar_count >= max_tests)
continue;
test_count++;
if (rhizome_is_bar_interesting(bars[index])==1){
if (rhizome_is_bar_interesting(bars[index])==RHIZOME_BUNDLE_STATUS_NEW){
// add a request for the manifest
if (!payload){
header.source = get_my_subscriber(1);

View File

@ -41,31 +41,39 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size);
int rhizome_exists(const rhizome_filehash_t *hashp)
enum rhizome_payload_status rhizome_exists(const rhizome_filehash_t *hashp)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
uint64_t gotfile = 0;
if (sqlite_exec_uint64(&gotfile, "SELECT COUNT(*) FROM FILES WHERE id = ? and datavalid = 1;", RHIZOME_FILEHASH_T, hashp, END) != 1)
return 0;
int stepcode = sqlite_exec_uint64_retry(&retry, &gotfile, "SELECT COUNT(*) FROM FILES WHERE id = ? and datavalid = 1;",
RHIZOME_FILEHASH_T, hashp, END);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (!sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (gotfile==0)
return 0;
uint64_t blob_rowid;
if (sqlite_exec_uint64(&blob_rowid,
"SELECT rowid "
"FROM FILEBLOBS "
"WHERE id = ?", RHIZOME_FILEHASH_T, hashp, END) == -1)
return 0;
if (blob_rowid !=0)
return 1;
// No row in FILEBLOBS, look for an external blob file.
return RHIZOME_PAYLOAD_STATUS_NEW;
char blob_path[1024];
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(*hashp)))
return 0;
struct stat st;
if (stat(blob_path, &st) == -1)
return 0;
return 1;
if (FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(*hashp))){
struct stat st;
if (stat(blob_path, &st) == 0)
return RHIZOME_PAYLOAD_STATUS_STORED;
}
uint64_t blob_rowid = 0;
stepcode = sqlite_exec_uint64_retry(&retry, &blob_rowid,
"SELECT rowid "
"FROM FILEBLOBS "
"WHERE id = ?", RHIZOME_FILEHASH_T, hashp, END);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (!sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (blob_rowid!=0)
return RHIZOME_PAYLOAD_STATUS_STORED;
return RHIZOME_PAYLOAD_STATUS_NEW;
}
/* Creates a row in the FILEBLOBS table and return the ROWID. Returns 0 if unsuccessful (error
@ -213,33 +221,37 @@ static uint64_t store_space_limit(uint64_t current_size)
// TODO readonly version?
static enum rhizome_payload_status store_make_space(uint64_t bytes, struct rhizome_cleanup_report *report)
{
uint64_t external_bytes;
uint64_t db_page_size;
uint64_t db_page_count;
uint64_t db_free_page_count;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
uint64_t external_bytes=0;
uint64_t db_page_size=0;
uint64_t db_page_count=0;
uint64_t db_free_page_count=0;
// No limit?
if (config.rhizome.database_size==UINT64_MAX && config.rhizome.min_free_space==0)
return RHIZOME_PAYLOAD_STATUS_NEW;
// TODO index external_bytes calculation and/or cache result
if ( sqlite_exec_uint64_retry(&retry, &db_page_size, "PRAGMA page_size;", END) == -1LL
|| sqlite_exec_uint64_retry(&retry, &db_page_count, "PRAGMA page_count;", END) == -1LL
|| sqlite_exec_uint64_retry(&retry, &db_free_page_count, "PRAGMA freelist_count;", END) == -1LL
|| sqlite_exec_uint64_retry(&retry, &external_bytes,
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
int stepcode = sqlite_exec_uint64_retry(&retry, &db_page_size, "PRAGMA page_size;", END);
if (sqlite_code_ok(stepcode))
stepcode = sqlite_exec_uint64_retry(&retry, &db_page_count, "PRAGMA page_count;", END);
if (sqlite_code_ok(stepcode))
stepcode = sqlite_exec_uint64_retry(&retry, &db_free_page_count, "PRAGMA freelist_count;", END);
if (sqlite_code_ok(stepcode))
// TODO index and/or cache result?
stepcode = sqlite_exec_uint64_retry(&retry, &external_bytes,
"SELECT SUM(length) "
"FROM FILES "
"WHERE NOT EXISTS( "
"SELECT 1 "
"FROM FILEBLOBS "
"WHERE FILES.ID = FILEBLOBS.ID "
");", END) == -1LL
)
return WHY("Cannot measure database used bytes");
");", END);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (!sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_ERROR;
uint64_t db_used = external_bytes + db_page_size * (db_page_count - db_free_page_count);
const uint64_t limit = store_space_limit(db_used);
@ -272,8 +284,7 @@ static enum rhizome_payload_status store_make_space(uint64_t bytes, struct rhizo
if (!statement)
return RHIZOME_PAYLOAD_STATUS_ERROR;
int r=0;
while (db_used + bytes > limit && (r=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) {
while (db_used + bytes > limit && (stepcode=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) {
const char *id=(const char *) sqlite3_column_text(statement, 0);
uint64_t length = sqlite3_column_int(statement, 1);
time_ms_t inserttime = sqlite3_column_int64(statement, 2);
@ -289,34 +300,41 @@ static enum rhizome_payload_status store_make_space(uint64_t bytes, struct rhizo
// drop the existing content and recalculate used space
if (rhizome_delete_external(id)==0)
external_bytes -= length;
int rowcount=0;
sqlite3_stmt *s = sqlite_prepare_bind(&retry, "DELETE FROM fileblobs WHERE id = ?", STATIC_TEXT, id, END);
if (s)
sqlite_exec_retry(&retry, s);
if (s && !sqlite_code_ok(stepcode = sqlite_exec_code_retry(&retry, s, &rowcount)))
break;
s = sqlite_prepare_bind(&retry, "DELETE FROM files WHERE id = ?", STATIC_TEXT, id, END);
if (s)
sqlite_exec_retry(&retry, s);
sqlite_exec_uint64_retry(&retry, &db_page_count, "PRAGMA page_count;", END);
sqlite_exec_uint64_retry(&retry, &db_free_page_count, "PRAGMA freelist_count;", END);
if (s && !sqlite_code_ok(stepcode = sqlite_exec_code_retry(&retry, s, &rowcount)))
break;
if (!sqlite_code_ok(stepcode = sqlite_exec_uint64_retry(&retry, &db_page_count, "PRAGMA page_count;", END)))
break;
if (!sqlite_code_ok(stepcode = sqlite_exec_uint64_retry(&retry, &db_free_page_count, "PRAGMA freelist_count;", END)))
break;
if (report)
report->deleted_expired_files++;
db_used = external_bytes + db_page_size * (db_page_count - db_free_page_count);
}
sqlite3_finalize(statement);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (!sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_ERROR;
rhizome_vacuum_db(&retry);
if (db_used + bytes <= limit)
return RHIZOME_PAYLOAD_STATUS_NEW;
if (sqlite_code_ok(r)){
DEBUGF(rhizome, "Not enough space for %"PRIu64". Used; %"PRIu64" = %"PRIu64" + %"PRIu64" * (%"PRIu64" - %"PRIu64"), Limit; %"PRIu64,
bytes, db_used, external_bytes, db_page_size, db_page_count, db_free_page_count, limit);
return RHIZOME_PAYLOAD_STATUS_EVICTED;
}
return RHIZOME_PAYLOAD_STATUS_ERROR;
DEBUGF(rhizome, "Not enough space for %"PRIu64". Used; %"PRIu64" = %"PRIu64" + %"PRIu64" * (%"PRIu64" - %"PRIu64"), Limit; %"PRIu64,
bytes, db_used, external_bytes, db_page_size, db_page_count, db_free_page_count, limit);
return RHIZOME_PAYLOAD_STATUS_EVICTED;
}
int rhizome_store_cleanup(struct rhizome_cleanup_report *report)
@ -335,7 +353,7 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons
write->sql_blob=NULL;
if (expectedHashp){
if (rhizome_exists(expectedHashp))
if (rhizome_exists(expectedHashp) == RHIZOME_PAYLOAD_STATUS_STORED)
return RHIZOME_PAYLOAD_STATUS_STORED;
write->id = *expectedHashp;
write->id_known=1;
@ -1057,49 +1075,60 @@ enum rhizome_payload_status rhizome_open_read(struct rhizome_read *read, const r
read->offset = 0;
read->hash_offset = 0;
int r = sqlite_exec_uint64(&read->length,"SELECT length FROM FILES WHERE id = ?",
RHIZOME_FILEHASH_T, &read->id, END);
if (r == -1)
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (r == 0)
return RHIZOME_PAYLOAD_STATUS_NEW;
assert(read->length>0);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_uint64(&read->blob_rowid,
int stepcode = sqlite_exec_uint64_retry(&retry, &read->length,"SELECT length FROM FILES WHERE id = ?",
RHIZOME_FILEHASH_T, &read->id, END);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (stepcode != SQLITE_ROW){
if (sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_NEW;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
assert(read->length>0);
crypto_hash_sha512_init(&read->sha512_context);
char blob_path[1024];
if (FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(read->id))){
int fd = open(blob_path, O_RDONLY);
DEBUGF(rhizome_store, "open(%s) = %d", alloca_str_toprint(blob_path), fd);
if (fd == -1){
if (errno!=ENOENT)
WHYF_perror("open(%s)", alloca_str_toprint(blob_path));
}else{
off64_t pos = lseek64(fd, 0, SEEK_END);
if (pos == -1)
WHYF_perror("lseek64(%s,0,SEEK_END)", alloca_str_toprint(blob_path));
if (read->length <= (uint64_t)pos){
read->blob_fd = fd;
DEBUGF(rhizome_store, "Opened stored file %s as fd %d, len %"PRIu64, blob_path, read->blob_fd, read->length);
return RHIZOME_PAYLOAD_STATUS_STORED;
}
DEBUGF(rhizome_store, "Ignoring file? %s fd %d, len %"PRIu64", seek %zd", blob_path, fd, read->length, pos);
close(fd);
}
}
stepcode = sqlite_exec_uint64_retry(&retry, &read->blob_rowid,
"SELECT rowid "
"FROM FILEBLOBS "
"WHERE id = ?", RHIZOME_FILEHASH_T, &read->id, END) == -1)
return RHIZOME_PAYLOAD_STATUS_ERROR;
"WHERE id = ?", RHIZOME_FILEHASH_T, &read->id, END);
if (sqlite_code_busy(stepcode))
return RHIZOME_PAYLOAD_STATUS_BUSY;
if (read->blob_rowid == 0) {
// No row in FILEBLOBS, look for an external blob file.
char blob_path[1024];
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(read->id)))
return RHIZOME_PAYLOAD_STATUS_ERROR;
read->blob_fd = open(blob_path, O_RDONLY);
if (read->blob_fd == -1) {
if (errno == ENOENT) {
DEBUGF(rhizome_store, "Stored file does not exist: %s", blob_path);
// make sure we remove an orphan file row
rhizome_delete_file(&read->id);
return RHIZOME_PAYLOAD_STATUS_NEW;
}
WHYF_perror("open(%s)", alloca_str_toprint(blob_path));
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
off64_t pos = lseek64(read->blob_fd, 0, SEEK_END);
if (pos == -1) {
WHYF_perror("lseek64(%s,0,SEEK_END)", alloca_str_toprint(blob_path));
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
if (read->length != (uint64_t)pos){
WHYF("Length mismatch");
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
DEBUGF(rhizome_store, "Opened stored file %s as fd %d, len %"PRIx64, blob_path, read->blob_fd, read->length);
if (!sqlite_code_ok(stepcode))
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (stepcode == SQLITE_ROW){
DEBUGF(rhizome_store, "Opened stored blob, rowid %d", read->blob_rowid);
return RHIZOME_PAYLOAD_STATUS_STORED;
}
crypto_hash_sha512_init(&read->sha512_context);
return RHIZOME_PAYLOAD_STATUS_STORED;
// database is inconsistent, clean it up
rhizome_delete_file(&read->id);
return RHIZOME_PAYLOAD_STATUS_NEW;
}
static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read *read_state, unsigned char *buffer, size_t bufsz)

View File

@ -81,6 +81,7 @@ static const char *get_state_name(uint8_t state)
case STATE_SEND_PAYLOAD: return "SEND_PAYLOAD";
case STATE_RECV_PAYLOAD: return "RECV_PAYLOAD";
case STATE_COMPLETING: return "COMPLETING";
case STATE_LOOKUP_BAR: return "LOOKUP_BAR";
}
return "Unknown";
}
@ -191,7 +192,6 @@ void sync_keys_status(struct sched_ent *alarm)
if (!IF_DEBUG(rhizome_sync_keys))
return;
DEBUGF(rhizome_sync_keys, "Sync state;");
sync_enum_differences(sync_tree, sync_key_diffs);
time_ms_t next = gettime_ms()+1000;
@ -203,18 +203,36 @@ static int sync_complete_transfers(){
while(completing){
struct transfers *transfer = completing;
assert(transfer->state == STATE_COMPLETING);
enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
if (status == RHIZOME_PAYLOAD_STATUS_BUSY)
return 1;
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&transfer->key), status);
free(transfer->write);
transfer->write = NULL;
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&transfer->key), rhizome_bundle_status_message_nonnull(add_state));
if (transfer->write){
enum rhizome_payload_status status = rhizome_finish_write(transfer->write);
if (status == RHIZOME_PAYLOAD_STATUS_BUSY)
return 1;
free(transfer->write);
transfer->write = NULL;
if (status != RHIZOME_PAYLOAD_STATUS_NEW && status != RHIZOME_PAYLOAD_STATUS_STORED){
WARNF("Write failed %s (hash %s)",
rhizome_payload_status_message_nonnull(status),
alloca_sync_key(&transfer->key));
goto cleanup;
}
}
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(transfer->manifest, NULL);
switch(add_state){
case RHIZOME_BUNDLE_STATUS_BUSY:
return 1;
case RHIZOME_BUNDLE_STATUS_NEW:
case RHIZOME_BUNDLE_STATUS_SAME:
break;
default:
WARNF("Import manifest (hash %s) failed %s",
alloca_sync_key(&transfer->key), rhizome_bundle_status_message_nonnull(add_state));
}
cleanup:
if (transfer->manifest)
rhizome_manifest_free(transfer->manifest);
transfer->manifest=NULL;
@ -379,6 +397,9 @@ static void sync_send_peer(struct subscriber *peer, struct rhizome_sync_keys *sy
break;
default:
msg_complete = 0;
DEBUGF(rhizome_sync_keys, "Can't send manifest right now, (hash %s) %s",
alloca_sync_key(&msg->key),
rhizome_bundle_status_message_nonnull(status));
case RHIZOME_BUNDLE_STATUS_NEW:
// TODO we don't have this bundle anymore!
ob_rewind(payload);
@ -492,7 +513,7 @@ void sync_send(struct sched_ent *alarm)
time_ms_t next_action = msp_iterator_close(&iterator);
if (sync_complete_transfers()==1){
time_ms_t try_again = gettime_ms()+100;
time_ms_t try_again = gettime_ms()+20;
if (next_action > try_again)
next_action = try_again;
}
@ -548,15 +569,20 @@ static void build_tree()
sync_tree = sync_alloc_state(NULL, sync_peer_has, sync_peer_does_not_have, sync_peer_now_has);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT manifest_hash FROM manifests "
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT id, version, manifest_hash FROM manifests "
"WHERE manifests.filehash IS NULL OR EXISTS(SELECT 1 FROM files WHERE files.id = manifests.filehash);");
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
const char *hash = (const char *) sqlite3_column_text(statement, 0);
const char *q_id = (const char *) sqlite3_column_text(statement, 0);
uint64_t q_version = sqlite3_column_int64(statement, 1);
const char *hash = (const char *) sqlite3_column_text(statement, 2);
rhizome_filehash_t manifest_hash;
if (str_to_rhizome_filehash_t(&manifest_hash, hash)==0){
sync_key_t key;
memcpy(key.key, manifest_hash.binary, sizeof(sync_key_t));
DEBUGF(rhizome_sync_keys, "Adding %s to tree",
DEBUGF(rhizome_sync_keys, "Adding %s:%"PRIu64" (hash %s) to tree",
q_id,
q_version,
alloca_sync_key(&key));
sync_add_key(sync_tree, &key, NULL);
}
@ -600,27 +626,26 @@ void sync_send_keys(struct sched_ent *alarm)
DEBUG(rhizome_sync_keys,"Queueing next message for now");
RESCHEDULE(alarm, now, now, now);
}else{
DEBUG(rhizome_sync_keys,"Queueing next message for 5s");
RESCHEDULE(alarm, now+5000, now+30000, TIME_MS_NEVER_WILL);
}
}
static void process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
static int process_transfer_message(struct subscriber *peer, struct rhizome_sync_keys *sync_state, struct overlay_buffer *payload)
{
while(ob_remaining(payload)){
ob_checkpoint(payload);
int msg_state = ob_get(payload);
if (msg_state<0)
return;
return 0;
sync_key_t key;
if (ob_get_bytes(payload, key.key, sizeof key)<0)
return;
return 0;
int rank=-1;
if (msg_state & STATE_REQ){
rank = ob_get(payload);
if (rank < 0)
return;
return 0;
}
DEBUGF(rhizome_sync_keys, "Processing sync message %s %s %d",
@ -629,19 +654,22 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
case STATE_SEND_BAR:{
rhizome_bar_t bar;
if (ob_get_bytes(payload, bar.binary, sizeof(rhizome_bar_t))<0)
return;
return 0;
if (!config.rhizome.fetch)
break;
int r = rhizome_is_bar_interesting(&bar);
if (r == 0){
DEBUGF(rhizome_sync_keys, "Ignoring BAR for %s, (Uninteresting)",
enum rhizome_bundle_status status = rhizome_is_bar_interesting(&bar);
if (status == RHIZOME_BUNDLE_STATUS_SAME){
DEBUGF(rhizome_sync_keys, "Ignoring BAR %s:%"PRIu64" (hash %s), (Uninteresting)",
alloca_tohex_rhizome_bar_prefix(&bar),
rhizome_bar_version(&bar),
alloca_sync_key(&key));
break;
}else if (r==-1){
}else if (status != RHIZOME_BUNDLE_STATUS_NEW){
// don't consume the payload
ob_rewind(payload);
return;
return 1;
}
// send a request for the manifest
rank = rhizome_bar_log_size(&bar);
@ -666,7 +694,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
struct rhizome_manifest_summary summ;
if (!rhizome_manifest_inspect((char *)data, len, &summ)){
WHYF("Ignoring manifest for %s, (Malformed)",
WHYF("Ignoring manifest (hash %s), (Malformed)",
alloca_sync_key(&key));
break;
}
@ -676,7 +704,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
if (!m){
// don't consume the payload
ob_rewind(payload);
return;
return 1;
}
memcpy(m->manifestdata, data, len);
@ -684,23 +712,27 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
if ( rhizome_manifest_parse(m) == -1
|| !rhizome_manifest_validate(m)
) {
WHYF("Ignoring manifest for %s, (Malformed)",
WHYF("Ignoring manifest %s:%u"PRIu64" (hash %s), (Malformed)",
alloca_tohex_rhizome_bid_t(m->keypair.public_key),
m->version,
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}
int r = rhizome_is_manifest_interesting(m);
if (r == 0){
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (Uninteresting)",
enum rhizome_bundle_status bstatus = rhizome_is_manifest_interesting(m);
if (bstatus == RHIZOME_BUNDLE_STATUS_SAME){
DEBUGF(rhizome_sync_keys, "Ignoring manifest %s:%u"PRIu64" (hash %s), (Uninteresting)",
alloca_tohex_rhizome_bid_t(m->keypair.public_key),
m->version,
alloca_sync_key(&key));
rhizome_manifest_free(m);
break;
}else if (r == -1){
}else if (bstatus != RHIZOME_BUNDLE_STATUS_NEW){
// don't consume the payload
rhizome_manifest_free(m);
ob_rewind(payload);
return;
return 1;
}
// start writing the payload
@ -723,7 +755,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
rhizome_fail_write(write);
free(write);
ob_rewind(payload);
return;
return 1;
}
DEBUGF(rhizome_sync_keys, "Already have payload, imported manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_status));
@ -736,14 +768,17 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
rhizome_fail_write(write);
free(write);
ob_rewind(payload);
return;
return 1;
default:
DEBUGF(rhizome_sync_keys, "Ignoring manifest for %s, (%s)",
alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
break;
}
if (status!=RHIZOME_PAYLOAD_STATUS_NEW){
DEBUGF(rhizome_sync_keys, "Ignoring manifest %s:%"PRIu64" (hash %s), (%s)",
alloca_tohex_rhizome_bid_t(m->keypair.public_key),
m->version,
alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
rhizome_manifest_free(m);
rhizome_fail_write(write);
free(write);
@ -771,14 +806,14 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
// no new content in the new version, we can import now
enum rhizome_payload_status status = rhizome_finish_write(write);
DEBUGF(rhizome_sync_keys, "Write complete %s (%d)", alloca_sync_key(&key), status);
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_STORED){
enum rhizome_bundle_status add_state = rhizome_add_manifest_to_store(m, NULL);
DEBUGF(rhizome_sync_keys, "Import %s = %s",
alloca_sync_key(&key), rhizome_bundle_status_message_nonnull(add_state));
} else
} else {
WHYF("Failed to complete payload %s %s", alloca_sync_key(&key), rhizome_payload_status_message_nonnull(status));
rhizome_fail_write(write);
}
free(write);
rhizome_manifest_free(m);
break;
@ -804,7 +839,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
rhizome_manifest *m = rhizome_new_manifest();
if (!m){
ob_rewind(payload);
return;
return 1;
}
enum rhizome_bundle_status status = rhizome_retrieve_manifest_by_hash_prefix(key.key, sizeof(sync_key_t), m);
@ -868,6 +903,7 @@ static void process_transfer_message(struct subscriber *peer, struct rhizome_syn
WHYF("Unknown message type %x", msg_state);
}
}
return 0;
}
@ -910,13 +946,14 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
struct rhizome_sync_keys *sync_state = get_peer_sync_state(header->source);
sync_state->connection = connection_state;
while(1){
int r = 0;
while(r == 0){
struct msp_packet *packet = msp_recv_next(connection_state);
if (!packet)
break;
struct overlay_buffer *recv_payload = msp_unpack(connection_state, packet);
if (recv_payload)
process_transfer_message(header->source, sync_state, recv_payload);
r = process_transfer_message(header->source, sync_state, recv_payload);
msp_consumed(connection_state, packet, recv_payload);
}
@ -924,10 +961,15 @@ static int sync_keys_recv(struct internal_mdp_header *header, struct overlay_buf
time_ms_t next_action = msp_next_action(connection_state);
if (sync_complete_transfers()==1){
time_ms_t try_again = gettime_ms() + 100;
time_ms_t try_again = gettime_ms() + 20;
if (next_action > try_again)
next_action = try_again;
}
if (r!=0){
time_ms_t wail_till = gettime_ms() + 20;
if (next_action < wail_till)
next_action = wail_till;
}
struct sched_ent *alarm=&ALARM_STRUCT(sync_send);
if (alarm->alarm > next_action || !is_scheduled(alarm))

View File

@ -43,11 +43,11 @@ default_config() {
set debug.http_server on \
set debug.httpd on \
set debug.rhizome_manifest on \
set debug.rhizome_ads on \
set debug.rhizome_ads off \
set debug.rhizome_tx on \
set debug.rhizome_rx on \
set debug.rhizome_sync_keys on \
set debug.msp on
set debug.msp off
}
# Called by start_servald_instances for each instance.