Improve resilience to database locking issues

This commit is contained in:
Jeremy Lakeman 2016-07-05 15:26:03 +09:30
parent 619c17a191
commit 94cf79f0f3

View File

@ -524,17 +524,17 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uin
// if existing data should be written, do so now
if (should_write && *ptr && (*ptr)->offset == write_state->written_offset){
struct rhizome_write_buffer *n=*ptr;
if (write_get_lock(write_state)){
ret=-1;
break;
}
if (write_data(write_state, n->offset, n->data, n->data_size)){
ret=-1;
break;
*ptr=n->_next;
if ( write_get_lock(write_state)
|| write_data(write_state, n->offset, n->data, n->data_size)){
// ignore transient write errors, eg db locks
should_write = 0;
continue;
}
write_state->buffer_size-=n->data_size;
last_offset = n->offset + n->data_size;
*ptr=n->_next;
free(n);
continue;
}
@ -570,15 +570,15 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uin
}
if (should_write && offset == write_state->written_offset){
if (write_get_lock(write_state)){
ret=-1;
break;
if ( write_get_lock(write_state)
|| write_data(write_state, offset, buffer, size)){
should_write = 0;
continue;
}
if (write_data(write_state, offset, buffer, size)){
ret=-1;
break;
}
// we need to go around the loop again to re-test if this buffer can now be written
// we need to go around the loop again to re-test if *ptr can now be written
}else{
// impose a limit on the total amount of cached data
if (write_state->buffer_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE)
@ -729,6 +729,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
}
if (write->buffer_list) {
WHYF("Buffer was not cleared");
// TODO return code to indicate db locking issues?
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
}
@ -739,7 +740,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
goto failure;
}
assert(write->file_offset == write->file_length);
assert(write->file_offset == write->file_length && write->written_offset == write->file_length);
if (write->file_length == 0) {
// whoops, no payload, don't store anything
@ -1545,13 +1546,21 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write,
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
return 0;
sqlite3_blob *blob = NULL;
uint64_t rowid = rhizome_create_fileblob(&retry, id, size);
if (rowid == 0)
return 0;
sqlite3_blob *blob = NULL;
goto fail;
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob) == -1)
goto fail;
char buf[16384];
uint8_t buf[16384];
size_t offset = 0;
while (offset < size) {
size_t toread = size - offset;
@ -1573,11 +1582,17 @@ uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size)
}
assert(offset == size);
sqlite_blob_close(blob);
blob = NULL;
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
goto fail;
return rowid;
fail:
if (blob)
sqlite_blob_close(blob);
sqlite_exec_void_retry(&retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, id, END);
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
return 0;
}