Always store payloads with a temporary id

This commit is contained in:
Jeremy Lakeman 2013-08-27 16:15:51 +09:30
parent 8fea1523b1
commit 8accabfcac
4 changed files with 70 additions and 68 deletions

View File

@ -223,7 +223,7 @@ static int ply_read_open(struct ply_read *ply, const char *id, rhizome_manifest
return -1; return -1;
int ret = rhizome_open_decrypt_read(m, NULL, &ply->read); int ret = rhizome_open_decrypt_read(m, NULL, &ply->read);
if (ret>0) if (ret>0)
WARNF("Payload was not found for manifest %s", alloca_tohex_bid(m->cryptoSignPublic)); WARNF("Payload was not found for manifest %s, %"PRId64, alloca_tohex_bid(m->cryptoSignPublic), m->version);
if (ret) if (ret)
return ret; return ret;
ply->read.offset = ply->read.length = m->fileLength; ply->read.offset = ply->read.length = m->fileLength;

View File

@ -399,6 +399,7 @@ struct rhizome_write_buffer{
struct rhizome_write{ struct rhizome_write{
char id[SHA512_DIGEST_STRING_LENGTH+1]; char id[SHA512_DIGEST_STRING_LENGTH+1];
uint64_t temp_id;
char id_known; char id_known;
int64_t tail; int64_t tail;

View File

@ -1391,12 +1391,6 @@ int rhizome_received_content(unsigned char *bidprefix,
} }
if (m){ if (m){
// the rhizome store API doesn't support concurrent writing, so we have an active slot for this payload
// we need to close it first without freeing the manifest just yet.
if (slot && (slot->write_state.blob_fd>=0 ||
slot->write_state.blob_rowid>=0))
rhizome_fail_write(&slot->write_state);
if (rhizome_import_buffer(m, bytes, count)>=0 && !rhizome_import_received_bundle(m)){ if (rhizome_import_buffer(m, bytes, count)>=0 && !rhizome_import_received_bundle(m)){
INFOF("Completed MDP transfer in one hit for file %s", m->fileHexHash); INFOF("Completed MDP transfer in one hit for file %s", m->fileHexHash);
if (c) if (c)

View File

@ -25,9 +25,13 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
strlcpy(write->id, expectedFileHash, SHA512_DIGEST_STRING_LENGTH); strlcpy(write->id, expectedFileHash, SHA512_DIGEST_STRING_LENGTH);
write->id_known=1; write->id_known=1;
}else{ }else{
snprintf(write->id, sizeof(write->id), "%lld", gettime_ms());
write->id_known=0; write->id_known=0;
} }
static uint64_t last_id=0;
write->temp_id = gettime_ms();
if (write->temp_id<last_id)
write->temp_id=last_id+1;
last_id=write->temp_id;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
@ -45,22 +49,23 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
sqlite3_stmt *statement = NULL; sqlite3_stmt *statement = NULL;
int ret=sqlite_exec_void_retry(&retry, int ret=sqlite_exec_void_retry(&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);", "INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) "
write->id, (long long)file_length, priority, (long long)gettime_ms()); "VALUES('%"PRId64"',%"PRId64",%d,0,%"PRId64");",
write->temp_id, file_length, priority, gettime_ms());
if (ret==-1) if (ret==-1)
goto insert_row_fail; goto insert_row_fail;
char blob_path[1024]; char blob_path[1024];
if (config.rhizome.external_blobs || file_length > 128*1024) { if (config.rhizome.external_blobs || file_length > 128*1024) {
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){ if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){
WHY("Invalid path"); WHY("Invalid path");
goto insert_row_fail; goto insert_row_fail;
} }
if (config.debug.externalblobs) if (config.debug.externalblobs)
DEBUGF("Attempting to put blob for %s in %s", DEBUGF("Attempting to put blob for %"PRId64" in %s",
write->id,blob_path); write->temp_id,blob_path);
write->blob_fd=open(blob_path, O_CREAT | O_TRUNC | O_WRONLY, 0664); write->blob_fd=open(blob_path, O_CREAT | O_TRUNC | O_WRONLY, 0664);
if (write->blob_fd<0) if (write->blob_fd<0)
@ -70,7 +75,7 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write->blob_fd); DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write->blob_fd);
}else{ }else{
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",write->id); statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%"PRId64"',?)",write->temp_id);
if (!statement) { if (!statement) {
WHYF("Failed to insert into fileblobs: %s", sqlite3_errmsg(rhizome_db)); WHYF("Failed to insert into fileblobs: %s", sqlite3_errmsg(rhizome_db));
goto insert_row_fail; goto insert_row_fail;
@ -92,7 +97,7 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
if (!sqlite_code_ok(stepcode)){ if (!sqlite_code_ok(stepcode)){
insert_row_fail: insert_row_fail:
WHYF("Failed to insert row for fileid=%s", write->id); WHYF("Failed to insert row for fileid=%"PRId64, write->temp_id);
if (statement) sqlite3_finalize(statement); if (statement) sqlite3_finalize(statement);
sqlite_exec_void_retry(&retry, "ROLLBACK;"); sqlite_exec_void_retry(&retry, "ROLLBACK;");
return -1; return -1;
@ -104,7 +109,7 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
/* Get rowid for inserted row, so that we can modify the blob */ /* Get rowid for inserted row, so that we can modify the blob */
write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db); write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db);
if (config.debug.rhizome_rx) if (config.debug.rhizome_rx)
DEBUGF("Got rowid %"PRId64" for %s", write->blob_rowid, write->id); DEBUGF("Got rowid %"PRId64" for %"PRId64, write->blob_rowid, write->temp_id);
} }
@ -461,65 +466,62 @@ int rhizome_finish_write(struct rhizome_write *write){
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
str_toupper_inplace(hash_out);
if (write->id_known){ if (write->id_known){
if (strcasecmp(write->id, hash_out)){ if (strcasecmp(write->id, hash_out)){
WHYF("Expected hash=%s, got %s", write->id, hash_out); WHYF("Expected hash=%s, got %s", write->id, hash_out);
goto failure; goto failure;
} }
if (sqlite_exec_void_retry(&retry, "UPDATE FILES SET inserttime=%lld, datavalid=1 WHERE id='%s'",
gettime_ms(), write->id) == -1)
goto failure;
}else{ }else{
str_toupper_inplace(hash_out);
if (rhizome_exists(hash_out)){
// ooops, we've already got that file, delete the new copy.
rhizome_fail_write(write);
}else{
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") == -1)
goto dbfailure;
// delete any half finished records
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,"DELETE FROM FILEBLOBS WHERE id='%s';",hash_out);
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,"DELETE FROM FILES WHERE id='%s';",hash_out);
if (sqlite_exec_void_retry(&retry,
"UPDATE FILES SET id='%s', inserttime=%lld, datavalid=1 WHERE id='%s'",
hash_out, gettime_ms(), write->id) == -1)
goto dbfailure;
if (fd>=0){
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){
WHYF("Failed to generate file path");
goto dbfailure;
}
if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, hash_out)){
WHYF("Failed to generate file path");
goto dbfailure;
}
if (link(blob_path, dest_path)){
WHY_perror("link");
goto dbfailure;
}
if (unlink(blob_path))
WHY_perror("unlink");
}else{
if (sqlite_exec_void_retry(&retry,
"UPDATE FILEBLOBS SET id='%s' WHERE rowid=%lld",
hash_out, write->blob_rowid) == -1){
goto dbfailure;
}
}
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
goto dbfailure;
}
strlcpy(write->id, hash_out, SHA512_DIGEST_STRING_LENGTH); strlcpy(write->id, hash_out, SHA512_DIGEST_STRING_LENGTH);
} }
if (rhizome_exists(hash_out)){
// ooops, we've already got that file, delete the new copy.
rhizome_fail_write(write);
}else{
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") == -1)
goto dbfailure;
// delete any half finished records
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,"DELETE FROM FILEBLOBS WHERE id='%s';",hash_out);
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry,"DELETE FROM FILES WHERE id='%s';",hash_out);
if (sqlite_exec_void_retry(&retry,
"UPDATE FILES SET id='%s', inserttime=%lld, datavalid=1 WHERE id='%"PRId64"'",
hash_out, gettime_ms(), write->temp_id) == -1)
goto dbfailure;
if (fd>=0){
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){
WHYF("Failed to generate file path");
goto dbfailure;
}
if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, hash_out)){
WHYF("Failed to generate file path");
goto dbfailure;
}
if (link(blob_path, dest_path)){
WHY_perror("link");
goto dbfailure;
}
if (unlink(blob_path))
WHY_perror("unlink");
}else{
if (sqlite_exec_void_retry(&retry,
"UPDATE FILEBLOBS SET id='%s' WHERE rowid=%lld",
hash_out, write->blob_rowid) == -1){
goto dbfailure;
}
}
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
goto dbfailure;
}
write->blob_rowid=-1; write->blob_rowid=-1;
if (config.debug.rhizome) if (config.debug.rhizome)
@ -684,7 +686,12 @@ int rhizome_open_read(struct rhizome_read *read, const char *fileid)
str_toupper_inplace(read->id); str_toupper_inplace(read->id);
read->blob_rowid = -1; read->blob_rowid = -1;
read->blob_fd = -1; read->blob_fd = -1;
if (sqlite_exec_int64(&read->blob_rowid, "SELECT FILEBLOBS.rowid FROM FILEBLOBS, FILES WHERE FILEBLOBS.id = FILES.id AND FILES.id = '%s' AND FILES.datavalid != 0", read->id) == -1) if (sqlite_exec_int64(&read->blob_rowid,
"SELECT FILEBLOBS.rowid "
"FROM FILEBLOBS, FILES "
"WHERE FILEBLOBS.id = FILES.id "
"AND FILES.id = '%s' "
"AND FILES.datavalid != 0", read->id) == -1)
return -1; return -1;
if (read->blob_rowid != -1) { if (read->blob_rowid != -1) {
read->length = -1; // discover the length on opening the db BLOB read->length = -1; // discover the length on opening the db BLOB