Support streaming directly into Rhizome store

The rhizome_open_write() function now accepts RHIZOME_SIZE_UNSET as the
payload size, and streams into an external blob until finished, then
copies the external blob into the FILEBLOBS table if it is small enough.

Needed so that HTTP GET /restful/rhizome/insert can stream the payload
directly into the DB even if the supplied manifest does not specify
'filesize'.
This commit is contained in:
Andrew Bettison 2013-12-30 14:56:50 +10:30
parent 7cecdf7a3a
commit f81883758a

@ -25,6 +25,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define RHIZOME_BUFFER_MAXIMUM_SIZE (1024*1024)
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size);
int rhizome_exists(const rhizome_filehash_t *hashp)
{
uint64_t gotfile = 0;
@ -33,10 +35,28 @@ int rhizome_exists(const rhizome_filehash_t *hashp)
return gotfile;
}
/* Creates a row in the FILEBLOBS table and return the ROWID. Returns 0 if unsuccessful (error
* logged).
*/
static int rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_t size)
{
if (sqlite_exec_void_retry(
retry,
"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES(?,?)",
UINT64_TOSTR, id,
ZEROBLOB, (int)size,
END) == -1
) {
WHYF("Failed to create blob, size=%zu, id=%"PRIu64, size, id);
return 0;
}
return sqlite3_last_insert_rowid(rhizome_db);
}
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority)
{
assert(file_length != RHIZOME_SIZE_UNSET);
assert(file_length != 0);
if (file_length == 0)
return RHIZOME_PAYLOAD_STATUS_EMPTY;
write->blob_fd=-1;
@ -62,82 +82,49 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
/*
/*
we have to write incrementally so that we can handle blobs larger than available memory.
This is possible using:
int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
That binds an all zeroes blob to a field. We can then populate the data by
opening a handle to the blob using:
int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset);
*/
sqlite3_stmt *statement = NULL;
int ret = sqlite_exec_void_retry(
*/
if (sqlite_exec_void_retry(
&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES(?,?,?,0,?);",
UINT64_TOSTR, write->temp_id,
INT64, file_length,
INT64, file_length == RHIZOME_SIZE_UNSET ? (int64_t)-1 : (int64_t)file_length,
INT, priority,
INT64, now,
END
);
if (ret==-1)
goto insert_row_fail;
) == -1
) {
WHYF("Failed to create FILES db row, id='%"PRIu64"'", write->temp_id);
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
char blob_path[1024];
if (file_length > config.rhizome.max_blob_size) {
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){
WHY("Invalid path");
goto insert_row_fail;
}
if (file_length == RHIZOME_SIZE_UNSET || file_length > config.rhizome.max_blob_size) {
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRIu64, write->temp_id))
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (config.debug.externalblobs)
DEBUGF("Attempting to put blob for id='%"PRId64"' in %s", write->temp_id, blob_path);
write->blob_fd=open(blob_path, O_CREAT | O_TRUNC | O_WRONLY, 0664);
if (write->blob_fd == -1)
goto insert_row_fail;
DEBUGF("Attempting to put blob for id='%"PRIu64"' in %s", write->temp_id, blob_path);
if ((write->blob_fd = open(blob_path, O_CREAT | O_TRUNC | O_RDWR, 0664)) == -1) {
WHYF("Failed to create payload file, id='%"PRIu64"'", write->temp_id);
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
if (config.debug.externalblobs)
DEBUGF("Writing to new blob file %s (fd=%d)", blob_path, write->blob_fd);
}else{
statement = sqlite_prepare_bind(
&retry,
"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES(?,?)",
UINT64_TOSTR, write->temp_id,
ZEROBLOB, (int)file_length,
END);
if (statement == NULL)
goto insert_row_fail;
/* Do actual insert, and abort if it fails */
int rowcount = 0;
int stepcode;
while ((stepcode = sqlite_step_retry(&retry, statement)) == SQLITE_ROW)
++rowcount;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
if (!sqlite_code_ok(stepcode)){
insert_row_fail:
WHYF("Failed to insert row for id='%"PRId64"'", write->temp_id);
if (statement)
sqlite3_finalize(statement);
} else {
if ((write->blob_rowid = rhizome_create_fileblob(&retry, write->temp_id, (size_t)file_length)) == 0) {
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
sqlite3_finalize(statement);
statement=NULL;
/* Get rowid for inserted row, so that we can modify the blob */
write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db);
if (config.debug.rhizome_rx)
DEBUGF("Got rowid=%"PRId64" for id='%"PRId64"'", write->blob_rowid, write->temp_id);
DEBUGF("Got rowid=%"PRId64" for id='%"PRIu64"'", write->blob_rowid, write->temp_id);
}
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1){
if (write->blob_fd != -1){
if (write->blob_fd != -1) {
if (config.debug.externalblobs)
DEBUGF("Cancel write to fd %d", write->blob_fd);
close(write->blob_fd);
@ -146,13 +133,10 @@ enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, cons
}
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
write->file_length = file_length;
write->file_offset = 0;
write->written_offset = 0;
SHA512_Init(&write->sha512_context);
return RHIZOME_PAYLOAD_STATUS_NEW;
}
@ -167,7 +151,9 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer
return WHY("No content supplied");
/* Make sure we aren't being asked to write more data than we expected */
if (write_state->file_offset + data_size > write_state->file_length)
if ( write_state->file_length != RHIZOME_SIZE_UNSET
&& write_state->file_offset + data_size > write_state->file_length
)
return WHYF("Too much content supplied, %"PRIu64" + %zu > %"PRIu64,
write_state->file_offset, data_size, write_state->file_length);
@ -177,6 +163,9 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer
write_state->file_offset + write_state->tail,
write_state->key, write_state->nonce))
return -1;
if (config.debug.rhizome) {
//dump("after encrypt", buffer, data_size);
}
}
SHA512_Update(&write_state->sha512_context, buffer, data_size);
@ -188,32 +177,21 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer
}
// open database locks
static int write_get_lock(struct rhizome_write *write_state){
static int write_get_lock(struct rhizome_write *write_state)
{
if (write_state->blob_fd != -1 || write_state->sql_blob)
return 0;
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
return -1;
while(1){
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data",
write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob);
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_open");
return 0;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_open() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return WHYF("Giving up");
}
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob) == -1)
return -1;
return 0;
}
// write data to disk
static int write_data(struct rhizome_write *write_state, uint64_t file_offset, unsigned char *buffer, size_t data_size)
static int write_data(struct rhizome_write *write_state, uint64_t file_offset, const unsigned char *buffer, size_t data_size)
{
if (config.debug.rhizome) {
DEBUGF("write_state->file_length=%"PRIu64" file_offset=%"PRIu64, write_state->file_length, file_offset);
@ -243,18 +221,8 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
if (!write_state->sql_blob)
return WHY("Must call write_get_lock() before write_data()");
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
while(1){
int ret=sqlite3_blob_write(write_state->sql_blob, buffer, data_size, file_offset);
if (ret==SQLITE_OK){
sqlite_retry_done(&retry, "sqlite3_blob_write");
break;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_write")==0)
return WHY("Giving up");
}
if (sqlite_blob_write_retry(&retry, write_state->sql_blob, buffer, data_size, file_offset) == -1)
return -1;
}
write_state->written_offset = file_offset + data_size;
@ -272,11 +240,7 @@ static int write_release_lock(struct rhizome_write *write_state)
return 0;
if (write_state->sql_blob){
ret = sqlite3_blob_close(write_state->sql_blob);
if (ret)
WHYF("sqlite3_blob_close() failed: %s",
sqlite3_errmsg(rhizome_db));
ret = sqlite_blob_close(write_state->sql_blob);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
ret=-1;
@ -293,7 +257,9 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns
DEBUGF("write_state->file_length=%"PRIu64" offset=%"PRIu64, write_state->file_length, offset);
dump("buffer", buffer, data_size);
}
if (offset + data_size > write_state->file_length)
if ( write_state->file_length != RHIZOME_SIZE_UNSET
&& offset + data_size > write_state->file_length
)
data_size = write_state->file_length - offset;
struct rhizome_write_buffer **ptr = &write_state->buffer_list;
@ -307,7 +273,9 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns
// cache up to RHIZOME_BUFFER_MAXIMUM_SIZE or file length before attempting to write everything in one go.
// (Not perfect if the range overlaps)
uint64_t new_size = write_state->written_offset + write_state->buffer_size + data_size;
if (new_size >= write_state->file_length || new_size >= RHIZOME_BUFFER_MAXIMUM_SIZE)
if ( (write_state->file_length != RHIZOME_SIZE_UNSET && new_size >= write_state->file_length)
|| new_size >= RHIZOME_BUFFER_MAXIMUM_SIZE
)
should_write = 1;
}
uint64_t last_offset = write_state->written_offset;
@ -488,16 +456,52 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
goto failure;
}
}
assert(write->file_offset <= write->file_length);
if (write->file_offset < write->file_length) {
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
// Once the whole file has been written, we finally know its size and hash.
if (write->file_length == RHIZOME_SIZE_UNSET) {
if (config.debug.rhizome)
DEBUGF("Wrote %"PRIu64" bytes, set file_length", write->file_offset);
write->file_length = write->file_offset;
} else {
assert(write->file_offset <= write->file_length);
if (write->file_offset < write->file_length) {
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
goto failure;
}
}
rhizome_filehash_t hash_out;
SHA512_Final(hash_out.binary, &write->sha512_context);
SHA512_End(&write->sha512_context, NULL);
char blob_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRIu64, write->temp_id)) {
WHYF("Failed to generate external blob path");
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
}
// If the payload was written into an external blob (file) but is small enough to fit into a
// SQLite blob, then copy it into a proper blob (this occurs if rhizome_open_write() was called
// with file_length == RHIZOME_SIZE_UNSET).
int external = 0;
if (write->blob_fd != -1){
if (write->blob_fd != -1) {
external = 1;
if (write->file_length <= config.rhizome.max_blob_size) {
if (config.debug.rhizome)
DEBUGF("Copying %zu bytes from external file %s into blob, id=%"PRIu64, (size_t)write->file_offset, blob_path, write->temp_id);
int ret = 0;
if (lseek(write->blob_fd, 0, SEEK_SET) == (off_t) -1)
ret = WHYF_perror("lseek(%d,0,SEEK_SET)", write->blob_fd);
else if ((write->blob_rowid = rhizome_copy_file_to_blob(write->blob_fd, write->temp_id, (size_t)write->file_length)) == 0)
ret = -1;
if (ret == -1) {
WHY("Failed to copy external file into blob; keeping external file");
} else {
external = 0;
if (unlink(blob_path) == -1)
WARNF_perror("unlink(%s)", alloca_str_toprint(blob_path));
}
}
if (config.debug.externalblobs)
DEBUGF("Closing fd=%d", write->blob_fd);
close(write->blob_fd);
@ -507,10 +511,6 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
}
rhizome_filehash_t hash_out;
SHA512_Final(hash_out.binary, &write->sha512_context);
SHA512_End(&write->sha512_context, NULL);
if (write->id_known) {
if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) {
@ -518,10 +518,9 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
goto failure;
}
} else {
} else
write->id = hash_out;
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
rhizome_remove_file_datainvalid(&retry, &write->id);
if (rhizome_exists(&write->id)) {
@ -529,33 +528,29 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, write->temp_id, END);
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", UINT64_TOSTR, write->temp_id, END);
if (config.debug.rhizome)
DEBUGF("File id=%s already present, removed id='%"PRId64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id);
DEBUGF("Payload id=%s already present, removed id='%"PRIu64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id);
} else {
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
goto dbfailure;
// delete any half finished records
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END);
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILES WHERE id = ?;", RHIZOME_FILEHASH_T, &write->id, END);
if (sqlite_exec_void_retry(
&retry,
"UPDATE FILES SET id = ?, inserttime = ?, datavalid = 1 WHERE id = ?",
"UPDATE FILES SET id = ?, length = ?, inserttime = ?, datavalid = 1 WHERE id = ?",
RHIZOME_FILEHASH_T, &write->id,
INT64, write->file_length,
INT64, gettime_ms(),
UINT64_TOSTR, write->temp_id,
END
) == -1
)
goto dbfailure;
if (external) {
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){
WHYF("Failed to generate file path");
goto dbfailure;
}
if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, alloca_tohex_rhizome_filehash_t(write->id))){
WHYF("Failed to generate file path");
goto dbfailure;
@ -584,7 +579,7 @@ enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
}
write->blob_rowid = 0;
return status;
dbfailure:
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
status = RHIZOME_PAYLOAD_STATUS_ERROR;
@ -827,17 +822,10 @@ static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read
RETURN(rd);
}
if (read_state->blob_rowid == 0)
RETURN(WHY("file not open"));
RETURN(WHY("blob not created"));
sqlite3_blob *blob = NULL;
int ret;
do {
assert(blob == NULL);
ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob);
} while (sqlite_code_busy(ret) && sqlite_retry(retry, "sqlite3_blob_open"));
if (ret != SQLITE_OK) {
assert(blob == NULL);
RETURN(WHYF("sqlite3_blob_open() failed: %s", sqlite3_errmsg(rhizome_db)));
}
if (sqlite_blob_open_retry(retry, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob) == -1)
RETURN(WHY("blob open failed"));
assert(blob != NULL);
if (read_state->length == RHIZOME_SIZE_UNSET)
read_state->length = sqlite3_blob_bytes(blob);
@ -849,16 +837,17 @@ static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read
if (bytes_read > bufsz)
bytes_read = bufsz;
assert(bytes_read > 0);
int ret;
do {
ret = sqlite3_blob_read(blob, buffer, (int) bytes_read, read_state->offset);
} while (sqlite_code_busy(ret) && sqlite_retry(retry, "sqlite3_blob_read"));
if (ret != SQLITE_OK) {
WHYF("sqlite3_blob_read() failed: %s", sqlite3_errmsg(rhizome_db));
sqlite3_blob_close(blob);
sqlite_blob_close(blob);
RETURN(-1);
}
}
sqlite3_blob_close(blob);
sqlite_blob_close(blob);
RETURN(bytes_read);
OUT();
}
@ -1264,6 +1253,45 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write,
return 0;
}
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
uint64_t rowid = rhizome_create_fileblob(&retry, id, size);
if (rowid == 0)
return 0;
sqlite3_blob *blob = NULL;
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob) == -1)
goto fail;
char buf[16384];
size_t offset = 0;
while (offset < size) {
size_t toread = size - offset;
if (toread > sizeof buf)
toread = sizeof buf;
ssize_t nread = read(fd, buf, toread);
if (nread == -1) {
WHYF_perror("read(%d,%p,%zu)", fd, buf, toread);
goto fail;
}
if ((size_t)nread == 0) {
WHYF("read(%d,%p,%zu) returned 0", fd, buf, toread);
goto fail;
}
if (sqlite_blob_write_retry(&retry, blob, buf, (int)nread, (int)offset) == -1)
goto fail;
assert((size_t)nread <= toread);
offset += (size_t)nread;
}
assert(offset == size);
sqlite_blob_close(blob);
return rowid;
fail:
if (blob)
sqlite_blob_close(blob);
sqlite_exec_void_retry(&retry, "DELETE FROM FILEBLOBS WHERE id = ?;", UINT64_TOSTR, id, END);
return 0;
}
enum rhizome_payload_status rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length)
{
struct rhizome_read read_state;