diff --git a/rhizome_store.c b/rhizome_store.c index eea5ced8..e734df14 100644 --- a/rhizome_store.c +++ b/rhizome_store.c @@ -25,6 +25,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define RHIZOME_BUFFER_MAXIMUM_SIZE (1024*1024) +uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size); + int rhizome_exists(const rhizome_filehash_t *hashp) { uint64_t gotfile = 0; @@ -33,10 +35,28 @@ int rhizome_exists(const rhizome_filehash_t *hashp) return gotfile; } +/* Creates a row in the FILEBLOBS table and return the ROWID. Returns 0 if unsuccessful (error + * logged). + */ +static int rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_t size) +{ + if (sqlite_exec_void_retry( + retry, + "INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES(?,?)", + UINT64_TOSTR, id, + ZEROBLOB, (int)size, + END) == -1 + ) { + WHYF("Failed to create blob, size=%zu, id=%"PRIu64, size, id); + return 0; + } + return sqlite3_last_insert_rowid(rhizome_db); +} + enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority) { - assert(file_length != RHIZOME_SIZE_UNSET); - assert(file_length != 0); + if (file_length == 0) + return RHIZOME_PAYLOAD_STATUS_EMPTY; write->blob_fd=-1; @@ -62,82 +82,49 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons return RHIZOME_PAYLOAD_STATUS_ERROR; } - /* + /* we have to write incrementally so that we can handle blobs larger than available memory. This is possible using: int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n); That binds an all zeroes blob to a field. We can then populate the data by opening a handle to the blob using: int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset); - */ - - sqlite3_stmt *statement = NULL; - int ret = sqlite_exec_void_retry( + */ + if (sqlite_exec_void_retry( &retry, "INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES(?,?,?,0,?);", UINT64_TOSTR, write->temp_id, - INT64, file_length, + INT64, file_length == RHIZOME_SIZE_UNSET ? (int64_t)-1 : (int64_t)file_length, INT, priority, INT64, now, END - ); - if (ret==-1) - goto insert_row_fail; - + ) == -1 + ) { + WHYF("Failed to create FILES db row, id='%"PRIu64"'", write->temp_id); + return RHIZOME_PAYLOAD_STATUS_ERROR; + } char blob_path[1024]; - - if (file_length > config.rhizome.max_blob_size) { - if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){ - WHY("Invalid path"); - goto insert_row_fail; - } - + if (file_length == RHIZOME_SIZE_UNSET || file_length > config.rhizome.max_blob_size) { + if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRIu64, write->temp_id)) + return RHIZOME_PAYLOAD_STATUS_ERROR; if (config.debug.externalblobs) - DEBUGF("Attempting to put blob for id='%"PRId64"' in %s", write->temp_id, blob_path); - - write->blob_fd=open(blob_path, O_CREAT | O_TRUNC | O_WRONLY, 0664); - if (write->blob_fd == -1) - goto insert_row_fail; - + DEBUGF("Attempting to put blob for id='%"PRIu64"' in %s", write->temp_id, blob_path); + if ((write->blob_fd = open(blob_path, O_CREAT | O_TRUNC | O_RDWR, 0664)) == -1) { + WHYF("Failed to create payload file, id='%"PRIu64"'", write->temp_id); + return RHIZOME_PAYLOAD_STATUS_ERROR; + } if (config.debug.externalblobs) DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write->blob_fd); - - }else{ - statement = sqlite_prepare_bind( - &retry, - "INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES(?,?)", - UINT64_TOSTR, write->temp_id, - ZEROBLOB, (int)file_length, - END); - if (statement == NULL) - goto insert_row_fail; - /* Do actual insert, and abort if it fails */ - int rowcount = 0; - int stepcode; - while ((stepcode = sqlite_step_retry(&retry, statement)) == SQLITE_ROW) - ++rowcount; - if (rowcount) - WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s"); - if (!sqlite_code_ok(stepcode)){ - insert_row_fail: - WHYF("Failed to insert row for id='%"PRId64"'", write->temp_id); - if (statement) - sqlite3_finalize(statement); + } else { + if ((write->blob_rowid = rhizome_create_fileblob(&retry, write->temp_id, (size_t)file_length)) == 0) { sqlite_exec_void_retry(&retry, "ROLLBACK;", END); return RHIZOME_PAYLOAD_STATUS_ERROR; } - sqlite3_finalize(statement); - statement=NULL; - - /* Get rowid for inserted row, so that we can modify the blob */ - write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db); if (config.debug.rhizome_rx) - DEBUGF("Got rowid=%"PRId64" for id='%"PRId64"'", write->blob_rowid, write->temp_id); - + DEBUGF("Got rowid=%"PRId64" for id='%"PRIu64"'", write->blob_rowid, write->temp_id); } - if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1){ - if (write->blob_fd != -1){ + if (write->blob_fd != -1) { if (config.debug.externalblobs) DEBUGF("Cancel write to fd %d", write->blob_fd); close(write->blob_fd); @@ -146,13 +133,10 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons } return RHIZOME_PAYLOAD_STATUS_ERROR; } - write->file_length = file_length; write->file_offset = 0; write->written_offset = 0; - SHA512_Init(&write->sha512_context); - return RHIZOME_PAYLOAD_STATUS_NEW; } @@ -167,7 +151,9 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer return WHY("No content supplied"); /* Make sure we aren't being asked to write more data than we expected */ - if (write_state->file_offset + data_size > write_state->file_length) + if ( write_state->file_length != RHIZOME_SIZE_UNSET + && write_state->file_offset + data_size > write_state->file_length + ) return WHYF("Too much content supplied, %"PRIu64" + %zu > %"PRIu64, write_state->file_offset, data_size, write_state->file_length); @@ -177,6 +163,9 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer write_state->file_offset + write_state->tail, write_state->key, write_state->nonce)) return -1; + if (config.debug.rhizome) { + //dump("after encrypt", buffer, data_size); + } } SHA512_Update(&write_state->sha512_context, buffer, data_size); @@ -188,32 +177,21 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer } // open database locks -static int write_get_lock(struct rhizome_write *write_state){ +static int write_get_lock(struct rhizome_write *write_state) +{ if (write_state->blob_fd != -1 || write_state->sql_blob) return 0; sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - // use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried. if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1) return -1; - - while(1){ - int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", - write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob); - if (ret==SQLITE_OK){ - sqlite_retry_done(&retry, "sqlite3_blob_open"); - return 0; - } - if (!sqlite_code_busy(ret)) - return WHYF("sqlite3_blob_open() failed: %s", - sqlite3_errmsg(rhizome_db)); - if (sqlite_retry(&retry, "sqlite3_blob_open")==0) - return WHYF("Giving up"); - } + if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob) == -1) + return -1; + return 0; } // write data to disk -static int write_data(struct rhizome_write *write_state, uint64_t file_offset, unsigned char *buffer, size_t data_size) +static int write_data(struct rhizome_write *write_state, uint64_t file_offset, const unsigned char *buffer, size_t data_size) { if (config.debug.rhizome) { DEBUGF("write_state->file_length=%"PRIu64" file_offset=%"PRIu64, write_state->file_length, file_offset); @@ -243,18 +221,8 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u if (!write_state->sql_blob) return WHY("Must call write_get_lock() before write_data()"); sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; - while(1){ - int ret=sqlite3_blob_write(write_state->sql_blob, buffer, data_size, file_offset); - if (ret==SQLITE_OK){ - sqlite_retry_done(&retry, "sqlite3_blob_write"); - break; - } - if (!sqlite_code_busy(ret)) - return WHYF("sqlite3_blob_write() failed: %s", - sqlite3_errmsg(rhizome_db)); - if (sqlite_retry(&retry, "sqlite3_blob_write")==0) - return WHY("Giving up"); - } + if (sqlite_blob_write_retry(&retry, write_state->sql_blob, buffer, data_size, file_offset) == -1) + return -1; } write_state->written_offset = file_offset + data_size; @@ -272,11 +240,7 @@ static int write_release_lock(struct rhizome_write *write_state) return 0; if (write_state->sql_blob){ - ret = sqlite3_blob_close(write_state->sql_blob); - if (ret) - WHYF("sqlite3_blob_close() failed: %s", - sqlite3_errmsg(rhizome_db)); - + ret = sqlite_blob_close(write_state->sql_blob); sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1) ret=-1; @@ -293,7 +257,9 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns DEBUGF("write_state->file_length=%"PRIu64" offset=%"PRIu64, write_state->file_length, offset); dump("buffer", buffer, data_size); } - if (offset + data_size > write_state->file_length) + if ( write_state->file_length != RHIZOME_SIZE_UNSET + && offset + data_size > write_state->file_length + ) data_size = write_state->file_length - offset; struct rhizome_write_buffer **ptr = &write_state->buffer_list; @@ -307,7 +273,9 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns // cache up to RHIZOME_BUFFER_MAXIMUM_SIZE or file length before attempting to write everything in one go. // (Not perfect if the range overlaps) uint64_t new_size = write_state->written_offset + write_state->buffer_size + data_size; - if (new_size >= write_state->file_length || new_size >= RHIZOME_BUFFER_MAXIMUM_SIZE) + if ( (write_state->file_length != RHIZOME_SIZE_UNSET && new_size >= write_state->file_length) + || new_size >= RHIZOME_BUFFER_MAXIMUM_SIZE + ) should_write = 1; } uint64_t last_offset = write_state->written_offset; @@ -488,16 +456,52 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) goto failure; } } - - assert(write->file_offset <= write->file_length); - if (write->file_offset < write->file_length) { - WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length); - status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE; + + // Once the whole file has been written, we finally know its size and hash. + if (write->file_length == RHIZOME_SIZE_UNSET) { + if (config.debug.rhizome) + DEBUGF("Wrote %"PRIu64" bytes, set file_length", write->file_offset); + write->file_length = write->file_offset; + } else { + assert(write->file_offset <= write->file_length); + if (write->file_offset < write->file_length) { + WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length); + status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE; + goto failure; + } + } + rhizome_filehash_t hash_out; + SHA512_Final(hash_out.binary, &write->sha512_context); + SHA512_End(&write->sha512_context, NULL); + + char blob_path[1024]; + if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRIu64, write->temp_id)) { + WHYF("Failed to generate external blob path"); + status = RHIZOME_PAYLOAD_STATUS_ERROR; goto failure; } + // If the payload was written into an external blob (file) but is small enough to fit into a + // SQLite blob, then copy it into a proper blob (this occurs if rhizome_open_write() was called + // with file_length == RHIZOME_SIZE_UNSET). int external = 0; - if (write->blob_fd != -1){ + if (write->blob_fd != -1) { external = 1; + if (write->file_length <= config.rhizome.max_blob_size) { + if (config.debug.rhizome) + DEBUGF("Copying %zu bytes from external file %s into blob, id=%"PRIu64, (size_t)write->file_offset, blob_path, write->temp_id); + int ret = 0; + if (lseek(write->blob_fd, 0, SEEK_SET) == (off_t) -1) + ret = WHYF_perror("lseek(%d,0,SEEK_SET)", write->blob_fd); + else if ((write->blob_rowid = rhizome_copy_file_to_blob(write->blob_fd, write->temp_id, (size_t)write->file_length)) == 0) + ret = -1; + if (ret == -1) { + WHY("Failed to copy external file into blob; keeping external file"); + } else { + external = 0; + if (unlink(blob_path) == -1) + WARNF_perror("unlink(%s)", alloca_str_toprint(blob_path)); + } + } if (config.debug.externalblobs) DEBUGF("Closing fd=%d", write->blob_fd); close(write->blob_fd); @@ -507,10 +511,6 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) status = RHIZOME_PAYLOAD_STATUS_ERROR; goto failure; } - - rhizome_filehash_t hash_out; - SHA512_Final(hash_out.binary, &write->sha512_context); - SHA512_End(&write->sha512_context, NULL); if (write->id_known) { if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) { @@ -518,10 +518,9 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH; goto failure; } - } else { + } else write->id = hash_out; - } - + sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; rhizome_remove_file_datainvalid(&retry, &write->id); if (rhizome_exists(&write->id)) { @@ -529,33 +528,29 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, write->temp_id, END); sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", UINT64_TOSTR, write->temp_id, END); if (config.debug.rhizome) - DEBUGF("File id=%s already present, removed id='%"PRId64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id); + DEBUGF("Payload id=%s already present, removed id='%"PRIu64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id); } else { if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1) goto dbfailure; - + // delete any half finished records sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END); sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END); - + if (sqlite_exec_void_retry( &retry, - "UPDATE FILES SET id = ?, inserttime = ?, datavalid = 1 WHERE id = ?", + "UPDATE FILES SET id = ?, length = ?, inserttime = ?, datavalid = 1 WHERE id = ?", RHIZOME_FILEHASH_T, &write->id, + INT64, write->file_length, INT64, gettime_ms(), UINT64_TOSTR, write->temp_id, END ) == -1 ) goto dbfailure; - + if (external) { - char blob_path[1024]; char dest_path[1024]; - if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){ - WHYF("Failed to generate file path"); - goto dbfailure; - } if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, alloca_tohex_rhizome_filehash_t(write->id))){ WHYF("Failed to generate file path"); goto dbfailure; @@ -584,7 +579,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write) } write->blob_rowid = 0; return status; - + dbfailure: sqlite_exec_void_retry(&retry, "ROLLBACK;", END); status = RHIZOME_PAYLOAD_STATUS_ERROR; @@ -827,17 +822,10 @@ static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read RETURN(rd); } if (read_state->blob_rowid == 0) - RETURN(WHY("file not open")); + RETURN(WHY("blob not created")); sqlite3_blob *blob = NULL; - int ret; - do { - assert(blob == NULL); - ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob); - } while (sqlite_code_busy(ret) && sqlite_retry(retry, "sqlite3_blob_open")); - if (ret != SQLITE_OK) { - assert(blob == NULL); - RETURN(WHYF("sqlite3_blob_open() failed: %s", sqlite3_errmsg(rhizome_db))); - } + if (sqlite_blob_open_retry(retry, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob) == -1) + RETURN(WHY("blob open failed")); assert(blob != NULL); if (read_state->length == RHIZOME_SIZE_UNSET) read_state->length = sqlite3_blob_bytes(blob); @@ -849,16 +837,17 @@ static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read if (bytes_read > bufsz) bytes_read = bufsz; assert(bytes_read > 0); + int ret; do { ret = sqlite3_blob_read(blob, buffer, (int) bytes_read, read_state->offset); } while (sqlite_code_busy(ret) && sqlite_retry(retry, "sqlite3_blob_read")); if (ret != SQLITE_OK) { WHYF("sqlite3_blob_read() failed: %s", sqlite3_errmsg(rhizome_db)); - sqlite3_blob_close(blob); + sqlite_blob_close(blob); RETURN(-1); } } - sqlite3_blob_close(blob); + sqlite_blob_close(blob); RETURN(bytes_read); OUT(); } @@ -1264,6 +1253,45 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write, return 0; } +uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size) +{ + sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; + uint64_t rowid = rhizome_create_fileblob(&retry, id, size); + if (rowid == 0) + return 0; + sqlite3_blob *blob = NULL; + if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob) == -1) + goto fail; + char buf[16384]; + size_t offset = 0; + while (offset < size) { + size_t toread = size - offset; + if (toread > sizeof buf) + toread = sizeof buf; + ssize_t nread = read(fd, buf, toread); + if (nread == -1) { + WHYF_perror("read(%d,%p,%zu)", fd, buf, toread); + goto fail; + } + if ((size_t)nread == 0) { + WHYF("read(%d,%p,%zu) returned 0", fd, buf, toread); + goto fail; + } + if (sqlite_blob_write_retry(&retry, blob, buf, (int)nread, (int)offset) == -1) + goto fail; + assert((size_t)nread <= toread); + offset += (size_t)nread; + } + assert(offset == size); + sqlite_blob_close(blob); + return rowid; +fail: + if (blob) + sqlite_blob_close(blob); + sqlite_exec_void_retry(&retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, id, END); + return 0; +} + enum rhizome_payload_status rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length) { struct rhizome_read read_state;