Insert manifest within a transaction

This commit is contained in:
Jeremy Lakeman 2012-05-28 11:59:35 +09:30
parent 0e75cbdcad
commit 36389d2b78
2 changed files with 133 additions and 157 deletions

View File

@ -243,7 +243,7 @@ int rhizome_server_close_http_request(int i);
int rhizome_server_http_send_bytes(int rn,rhizome_http_request *r);
int rhizome_server_parse_http_request(int rn,rhizome_http_request *r);
int rhizome_server_simple_http_response(rhizome_http_request *r,int result, char *response);
long long sqlite_exec_void(const char *sqlformat,...);
int sqlite_exec_void(const char *sqlformat,...);
long long sqlite_exec_int64(const char *sqlformat,...);
int sqlite_exec_strbuf(strbuf sb, const char *sqlformat,...);
int rhizome_server_http_response_header(rhizome_http_request *r,int result,

View File

@ -25,6 +25,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
long long rhizome_space=0;
static const char *rhizome_thisdatastore_path = NULL;
static int rhizome_enabled_flag = -1; // unknown
#define SQLITE_CODE_OK(code) (code==SQLITE_OK || code==SQLITE_DONE)
int rhizome_enabled()
{
@ -121,10 +122,10 @@ int rhizome_opendb()
if (rhizome_db) return 0;
if (create_rhizome_datastore_dir() == -1)
return -1;
return WHY("No Directory");
char dbname[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(dbname, "rhizome.db"))
return -1;
return WHY("Invalid path");
if (sqlite3_open(dbname,&rhizome_db))
return WHYF("SQLite could not open database: %s", sqlite3_errmsg(rhizome_db));
@ -176,7 +177,7 @@ int rhizome_opendb()
Convenience wrapper for executing an SQL command that returns a no value.
Returns -1 if an error occurs, otherwise zero.
*/
long long sqlite_exec_void(const char *sqlformat,...)
int sqlite_exec_void(const char *sqlformat,...)
{
if (!rhizome_db) rhizome_opendb();
strbuf stmt = strbuf_alloca(8192);
@ -187,6 +188,7 @@ long long sqlite_exec_void(const char *sqlformat,...)
if (strbuf_overrun(stmt))
return WHYF("Sql statement overrun: %s", strbuf_str(stmt));
sqlite3_stmt *statement;
switch (sqlite3_prepare_v2(rhizome_db, strbuf_str(stmt), -1, &statement, NULL)) {
case SQLITE_OK: case SQLITE_DONE:
break;
@ -194,9 +196,7 @@ long long sqlite_exec_void(const char *sqlformat,...)
WHY(strbuf_str(stmt));
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
sqlite3_close(rhizome_db);
rhizome_db=NULL;
return WHYF("Sql statement prepare: %s -- closed database", strbuf_str(stmt));
return -1;
}
int stepcode;
while ((stepcode = sqlite3_step(statement)) == SQLITE_ROW)
@ -210,7 +210,7 @@ long long sqlite_exec_void(const char *sqlformat,...)
WHY(strbuf_str(stmt));
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
return WHYF("Sql statement step: %s", strbuf_str(stmt));
return -1;
}
sqlite3_finalize(statement);
return 0;
@ -239,9 +239,7 @@ long long sqlite_exec_int64(const char *sqlformat,...)
WHY(strbuf_str(stmt));
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
sqlite3_close(rhizome_db);
rhizome_db=NULL;
return WHYF("Could not prepare sql statement: %s -- closed database", strbuf_str(stmt));
return -1;
}
if (sqlite3_step(statement) == SQLITE_ROW) {
int n = sqlite3_column_count(statement);
@ -254,7 +252,7 @@ long long sqlite_exec_int64(const char *sqlformat,...)
return result;
}
sqlite3_finalize(statement);
return WHYF("No rows found: %s", strbuf_str(stmt));
return WHY("No rows found");
}
/*
@ -281,8 +279,6 @@ int sqlite_exec_strbuf(strbuf sb, const char *sqlformat,...)
break;
default:
sqlite3_finalize(statement);
sqlite3_close(rhizome_db);
rhizome_db=NULL;
WHY(strbuf_str(stmt));
WHY(sqlite3_errmsg(rhizome_db));
return WHY("Could not prepare sql statement.");
@ -396,25 +392,27 @@ int rhizome_drop_stored_file(const char *id,int maximum_priority)
while ( sqlite3_step(statement) == SQLITE_ROW)
{
/* Find manifests for this file */
const unsigned char *id;
if (sqlite3_column_type(statement, 0)==SQLITE_TEXT) id=sqlite3_column_text(statement, 0);
const unsigned char *manifestId;
if (sqlite3_column_type(statement, 0)==SQLITE_TEXT)
manifestId=sqlite3_column_text(statement, 0);
else {
WHYF("Incorrect type in id column of manifests table.");
continue; }
continue;
}
/* Check that manifest is not part of a higher priority group.
If so, we cannot drop the manifest or the file.
However, we will keep iterating, as we can still drop any other manifests pointing to this file
that are lower priority, and thus free up a little space. */
if (rhizome_manifest_priority((char *)id)>maximum_priority) {
if (rhizome_manifest_priority((char *)manifestId)>maximum_priority) {
WHYF("Cannot drop due to manifest %s",id);
cannot_drop=1;
} else {
printf("removing stale filemanifests, manifests, groupmemberships\n");
sqlite_exec_void("delete from filemanifests where manifestid='%s';",id);
sqlite_exec_void("delete from manifests where manifestid='%s';",id);
sqlite_exec_void("delete from keypairs where public='%s';",id);
sqlite_exec_void("delete from groupmemberships where manifestid='%s';",id);
sqlite_exec_void("delete from filemanifests where manifestid='%s';",manifestId);
sqlite_exec_void("delete from manifests where manifestid='%s';",manifestId);
sqlite_exec_void("delete from keypairs where public='%s';",manifestId);
sqlite_exec_void("delete from groupmemberships where manifestid='%s';",manifestId);
}
}
sqlite3_finalize(statement);
@ -426,6 +424,40 @@ int rhizome_drop_stored_file(const char *id,int maximum_priority)
return 0;
}
int sqlite3_exec_retry(sqlite3 *db,const char *sql,int (*callback)(void*,int,char**,char**),void *arg,char **errmsg){
int ret;
do{
ret = sqlite3_exec(db, sql, callback, arg, errmsg);
}while(ret==SQLITE_BUSY || ret==SQLITE_LOCKED);
if (!SQLITE_CODE_OK(ret))
WHY(sql);
return ret;
}
int sqlite3_prepare_v2_retry(sqlite3 *db, const char *zSql, int nByte, sqlite3_stmt **ppStmt, const char **pzTail){
int ret;
do{
ret = sqlite3_prepare_v2(db, zSql, nByte, ppStmt, pzTail);
}while(ret==SQLITE_BUSY || ret==SQLITE_LOCKED);
if (!SQLITE_CODE_OK(ret))
WHY(zSql);
return ret;
}
int sqlite3_step_retry(sqlite3_stmt *stmt){
int ret;
while(1){
ret = sqlite3_step(stmt);
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED){
WHY("Database locked, retrying");
sqlite3_reset(stmt);
}else{
return ret;
}
}
}
/*
Store the specified manifest into the sqlite database.
We assume that sufficient space has been made for us.
@ -472,150 +504,93 @@ int rhizome_store_bundle(rhizome_manifest *m)
str_toupper_inplace(filehash);
/* process should be;
BEGIN TRANSACTION;
try{
// insert new file contents, if not already there
// dont risk losing the old manifest if something fails
UPDATE MANIFESTS
SET manifest = ?,
version = ?,
insserttime = ?,
bar = ?
WHERE id = ?;
if (no rows updated){
INSERT INTO MANIFESTS (id,manifest,version,inserttime,bar)
VALUES (?,?,?,?,?);
}
// remove reference to any old file versions
DELETE FROM FILEMANIFESTS
WHERE manifestid=?
AND fileid != ?;
// remove unreferenced files
DELETE FROM FILES f
AND NOT EXISTS(
SELECT 1
FROM FILEMANIFESTS fm
AND fm.fileid = f.id
);
COMMIT;
}catch (){
// if one step fails, all fail.
ROLLBACK;
}
*/
char *err;
int sql_ret;
sqlite3_stmt *stmt;
/* remove any old version of the manifest */
if (sqlite_exec_int64("SELECT COUNT(*) FROM MANIFESTS WHERE id='%s';",manifestid)>0)
{
/* Manifest already exists.
Remove old manifest entry, and replace with new one.
But we do need to check if the file referenced by the old one is still needed,
and if it's priority is right */
sqlite_exec_void("DELETE FROM MANIFESTS WHERE id='%s';",manifestid);
strbuf b = strbuf_alloca(RHIZOME_FILEHASH_STRLEN + 1);
if (sqlite_exec_strbuf(b, "SELECT fileid from filemanifests where manifestid='%s';", manifestid) == -1)
return -1;
if (strbuf_overrun(b))
return WHYF("got over-long fileid from database: %s", strbuf_str(b));
sqlite_exec_void("DELETE FROM FILEMANIFESTS WHERE manifestid='%s';",manifestid);
/* File check must occur AFTER we drop the manifest, otherwise we think
that it has a reference still */
rhizome_update_file_priority(strbuf_str(b));
}
/* Store manifest */
if (debug & DEBUG_RHIZOME) DEBUGF("Writing into manifests table");
char sqlcmd[1024];
snprintf(sqlcmd,1024,
"INSERT INTO MANIFESTS(id,manifest,version,inserttime,bar) VALUES('%s',?,%lld,%lld,?);",
manifestid, m->version, gettime_ms());
const char *cmdtail;
sqlite3_stmt *statement;
if (sqlite3_prepare_v2(rhizome_db,sqlcmd,strlen(sqlcmd)+1,&statement,&cmdtail) != SQLITE_OK) {
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
return WHY("Insert into manifests failed.");
}
/* Bind manifest data to data field */
if (sqlite3_bind_blob(statement,1,m->manifestdata,m->manifest_bytes,SQLITE_TRANSIENT)!=SQLITE_OK)
{
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
return WHY("Insert into manifests failed.");
}
/* Bind BAR to data field */
unsigned char bar[RHIZOME_BAR_BYTES];
rhizome_manifest_to_bar(m,bar);
if (sqlite3_bind_blob(statement,2,bar,RHIZOME_BAR_BYTES,SQLITE_TRANSIENT)
!=SQLITE_OK)
{
WHY(sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
return WHY("Insert into manifests failed.");
// we should add the file in the same transaction, but closing the blob seems to cause some issues.
/* Store the file */
#warning need to implement passing of encryption key for file here
if (m->fileLength>0){
if (rhizome_store_file(m,NULL)){
WHY("Could not store file");
sql_ret = 1;
}
if (rhizome_finish_sqlstatement(statement))
return WHY("SQLite3 failed to insert row for manifest");
else {
if (debug & DEBUG_RHIZOME) DEBUGF("Insert into manifests apparently worked.");
}
/* Create relationship between file and manifest */
long long r=sqlite_exec_void("INSERT INTO FILEMANIFESTS(manifestid,fileid) VALUES('%s','%s');", manifestid, filehash);
if (r<0) {
WHY(sqlite3_errmsg(rhizome_db));
return WHY("SQLite3 failed to insert row in filemanifests.");
}
/* Create relationships to groups */
if (!rhizome_db) rhizome_opendb();
sql_ret = sqlite3_exec_retry(rhizome_db, "BEGIN TRANSACTION;", NULL, NULL, &err);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_prepare_v2_retry(rhizome_db, "INSERT OR REPLACE INTO MANIFESTS(id,manifest,version,inserttime,bar) VALUES(?,?,?,?,?);", -1, &stmt, NULL);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_blob(stmt, 2, m->manifestdata, m->manifest_bytes, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_int64(stmt, 3, m->version);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_int64(stmt, 4, gettime_ms());
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_blob(stmt, 5, bar, RHIZOME_BAR_BYTES, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_step_retry(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_finalize(stmt);
// delete all other file manifest records
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_prepare_v2_retry(rhizome_db, "DELETE FROM FILEMANIFESTS WHERE manifestid=? AND fileid != ?;", -1, &stmt, NULL);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 2, filehash, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_step_retry(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_finalize(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_prepare_v2_retry(rhizome_db, "INSERT OR IGNORE INTO FILEMANIFESTS (manifestid, fileid) VALUES (?, ?);", -1, &stmt, NULL);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 2, filehash, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_step_retry(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_finalize(stmt);
// we might need to leave the old file around for a bit
// clean out unreferenced files first
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_exec_retry(rhizome_db, "DELETE FROM FILES WHERE NOT EXISTS( SELECT 1 FROM FILEMANIFESTS WHERE FILEMANIFESTS.fileid = FILES.id);", NULL, NULL, &err);
if (rhizome_manifest_get(m,"isagroup",NULL,0)!=NULL) {
/* This manifest is a group, so add entry to group list.
Created group is not automatically subscribed to, however. */
int closed=rhizome_manifest_get_ll(m,"closedgroup");
if (closed<1) closed=0;
int ciphered=rhizome_manifest_get_ll(m,"cipheredgroup");
if (ciphered<1) ciphered=0;
sqlite_exec_void("delete from grouplist where id='%s';",manifestid);
int storedP
=sqlite_exec_void("insert into grouplist(id,closed,ciphered,priority) VALUES('%s',%d,%d,%d);",
manifestid,closed,ciphered,RHIZOME_PRIORITY_DEFAULT);
if (storedP<0) return WHY("Failed to insert group manifest into grouplist table.");
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_prepare_v2_retry(rhizome_db, "INSERT OR REPLACE INTO GROUPLIST(id,closed,ciphered,priority) VALUES (?,?,?,?);", -1, &stmt, NULL);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_int(stmt, 2, closed);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_int(stmt, 3, ciphered);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_int(stmt, 4, RHIZOME_PRIORITY_DEFAULT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_step_retry(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_finalize(stmt);
}
{
int g;
int dud=0;
for(g=0;g<m->group_count;g++)
{
if (sqlite_exec_void("INSERT INTO GROUPMEMBERSHIPS(manifestid,groupid) VALUES('%s','%s');",
manifestid, m->groups[g])<0)
dud++;
}
if (dud>0) return WHY("Failed to create one or more group associations");
if (m->group_count>0){
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_prepare_v2_retry(rhizome_db, "INSERT OR REPLACE INTO GROUPMEMBERSHIPS(manifestid,groupid) VALUES(?, ?);", -1, &stmt, NULL);
int i;
for (i=0;i<m->group_count;i++){
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 1, manifestid, -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_bind_text(stmt, 2, m->groups[i], -1, SQLITE_TRANSIENT);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_step_retry(stmt);
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_reset(stmt);
}
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_finalize(stmt);
}
/* Store the file */
#warning need to implement passing of encryption key for file here
if (m->fileLength>0)
if (rhizome_store_file(m,NULL))
return WHY("Could not store associated file");
/* Get things consistent */
sqlite3_exec(rhizome_db,"COMMIT;",NULL,NULL,NULL);
return 0;
if (SQLITE_CODE_OK(sql_ret)) sql_ret = sqlite3_exec_retry(rhizome_db, "COMMIT;", NULL, NULL, &err);
if (err!=NULL){
WHY(err);
sqlite3_free(err);
}
if (SQLITE_CODE_OK(sql_ret))
return 0;
WHYF("Failed to store bundle %s", sqlite3_errmsg(rhizome_db));
sqlite3_exec_retry(rhizome_db, "ROLLBACK;", NULL, NULL, NULL);
return -1;
}
int rhizome_finish_sqlstatement(sqlite3_stmt *statement)
@ -767,11 +742,10 @@ int rhizome_store_file(rhizome_manifest *m,const unsigned char *key)
return WHYF("File has shrunk, so cannot be stored.");
} else if (stat.st_size>m->fileLength) {
WARNF("File has grown by %lld bytes. I will just store the original number of bytes so that the hash (hopefully) matches",stat.st_size-m->fileLength);
return -1;
}
unsigned char *addr =
mmap(NULL, stat.st_size, PROT_READ, MAP_FILE|MAP_SHARED, fd, 0);
mmap(NULL, m->fileLength, PROT_READ, MAP_FILE|MAP_SHARED, fd, 0);
if (addr==MAP_FAILED) {
close(fd);
return WHY("mmap() of associated file failed.");
@ -913,7 +887,8 @@ int rhizome_store_file(rhizome_manifest *m,const unsigned char *key)
str_toupper_inplace(hash_out);
}
sqlite3_blob_close(blob);
if (sqlite3_blob_close(blob)!=SQLITE_OK) dud++;
close(fd);
if (strcasecmp(hash_out,hash))
@ -923,7 +898,8 @@ int rhizome_store_file(rhizome_manifest *m,const unsigned char *key)
}
/* Mark file as up-to-date */
sqlite_exec_void("UPDATE FILES SET datavalid=1 WHERE id='%s';", hash);
if (sqlite_exec_void("UPDATE FILES SET datavalid=1 WHERE id='%s';", hash))
return WHY("Failed to set datavalid");
if (dud) {
WHY(sqlite3_errmsg(rhizome_db));
@ -1067,7 +1043,7 @@ int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found,
}
if (blob_filesize != -1 && blob_filesize != m->fileLength) {
WARNF("MANIFESTS row id=%s joined to FILES row id=%s has inconsistent blob: known file size %lld, blob.filesize=%lld -- skipped",
q_manifestid, m->fileLength, blob_filesize);
q_manifestid, m->fileHexHash, m->fileLength, blob_filesize);
++inconsistent;
}
if (checkVersionP && q_version != m->version) {