mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-02-20 17:33:08 +00:00
Rework rhizome store to write content more lazily and cleanup more carefully
This commit is contained in:
parent
77be2c7ef3
commit
2a572d19bf
@ -639,7 +639,7 @@ struct rhizome_write
|
||||
rhizome_filehash_t id;
|
||||
uint64_t temp_id;
|
||||
char id_known;
|
||||
|
||||
int priority;
|
||||
uint64_t tail;
|
||||
uint64_t file_offset;
|
||||
uint64_t written_offset;
|
||||
|
250
rhizome_store.c
250
rhizome_store.c
@ -17,6 +17,7 @@ along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
|
||||
#include <assert.h>
|
||||
#include "serval.h"
|
||||
#include "rhizome.h"
|
||||
@ -57,7 +58,7 @@ int rhizome_exists(const rhizome_filehash_t *hashp)
|
||||
/* Creates a row in the FILEBLOBS table and return the ROWID. Returns 0 if unsuccessful (error
|
||||
* logged).
|
||||
*/
|
||||
static int rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_t size)
|
||||
static uint64_t rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_t size)
|
||||
{
|
||||
if (sqlite_exec_void_retry(
|
||||
retry,
|
||||
@ -69,7 +70,10 @@ static int rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_
|
||||
WHYF("Failed to create blob, size=%zu, id=%"PRIu64, size, id);
|
||||
return 0;
|
||||
}
|
||||
return sqlite3_last_insert_rowid(rhizome_db);
|
||||
uint64_t rowid = sqlite3_last_insert_rowid(rhizome_db);
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Inserted fileblob rowid=%"PRId64" for id='%"PRIu64"'", rowid, id);
|
||||
return rowid;
|
||||
}
|
||||
|
||||
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority)
|
||||
@ -78,6 +82,8 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons
|
||||
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
||||
|
||||
write->blob_fd=-1;
|
||||
write->sql_blob=NULL;
|
||||
write->priority = priority;
|
||||
|
||||
if (expectedHashp){
|
||||
if (rhizome_exists(expectedHashp))
|
||||
@ -94,64 +100,6 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons
|
||||
write->temp_id = last_id + 1;
|
||||
last_id = write->temp_id;
|
||||
|
||||
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
||||
|
||||
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1) {
|
||||
WHY("Failed to begin transaction");
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
we have to write incrementally so that we can handle blobs larger than available memory.
|
||||
This is possible using:
|
||||
int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
|
||||
That binds an all zeroes blob to a field. We can then populate the data by
|
||||
opening a handle to the blob using:
|
||||
int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset);
|
||||
*/
|
||||
if (sqlite_exec_void_retry(
|
||||
&retry,
|
||||
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES(?,?,?,0,?);",
|
||||
UINT64_TOSTR, write->temp_id,
|
||||
INT64, file_length == RHIZOME_SIZE_UNSET ? (int64_t)-1 : (int64_t)file_length,
|
||||
INT, priority,
|
||||
INT64, now,
|
||||
END
|
||||
) == -1
|
||||
) {
|
||||
WHYF("Failed to create FILES db row, id='%"PRIu64"'", write->temp_id);
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
}
|
||||
char blob_path[1024];
|
||||
if (file_length == RHIZOME_SIZE_UNSET || file_length > config.rhizome.max_blob_size) {
|
||||
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id))
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Attempting to put blob for id='%"PRIu64"' in %s", write->temp_id, blob_path);
|
||||
if ((write->blob_fd = open(blob_path, O_CREAT | O_TRUNC | O_RDWR, 0664)) == -1) {
|
||||
WHYF("Failed to create payload file, id='%"PRIu64"'", write->temp_id);
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
}
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write->blob_fd);
|
||||
} else {
|
||||
if ((write->blob_rowid = rhizome_create_fileblob(&retry, write->temp_id, (size_t)file_length)) == 0) {
|
||||
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
}
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Got rowid=%"PRId64" for id='%"PRIu64"'", write->blob_rowid, write->temp_id);
|
||||
}
|
||||
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1){
|
||||
if (write->blob_fd != -1) {
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Cancel write to fd %d", write->blob_fd);
|
||||
close(write->blob_fd);
|
||||
write->blob_fd=-1;
|
||||
unlink(blob_path);
|
||||
}
|
||||
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
}
|
||||
write->file_length = file_length;
|
||||
write->file_offset = 0;
|
||||
write->written_offset = 0;
|
||||
@ -195,15 +143,41 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer
|
||||
// open database locks
|
||||
static int write_get_lock(struct rhizome_write *write_state)
|
||||
{
|
||||
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
||||
|
||||
if (write_state->blob_fd != -1 || write_state->sql_blob)
|
||||
return 0;
|
||||
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
||||
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
|
||||
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
||||
return -1;
|
||||
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob) == -1)
|
||||
return -1;
|
||||
|
||||
if (write_state->file_length == RHIZOME_SIZE_UNSET ||
|
||||
write_state->file_length > config.rhizome.max_blob_size){
|
||||
char blob_path[1024];
|
||||
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write_state->temp_id))
|
||||
return -1;
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Attempting to put blob for id='%"PRIu64"' in %s", write_state->temp_id, blob_path);
|
||||
if ((write_state->blob_fd = open(blob_path, O_CREAT | O_TRUNC | O_RDWR, 0664)) == -1) {
|
||||
WHYF("Failed to create payload file, id='%"PRIu64"'", write_state->temp_id);
|
||||
return -1;
|
||||
}
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write_state->blob_fd);
|
||||
}else{
|
||||
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
|
||||
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
||||
return -1;
|
||||
if (write_state->blob_rowid == 0){
|
||||
write_state->blob_rowid = rhizome_create_fileblob(&retry, write_state->temp_id, write_state->file_length);
|
||||
if (write_state->blob_rowid == 0)
|
||||
goto fail;
|
||||
}
|
||||
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob) == -1)
|
||||
goto fail;
|
||||
}
|
||||
return 0;
|
||||
|
||||
fail:
|
||||
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// write data to disk
|
||||
@ -250,16 +224,13 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, c
|
||||
static int write_release_lock(struct rhizome_write *write_state)
|
||||
{
|
||||
int ret=0;
|
||||
if (write_state->blob_fd != -1)
|
||||
return 0;
|
||||
|
||||
if (write_state->sql_blob){
|
||||
ret = sqlite_blob_close(write_state->sql_blob);
|
||||
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
||||
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
|
||||
ret=-1;
|
||||
write_state->sql_blob=NULL;
|
||||
}
|
||||
write_state->sql_blob=NULL;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -277,11 +248,20 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns
|
||||
struct rhizome_write_buffer **ptr = &write_state->buffer_list;
|
||||
int ret=0;
|
||||
int should_write = 0;
|
||||
|
||||
// if we are writing to a file, or already have the sql blob open, or are finishing, write as much
|
||||
// as we can.
|
||||
if (write_state->blob_fd != -1 || write_state->sql_blob || buffer == NULL)
|
||||
if (write_state->blob_fd != -1 ||
|
||||
write_state->sql_blob ||
|
||||
buffer == NULL ||
|
||||
write_state->file_length > config.rhizome.max_blob_size ||
|
||||
write_state->file_offset > config.rhizome.max_blob_size) {
|
||||
should_write = 1;
|
||||
else {
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Attempting to write (fd=%d, blob=%p, buffer=%p, len=%"PRId64", offset=%"PRId64")",
|
||||
write_state->blob_fd, write_state->sql_blob, buffer,
|
||||
write_state->file_length, write_state->file_offset);
|
||||
} else {
|
||||
// cache up to RHIZOME_BUFFER_MAXIMUM_SIZE or file length before attempting to write everything in one go.
|
||||
// (Not perfect if the range overlaps)
|
||||
uint64_t new_size = write_state->written_offset + write_state->buffer_size + data_size;
|
||||
@ -412,28 +392,26 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename)
|
||||
if (fd == -1)
|
||||
return WHYF_perror("open(%s,O_RDONLY)", alloca_str_toprint(filename));
|
||||
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
|
||||
int ret = write_get_lock(write);
|
||||
if (ret == 0) {
|
||||
while (write->file_length == RHIZOME_SIZE_UNSET || write->file_offset < write->file_length) {
|
||||
size_t size = sizeof buffer;
|
||||
if (write->file_length != RHIZOME_SIZE_UNSET && write->file_offset + size > write->file_length)
|
||||
size = write->file_length - write->file_offset;
|
||||
ssize_t r = read(fd, buffer, size);
|
||||
if (r == -1) {
|
||||
ret = WHYF_perror("read(%d,%p,%zu)", fd, buffer, size);
|
||||
break;
|
||||
}
|
||||
if (write->file_length != RHIZOME_SIZE_UNSET && (size_t) r != size) {
|
||||
ret = WHYF("file truncated - read(%d,%p,%zu) returned %zu", fd, buffer, size, (size_t) r);
|
||||
break;
|
||||
}
|
||||
if (r && rhizome_write_buffer(write, buffer, (size_t) r)) {
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
if ((size_t) r != size)
|
||||
break;
|
||||
int ret=0;
|
||||
while (write->file_length == RHIZOME_SIZE_UNSET || write->file_offset < write->file_length) {
|
||||
size_t size = sizeof buffer;
|
||||
if (write->file_length != RHIZOME_SIZE_UNSET && write->file_offset + size > write->file_length)
|
||||
size = write->file_length - write->file_offset;
|
||||
ssize_t r = read(fd, buffer, size);
|
||||
if (r == -1) {
|
||||
ret = WHYF_perror("read(%d,%p,%zu)", fd, buffer, size);
|
||||
break;
|
||||
}
|
||||
if (write->file_length != RHIZOME_SIZE_UNSET && (size_t) r != size) {
|
||||
ret = WHYF("file truncated - read(%d,%p,%zu) returned %zu", fd, buffer, size, (size_t) r);
|
||||
break;
|
||||
}
|
||||
if (r && rhizome_write_buffer(write, buffer, (size_t) r)) {
|
||||
ret = -1;
|
||||
break;
|
||||
}
|
||||
if ((size_t) r != size)
|
||||
break;
|
||||
}
|
||||
if (write_release_lock(write))
|
||||
ret = -1;
|
||||
@ -448,20 +426,35 @@ void rhizome_fail_write(struct rhizome_write *write)
|
||||
DEBUGF("Closing and removing fd %d", write->blob_fd);
|
||||
close(write->blob_fd);
|
||||
write->blob_fd=-1;
|
||||
char blob_path[1024];
|
||||
if (FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id)){
|
||||
unlink(blob_path);
|
||||
}
|
||||
}
|
||||
write_release_lock(write);
|
||||
if (write->blob_rowid){
|
||||
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
|
||||
INT64, write->blob_rowid, END);
|
||||
}
|
||||
while(write->buffer_list){
|
||||
struct rhizome_write_buffer *n=write->buffer_list;
|
||||
write->buffer_list=n->_next;
|
||||
free(n);
|
||||
}
|
||||
rhizome_delete_file(&write->id);
|
||||
}
|
||||
|
||||
enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
|
||||
{
|
||||
assert(write->blob_rowid != 0 || write->blob_fd != -1);
|
||||
enum rhizome_payload_status status = RHIZOME_PAYLOAD_STATUS_NEW;
|
||||
|
||||
// Once the whole file has been processed, we should finally know its.
|
||||
if (write->file_length == RHIZOME_SIZE_UNSET) {
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Wrote %"PRIu64" bytes, set file_length", write->file_offset);
|
||||
write->file_length = write->file_offset;
|
||||
}
|
||||
|
||||
// flush out any remaining buffered pieces to disk
|
||||
if (write->buffer_list){
|
||||
if (rhizome_random_write(write, 0, NULL, 0)) {
|
||||
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
||||
@ -473,24 +466,34 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
|
||||
goto failure;
|
||||
}
|
||||
}
|
||||
|
||||
// Once the whole file has been written, we finally know its size and hash.
|
||||
if (write->file_length == RHIZOME_SIZE_UNSET) {
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Wrote %"PRIu64" bytes, set file_length", write->file_offset);
|
||||
write->file_length = write->file_offset;
|
||||
} else {
|
||||
assert(write->file_offset <= write->file_length);
|
||||
if (write->file_offset < write->file_length) {
|
||||
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
|
||||
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
assert(write->file_offset <= write->file_length);
|
||||
if (write->file_offset < write->file_length) {
|
||||
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
|
||||
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
if (write->file_length==0){
|
||||
// whoops, no payload, don't store anything
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Ignoring empty write");
|
||||
goto failure;
|
||||
}
|
||||
|
||||
rhizome_filehash_t hash_out;
|
||||
SHA512_Final(hash_out.binary, &write->sha512_context);
|
||||
SHA512_End(&write->sha512_context, NULL);
|
||||
|
||||
if (write->id_known) {
|
||||
if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) {
|
||||
WARNF("expected filehash=%s, got %s", alloca_tohex_rhizome_filehash_t(write->id), alloca_tohex_rhizome_filehash_t(hash_out));
|
||||
status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
|
||||
goto failure;
|
||||
}
|
||||
} else
|
||||
write->id = hash_out;
|
||||
|
||||
char blob_path[1024];
|
||||
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id)) {
|
||||
WHYF("Failed to generate external blob path");
|
||||
@ -499,7 +502,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
|
||||
}
|
||||
// If the payload was written into an external blob (file) but is small enough to fit into a
|
||||
// SQLite blob, then copy it into a proper blob (this occurs if rhizome_open_write() was called
|
||||
// with file_length == RHIZOME_SIZE_UNSET).
|
||||
// with file_length == RHIZOME_SIZE_UNSET) and max_blob_size > RHIZOME_BUFFER_MAXIMUM_SIZE.
|
||||
int external = 0;
|
||||
if (write->blob_fd != -1) {
|
||||
external = 1;
|
||||
@ -529,38 +532,31 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
|
||||
goto failure;
|
||||
}
|
||||
|
||||
if (write->id_known) {
|
||||
if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) {
|
||||
WARNF("expected filehash=%s, got %s", alloca_tohex_rhizome_filehash_t(write->id), alloca_tohex_rhizome_filehash_t(hash_out));
|
||||
status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
|
||||
goto failure;
|
||||
}
|
||||
} else
|
||||
write->id = hash_out;
|
||||
|
||||
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
||||
rhizome_remove_file_datainvalid(&retry, &write->id);
|
||||
if (rhizome_exists(&write->id)) {
|
||||
// we've already got that payload, delete the new copy
|
||||
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, write->temp_id, END);
|
||||
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", UINT64_TOSTR, write->temp_id, END);
|
||||
if (write->blob_rowid){
|
||||
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
|
||||
INT64, write->blob_rowid, END);
|
||||
}
|
||||
if (external){
|
||||
if (unlink(blob_path) == -1)
|
||||
WARNF_perror("unlink(%s)", alloca_str_toprint(blob_path));
|
||||
}
|
||||
if (config.debug.rhizome_store)
|
||||
DEBUGF("Payload id=%s already present, removed id='%"PRIu64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id);
|
||||
} else {
|
||||
}else{
|
||||
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
||||
goto dbfailure;
|
||||
|
||||
// delete any half finished records
|
||||
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END);
|
||||
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END);
|
||||
|
||||
if (sqlite_exec_void_retry(
|
||||
&retry,
|
||||
"UPDATE FILES SET id = ?, length = ?, inserttime = ?, datavalid = 1 WHERE id = ?",
|
||||
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES(?,?,?,1,?);",
|
||||
RHIZOME_FILEHASH_T, &write->id,
|
||||
INT64, write->file_length,
|
||||
INT, write->priority,
|
||||
INT64, gettime_ms(),
|
||||
UINT64_TOSTR, write->temp_id,
|
||||
END
|
||||
) == -1
|
||||
)
|
||||
|
@ -355,6 +355,22 @@ test_ExtractManifestFileFromExtBlob() {
|
||||
assert diff file2 file2x
|
||||
}
|
||||
|
||||
doc_LargePayload="Export huge bundle after one add"
|
||||
setup_LargePayload() {
|
||||
setup_servald
|
||||
setup_rhizome
|
||||
set_instance +A
|
||||
executeOk_servald config set debug.rhizome_store on
|
||||
}
|
||||
test_LargePayload() {
|
||||
rhizome_add_file file1 100000
|
||||
executeOk_servald rhizome export bundle $BID file1x.manifest file1x
|
||||
assert [ -e file1x.manifest ]
|
||||
assert diff file1.manifest file1x.manifest
|
||||
assert [ -e file1x ]
|
||||
assert diff file1 file1x
|
||||
}
|
||||
|
||||
doc_CorruptExternalBlob="A corrupted payload should fail to export"
|
||||
setup_CorruptExternalBlob() {
|
||||
setup_servald
|
||||
|
Loading…
x
Reference in New Issue
Block a user