From 94cf79f0f3aaeede7d1e5d0e4bf2ed818f501689 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 5 Jul 2016 15:26:03 +0930 Subject: [PATCH] Improve resilience to database locking issues --- rhizome_store.c | 57 +++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/rhizome_store.c b/rhizome_store.c index 6f2b6000..bd2f686f 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -524,17 +524,17 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uin // if existing data should be written, do so now if (should_write && *ptr && (*ptr)->offset == write_state->written_offset){ struct rhizome_write_buffer *n=*ptr; - if (write_get_lock(write_state)){ - ret=-1; - break; - } - if (write_data(write_state, n->offset, n->data, n->data_size)){ - ret=-1; - break; + *ptr=n->_next; + + if ( write_get_lock(write_state) + || write_data(write_state, n->offset, n->data, n->data_size)){ + // ignore transient write errors, eg db locks + should_write = 0; + continue; } + write_state->buffer_size-=n->data_size; last_offset = n->offset + n->data_size; - *ptr=n->_next; free(n); continue; } @@ -570,15 +570,15 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uin } if (should_write && offset == write_state->written_offset){ - if (write_get_lock(write_state)){ - ret=-1; - break; + + if ( write_get_lock(write_state) + || write_data(write_state, offset, buffer, size)){ + should_write = 0; + continue; } - if (write_data(write_state, offset, buffer, size)){ - ret=-1; - break; - } - // we need to go around the loop again to re-test if this buffer can now be written + + // we need to go around the loop again to re-test if *ptr can now be written + }else{ // impose a limit on the total amount of cached data if (write_state->buffer_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE) @@ -729,6 +729,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) } if (write->buffer_list) { WHYF("Buffer was not cleared"); + // TODO return code to indicate db locking issues? status = RHIZOME_PAYLOAD_STATUS_ERROR; goto failure; } @@ -739,7 +740,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE; goto failure; } - assert(write->file_offset == write->file_length); + assert(write->file_offset == write->file_length && write->written_offset == write->file_length); if (write->file_length == 0) { // whoops, no payload, don't store anything @@ -1545,13 +1546,21 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write, uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size) { sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; + + // use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried. + if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1) + return 0; + + sqlite3_blob *blob = NULL; + uint64_t rowid = rhizome_create_fileblob(&retry, id, size); if (rowid == 0) - return 0; - sqlite3_blob *blob = NULL; + goto fail; + if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob) == -1) goto fail; - char buf[16384]; + + uint8_t buf[16384]; size_t offset = 0; while (offset < size) { size_t toread = size - offset; @@ -1573,11 +1582,17 @@ uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size) } assert(offset == size); sqlite_blob_close(blob); + blob = NULL; + + if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1) + goto fail; + return rowid; + fail: if (blob) sqlite_blob_close(blob); - sqlite_exec_void_retry(&retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, id, END); + sqlite_exec_void_retry(&retry, "ROLLBACK;", END); return 0; }