Rework handling of race conditions when storing the same bundle (#109)

This commit is contained in:
Jeremy Lakeman 2016-05-23 12:54:27 +09:30
parent 267a7da0d9
commit 177f695671
3 changed files with 86 additions and 56 deletions

View File

@ -557,6 +557,7 @@ int _sqlite_step(struct __sourceloc, int log_level, sqlite_retry_state *retry, s
int _sqlite_exec(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement);
int _sqlite_exec_void(struct __sourceloc, int log_level, const char *sqltext, ...);
int _sqlite_exec_void_retry(struct __sourceloc, int log_level, sqlite_retry_state *retry, const char *sqltext, ...);
int _sqlite_exec_changes_retry(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, int *rowcount, int *changes, const char *sqltext, ...);
int _sqlite_exec_uint64(struct __sourceloc, uint64_t *result, const char *sqltext, ...);
int _sqlite_exec_uint64_retry(struct __sourceloc, sqlite_retry_state *retry, uint64_t *result, const char *sqltext, ...);
int _sqlite_exec_strbuf(struct __sourceloc, strbuf sb, const char *sqltext, ...);
@ -604,6 +605,8 @@ int _sqlite_blob_close(struct __sourceloc, int log_level, sqlite3_blob *blob);
#define sqlite_exec_void(sql,arg,...) _sqlite_exec_void(__WHENCE__, LOG_LEVEL_ERROR, (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_void_loglevel(ll,sql,arg,...) _sqlite_exec_void(__WHENCE__, (ll), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_void_retry(rs,sql,arg,...) _sqlite_exec_void_retry(__WHENCE__, LOG_LEVEL_ERROR, (rs), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_changes_retry(rs,ROWS,CHANGES,sql,arg,...) _sqlite_exec_changes_retry(__WHENCE__, LOG_LEVEL_ERROR, (rs), (ROWS), (CHANGES), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_changes_retry_loglevel(ll,rs,ROWS,CHANGES,sql,arg,...) _sqlite_exec_changes_retry(__WHENCE__, (ll), (rs), (ROWS), (CHANGES), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_void_retry_loglevel(ll,rs,sql,arg,...) _sqlite_exec_void_retry(__WHENCE__, (ll), (rs), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_uint64(res,sql,arg,...) _sqlite_exec_uint64(__WHENCE__, (res), (sql), arg, ##__VA_ARGS__)
#define sqlite_exec_uint64_retry(rs,res,sql,arg,...) _sqlite_exec_uint64_retry(__WHENCE__, (rs), (res), (sql), arg, ##__VA_ARGS__)

View File

@ -869,14 +869,13 @@ int _sqlite_step(struct __sourceloc __whence, int log_level, sqlite_retry_state
int ret = -1;
sqlite_trace_whence = &__whence;
while (statement) {
int stepcode = sqlite3_step(statement);
switch (stepcode) {
ret = sqlite3_step(statement);
switch (ret) {
case SQLITE_OK:
case SQLITE_DONE:
case SQLITE_ROW:
if (retry)
_sqlite_retry_done(__whence, retry, sqlite3_sql(statement));
ret = stepcode;
statement = NULL;
break;
case SQLITE_BUSY:
@ -885,10 +884,9 @@ int _sqlite_step(struct __sourceloc __whence, int log_level, sqlite_retry_state
sqlite3_reset(statement);
break; // back to sqlite3_step()
}
ret = stepcode;
// fall through...
default:
LOGF(log_level, "query failed (%d), %s: %s", stepcode, sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
LOGF(log_level, "query failed (%d), %s: %s", ret, sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
statement = NULL;
break;
}
@ -898,6 +896,20 @@ int _sqlite_step(struct __sourceloc __whence, int log_level, sqlite_retry_state
return ret;
}
int _sqlite_exec_code(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement, int *rowcount)
{
*rowcount = 0;
if (!statement)
return SQLITE_ERROR;
int stepcode;
while ((stepcode = _sqlite_step(__whence, log_level, retry, statement)) == SQLITE_ROW)
++(*rowcount);
sqlite3_finalize(statement);
if (sqlite_trace_func())
_DEBUGF("rowcount=%d changes=%d", *rowcount, sqlite3_changes(rhizome_db));
return stepcode;
}
/*
* Convenience wrapper for executing a prepared SQL statement where the row outputs are not wanted.
* Always finalises the statement before returning.
@ -915,36 +927,25 @@ int _sqlite_step(struct __sourceloc __whence, int log_level, sqlite_retry_state
*/
int _sqlite_exec(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement)
{
if (!statement)
return -1;
int rowcount = 0;
int stepcode;
while ((stepcode = _sqlite_step(__whence, log_level, retry, statement)) == SQLITE_ROW)
++rowcount;
sqlite3_finalize(statement);
if (sqlite_trace_func())
_DEBUGF("rowcount=%d changes=%d", rowcount, sqlite3_changes(rhizome_db));
int rowcount;
int stepcode = _sqlite_exec_code(__whence, log_level, retry, statement, &rowcount);
return sqlite_code_ok(stepcode) ? rowcount : -1;
}
/* Execute an SQL command that returns no value. If an error occurs then logs it at ERROR level and
* returns -1. Otherwise returns the number of rows changed by the command.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
static int _sqlite_vexec_void(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, const char *sqltext, va_list ap)
static int _sqlite_vexec_void_code(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, int *rowcount, int *changes, const char *sqltext, va_list ap)
{
*changes=0;
*rowcount=0;
sqlite3_stmt *statement = _sqlite_prepare(__whence, log_level, retry, sqltext);
if (!statement)
return -1;
return SQLITE_ERROR;
if (_sqlite_vbind(__whence, log_level, retry, statement, ap) == -1)
return -1;
int rowcount = _sqlite_exec(__whence, log_level, retry, statement);
if (rowcount == -1)
return -1;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
return sqlite3_changes(rhizome_db);
return SQLITE_ERROR;
int stepcode = _sqlite_exec_code(__whence, log_level, retry, statement, rowcount);
if (sqlite_code_ok(stepcode)){
*changes = sqlite3_changes(rhizome_db);
}
return stepcode;
}
/* Convenience wrapper for executing an SQL command that returns no value. If an error occurs then
@ -955,12 +956,17 @@ static int _sqlite_vexec_void(struct __sourceloc __whence, int log_level, sqlite
*/
int _sqlite_exec_void(struct __sourceloc __whence, int log_level, const char *sqltext, ...)
{
int rowcount, changes;
va_list ap;
va_start(ap, sqltext);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
int ret = _sqlite_vexec_void(__whence, log_level, &retry, sqltext, ap);
int stepcode = _sqlite_vexec_void_code(__whence, log_level, &retry, &rowcount, &changes, sqltext, ap);
va_end(ap);
return ret;
if (!sqlite_code_ok(stepcode))
return -1;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
return changes;
}
/* Same as sqlite_exec_void() but if the statement cannot be executed because the database is
@ -972,11 +978,25 @@ int _sqlite_exec_void(struct __sourceloc __whence, int log_level, const char *sq
*/
int _sqlite_exec_void_retry(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, const char *sqltext, ...)
{
int rowcount, changes;
va_list ap;
va_start(ap, sqltext);
int ret = _sqlite_vexec_void(__whence, log_level, retry, sqltext, ap);
int stepcode = _sqlite_vexec_void_code(__whence, log_level, retry, &rowcount, &changes, sqltext, ap);
va_end(ap);
return ret;
if (!sqlite_code_ok(stepcode))
return -1;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
return changes;
}
int _sqlite_exec_changes_retry(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, int *rowcount, int *changes, const char *sqltext, ...)
{
va_list ap;
va_start(ap, sqltext);
int stepcode = _sqlite_vexec_void_code(__whence, log_level, retry, rowcount, changes, sqltext, ap);
va_end(ap);
return stepcode;
}
static int _sqlite_vexec_uint64(struct __sourceloc __whence, sqlite_retry_state *retry, uint64_t *result, const char *sqltext, va_list ap)

View File

@ -767,11 +767,28 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (rhizome_exists(&write->id)) {
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
goto dbfailure;
// attempt the insert first
time_ms_t now = gettime_ms();
int rowcount, changes;
int stepcode = sqlite_exec_changes_retry_loglevel(
LOG_LEVEL_INFO,
&retry, &rowcount, &changes,
"INSERT INTO FILES(id,length,datavalid,inserttime,last_verified) VALUES(?,?,1,?,?);",
RHIZOME_FILEHASH_T, &write->id,
INT64, write->file_length,
INT64, now,
INT64, now,
END
);
if (stepcode == SQLITE_CONSTRAINT){
// we've already got that payload, delete the new copy
if (write->blob_rowid){
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
INT64, write->blob_rowid, END);
}
if (external){
@ -780,23 +797,8 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
}
DEBUGF(rhizome_store, "Payload id=%s already present, removed id='%"PRIu64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id);
status = RHIZOME_PAYLOAD_STATUS_STORED;
}else{
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
goto dbfailure;
time_ms_t now = gettime_ms();
if (sqlite_exec_void_retry(
&retry,
"INSERT OR REPLACE INTO FILES(id,length,datavalid,inserttime,last_verified) VALUES(?,?,1,?,?);",
RHIZOME_FILEHASH_T, &write->id,
INT64, write->file_length,
INT64, now,
INT64, now,
END
) == -1
)
goto dbfailure;
}else if(sqlite_code_ok(stepcode)){
if (external) {
char dest_path[1024];
@ -818,12 +820,17 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
)
goto dbfailure;
}
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
goto dbfailure;
// A test case in tests/rhizomeprotocol depends on this debug message:
DEBUGF(rhizome_store, "Stored file %s", alloca_tohex_rhizome_filehash_t(write->id));
}
}else
goto dbfailure;
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
goto dbfailure;
write->blob_rowid = 0;
// A test case in tests/rhizomeprotocol depends on this debug message:
if (status == RHIZOME_PAYLOAD_STATUS_NEW)
DEBUGF(rhizome_store, "Stored file %s", alloca_tohex_rhizome_filehash_t(write->id));
return status;
dbfailure: