mirror of
https://github.com/servalproject/serval-dna.git
synced 2024-12-24 07:16:43 +00:00
4aeb6545ad
Change semantics of fake space limit to exclude used space, otherwise the result will keep changing on each call
1998 lines
68 KiB
C
1998 lines
68 KiB
C
/*
|
|
Serval DNA Rhizome storage
|
|
Copyright (C) 2013 Serval Project Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; either version 2
|
|
of the License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <assert.h>
|
|
#ifdef HAVE_SYS_STATVFS_H
|
|
# include <sys/statvfs.h>
|
|
#else
|
|
# if defined(HAVE_SYS_STAT_H) && defined(HAVE_SYS_VFS_H)
|
|
# include <sys/stat.h>
|
|
# include <sys/vfs.h>
|
|
# define statvfs statfs
|
|
# endif
|
|
#endif
|
|
#include "serval.h"
|
|
#include "rhizome.h"
|
|
#include "conf.h"
|
|
#include "str.h"
|
|
#include "numeric_str.h"
|
|
#include "debug.h"
|
|
|
|
#define RHIZOME_BUFFER_MAXIMUM_SIZE (1024*1024)
|
|
|
|
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size);
|
|
|
|
#define FORM_BLOB_PATH(BUFF,SUBDIR,HASH) FORMF_RHIZOME_STORE_PATH((BUFF),"%s/%02X/%02X/%s", (SUBDIR), (HASH)->binary[0], (HASH)->binary[1], alloca_tohex(&(HASH)->binary[2], sizeof((HASH)->binary)-2))
|
|
|
|
enum rhizome_payload_status rhizome_exists(const rhizome_filehash_t *hashp)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
uint64_t gotfile = 0;
|
|
int stepcode = sqlite_exec_uint64_retry(&retry, &gotfile, "SELECT COUNT(*) FROM FILES WHERE id = ? and datavalid = 1;",
|
|
RHIZOME_FILEHASH_T, hashp, END);
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
if (gotfile==0)
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
|
|
char blob_path[1024];
|
|
if (FORM_BLOB_PATH(blob_path, RHIZOME_BLOB_SUBDIR, hashp)){
|
|
struct stat st;
|
|
if (stat(blob_path, &st) == 0)
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
|
|
char legacy_path[1024];
|
|
// migrate from flat folder to sub-tree's
|
|
if (FORMF_RHIZOME_STORE_PATH(legacy_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(*hashp))){
|
|
struct stat st;
|
|
if (stat(legacy_path, &st) == 0
|
|
&& emkdirsn(legacy_path, strrchr(legacy_path,'/') - legacy_path, 0700)!=-1
|
|
&& rename(legacy_path, blob_path) != -1){
|
|
INFOF("Moved %s to %s", legacy_path, blob_path);
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
}
|
|
|
|
uint64_t blob_rowid = 0;
|
|
stepcode = sqlite_exec_uint64_retry(&retry, &blob_rowid,
|
|
"SELECT rowid "
|
|
"FROM FILEBLOBS "
|
|
"WHERE id = ?", RHIZOME_FILEHASH_T, hashp, END);
|
|
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
if (blob_rowid!=0)
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
}
|
|
|
|
/* Creates a row in the FILEBLOBS table and return the ROWID. Returns 0 if unsuccessful (error
|
|
* logged).
|
|
*/
|
|
static uint64_t rhizome_create_fileblob(sqlite_retry_state *retry, uint64_t id, size_t size)
|
|
{
|
|
if (sqlite_exec_void_retry(
|
|
retry,
|
|
"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES(?,?)",
|
|
UINT64_TOSTR, id,
|
|
ZEROBLOB, (int)size,
|
|
END) == -1
|
|
) {
|
|
WHYF("Failed to create blob, size=%zu, id=%"PRIu64, size, id);
|
|
return 0;
|
|
}
|
|
uint64_t rowid = sqlite3_last_insert_rowid(rhizome_database.db);
|
|
DEBUGF(rhizome_store, "Inserted fileblob rowid=%"PRId64" for id='%"PRIu64"'", rowid, id);
|
|
return rowid;
|
|
}
|
|
|
|
static int rhizome_delete_external(const rhizome_filehash_t *id)
|
|
{
|
|
// attempt to remove any external blob & partial hash file
|
|
char blob_path[1024];
|
|
if (FORM_BLOB_PATH(blob_path, RHIZOME_HASH_SUBDIR, id))
|
|
unlink(blob_path);
|
|
if (!FORM_BLOB_PATH(blob_path, RHIZOME_BLOB_SUBDIR, id))
|
|
return -1;
|
|
if (unlink(blob_path) == -1) {
|
|
if (errno != ENOENT)
|
|
return WHYF_perror("unlink(%s)", alloca_str_toprint(blob_path));
|
|
return 1;
|
|
}
|
|
DEBUGF(rhizome_store, "Deleted blob file %s", blob_path);
|
|
return 0;
|
|
}
|
|
|
|
static int rhizome_delete_file_retry(sqlite_retry_state *retry, const rhizome_filehash_t *filehash)
|
|
{
|
|
int ret = 0;
|
|
rhizome_delete_external(filehash);
|
|
sqlite3_stmt *statement = sqlite_prepare_bind(retry, "DELETE FROM fileblobs WHERE id = ?", RHIZOME_FILEHASH_T, filehash, END);
|
|
if (!statement || sqlite_exec_retry(retry, statement) == -1)
|
|
ret = -1;
|
|
statement = sqlite_prepare_bind(retry, "DELETE FROM files WHERE id = ?", RHIZOME_FILEHASH_T, filehash, END);
|
|
if (!statement || sqlite_exec_retry(retry, statement) == -1)
|
|
ret = -1;
|
|
return ret == -1 ? -1 : sqlite3_changes(rhizome_database.db) ? 0 : 1;
|
|
}
|
|
|
|
static int rhizome_delete_payload_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp)
|
|
{
|
|
strbuf fh = strbuf_alloca(RHIZOME_FILEHASH_STRLEN + 1);
|
|
int rows = sqlite_exec_strbuf_retry(retry, fh, "SELECT filehash FROM manifests WHERE id = ?", RHIZOME_BID_T, bidp, END);
|
|
if (rows == -1)
|
|
return -1;
|
|
if (rows){
|
|
rhizome_filehash_t hash;
|
|
if (str_to_rhizome_filehash_t(&hash, strbuf_str(fh))==-1)
|
|
return -1;
|
|
if (rhizome_delete_file_retry(retry, &hash) == -1)
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/* Remove a bundle's payload (file) from the database, given its manifest ID, leaving its manifest
|
|
* untouched if present.
|
|
*
|
|
* Returns 0 if manifest is found, its payload is found and removed
|
|
* Returns 1 if manifest or payload is not found
|
|
* Returns -1 on error
|
|
*
|
|
* @author Andrew Bettison <andrew@servalproject.com>
|
|
*/
|
|
int rhizome_delete_payload(const rhizome_bid_t *bidp)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
return rhizome_delete_payload_retry(&retry, bidp);
|
|
}
|
|
|
|
/* Remove a file from the database, given its file hash.
|
|
*
|
|
* Returns 0 if file is found and removed
|
|
* Returns 1 if file is not found
|
|
* Returns -1 on error
|
|
*
|
|
* @author Andrew Bettison <andrew@servalproject.com>
|
|
*/
|
|
int rhizome_delete_file(const rhizome_filehash_t *filehash)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
return rhizome_delete_file_retry(&retry, filehash);
|
|
}
|
|
|
|
static enum rhizome_payload_status store_space_report(sqlite_retry_state *retry, struct rhizome_space_report *space){
|
|
int stepcode = sqlite_exec_uint64_retry(retry, &space->db_page_size, "PRAGMA page_size;", END);
|
|
if (sqlite_code_ok(stepcode))
|
|
stepcode = sqlite_exec_uint64_retry(retry, &space->db_total_pages, "PRAGMA page_count;", END);
|
|
if (sqlite_code_ok(stepcode))
|
|
stepcode = sqlite_exec_uint64_retry(retry, &space->db_available_pages, "PRAGMA freelist_count;", END);
|
|
if (sqlite_code_ok(stepcode)){
|
|
sqlite3_stmt *statement = sqlite_prepare_bind(retry,
|
|
"SELECT CASE WHEN B.ID IS NULL THEN 0 ELSE 1 END, SUM(length), count(*) "
|
|
"FROM FILES F "
|
|
"LEFT JOIN FILEBLOBS B "
|
|
"ON F.ID = B.ID "
|
|
"GROUP BY CASE WHEN B.ID IS NULL THEN 0 ELSE 1 END;",
|
|
END);
|
|
if (statement == NULL)
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
|
|
space->file_count=0;
|
|
space->internal_bytes=0;
|
|
space->external_bytes=0;
|
|
while((stepcode = sqlite_step_retry(retry, statement)) == SQLITE_ROW) {
|
|
int64_t type = sqlite3_column_int64(statement, 0);
|
|
int64_t len = sqlite3_column_int64(statement, 1);
|
|
int64_t count = sqlite3_column_int64(statement, 2);
|
|
|
|
space->file_count += count;
|
|
if (type==1){
|
|
space->internal_bytes = len;
|
|
}else{
|
|
space->external_bytes = len;
|
|
}
|
|
}
|
|
sqlite3_finalize(statement);
|
|
}
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
|
|
space->content_bytes = space->external_bytes + space->db_page_size * (space->db_total_pages - space->db_available_pages);
|
|
|
|
// Measure filesystem free space
|
|
space->filesystem_bytes = UINT64_MAX;
|
|
space->filesystem_free_bytes = UINT64_MAX;
|
|
|
|
#if defined(HAVE_SYS_STATVFS_H) || (defined(HAVE_SYS_STAT_H) && defined(HAVE_SYS_VFS_H))
|
|
{
|
|
struct statvfs stats;
|
|
if (statvfs(rhizome_database.dir_path, &stats)==-1)
|
|
WARNF_perror("statvfs(%s)", rhizome_database.dir_path);
|
|
else{
|
|
space->filesystem_bytes = stats.f_frsize * (uint64_t)stats.f_blocks;
|
|
space->filesystem_free_bytes = stats.f_frsize * (uint64_t)stats.f_bavail;
|
|
}
|
|
}
|
|
#endif
|
|
// Fake limit for reproducible testing
|
|
const char *fake_space = getenv("SERVALD_FAKE_SPACE_LIMIT");
|
|
if (fake_space){
|
|
uint64_t space_limit;
|
|
// subtrace measured space used to give the same result as we add and remove content
|
|
if (str_to_uint64_scaled(fake_space, 10, &space_limit, NULL)==1
|
|
&& space_limit < space->filesystem_free_bytes + space->content_bytes)
|
|
space->filesystem_free_bytes = space_limit - space->content_bytes;
|
|
}
|
|
|
|
// Calculate storage limit
|
|
space->content_limit_bytes = config.rhizome.database_size;
|
|
|
|
if (config.rhizome.min_free_space !=0){
|
|
uint64_t space_limit;
|
|
if (space->content_bytes + space->filesystem_free_bytes < config.rhizome.min_free_space)
|
|
space_limit = 0;
|
|
else
|
|
space_limit = space->content_bytes + space->filesystem_free_bytes - config.rhizome.min_free_space;
|
|
if (space_limit < space->content_limit_bytes)
|
|
space->content_limit_bytes = space_limit;
|
|
}
|
|
|
|
DEBUGF(rhizome, "RHIZOME SPACE USED bytes=%"PRIu64" (%sB), FREE bytes=%"PRIu64" (%sB), LIMIT bytes=%"PRIu64" (%sB)",
|
|
space->content_bytes, alloca_double_scaled_binary(space->content_bytes),
|
|
space->filesystem_free_bytes, alloca_double_scaled_binary(space->filesystem_free_bytes),
|
|
space->content_limit_bytes, alloca_double_scaled_binary(space->content_limit_bytes));
|
|
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_store_space_usage(struct rhizome_space_report *space)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
return store_space_report(&retry, space);
|
|
}
|
|
|
|
|
|
static enum rhizome_payload_status sqlite_vacuum(sqlite_retry_state *retry, struct rhizome_space_report *space){
|
|
if (space->db_available_pages == 0)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
|
|
// vacuum database pages if more than 1/4 of the db is free or we're already over the limit
|
|
if (space->db_available_pages > (space->db_total_pages>>2)+1 || space->external_bytes + space->db_page_size * space->db_total_pages > space->content_limit_bytes){
|
|
rhizome_vacuum_db(retry);
|
|
|
|
int stepcode = sqlite_exec_uint64_retry(retry, &space->db_total_pages, "PRAGMA page_count;", END);
|
|
if (sqlite_code_ok(stepcode))
|
|
stepcode = sqlite_exec_uint64_retry(retry, &space->db_available_pages, "PRAGMA freelist_count;", END);
|
|
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
}
|
|
|
|
// TODO readonly version?
|
|
static enum rhizome_payload_status store_make_space(uint64_t bytes, struct rhizome_cleanup_report *report)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
struct rhizome_space_report *space = (report ? &report->space_used : alloca(sizeof *space));
|
|
int stepcode;
|
|
|
|
enum rhizome_payload_status r;
|
|
if ((r = store_space_report(&retry, space)) != RHIZOME_PAYLOAD_STATUS_EMPTY)
|
|
return r;
|
|
|
|
if (bytes && bytes >= space->content_limit_bytes){
|
|
DEBUGF(rhizome, "Not enough space for %"PRIu64". Used; %"PRIu64" = %"PRIu64" + %"PRIu64" * (%"PRIu64" - %"PRIu64"), Limit; %"PRIu64,
|
|
bytes, space->content_bytes, space->external_bytes, space->db_page_size, space->db_total_pages, space->db_available_pages, space->content_limit_bytes);
|
|
return RHIZOME_PAYLOAD_STATUS_TOO_BIG;
|
|
}
|
|
|
|
if ((r = sqlite_vacuum(&retry, space)) != RHIZOME_PAYLOAD_STATUS_EMPTY)
|
|
return r;
|
|
|
|
// If there is enough space, do nothing
|
|
if (space->content_bytes + bytes <= space->content_limit_bytes)
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
|
|
// penalise new things by 10 minutes to reduce churn
|
|
time_ms_t cost = gettime_ms() - 60000 - bytes;
|
|
|
|
// query files by age, penalise larger files so they are removed earlier
|
|
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
|
|
"SELECT id, length, inserttime FROM FILES ORDER BY (inserttime - length)",
|
|
END);
|
|
if (!statement)
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
|
|
while (space->content_bytes + bytes > space->content_limit_bytes && (stepcode=sqlite_step_retry(&retry, statement)) == SQLITE_ROW) {
|
|
const char *id=(const char *) sqlite3_column_text(statement, 0);
|
|
uint64_t length = sqlite3_column_int(statement, 1);
|
|
time_ms_t inserttime = sqlite3_column_int64(statement, 2);
|
|
|
|
time_ms_t cost_existing = inserttime - length;
|
|
|
|
DEBUGF(rhizome, "Considering dropping file %s, size %"PRId64" cost %"PRId64" vs %"PRId64" to add %"PRId64" new bytes",
|
|
id, length, cost, cost_existing, bytes);
|
|
// don't allow the new file, we've got more important things to store
|
|
if (bytes && cost < cost_existing)
|
|
break;
|
|
|
|
// drop the existing content and recalculate used space
|
|
rhizome_filehash_t hash;
|
|
if (str_to_rhizome_filehash_t(&hash, id)!=-1
|
|
&& rhizome_delete_external(&hash)==0)
|
|
space->external_bytes -= length;
|
|
|
|
int rowcount=0;
|
|
sqlite3_stmt *s = sqlite_prepare_bind(&retry, "DELETE FROM fileblobs WHERE id = ?", STATIC_TEXT, id, END);
|
|
if (s && !sqlite_code_ok(stepcode = sqlite_exec_code_retry(&retry, s, &rowcount)))
|
|
break;
|
|
if (rowcount>0)
|
|
space->internal_bytes -= length;
|
|
|
|
s = sqlite_prepare_bind(&retry, "DELETE FROM files WHERE id = ?", STATIC_TEXT, id, END);
|
|
if (s && !sqlite_code_ok(stepcode = sqlite_exec_code_retry(&retry, s, &rowcount)))
|
|
break;
|
|
if (rowcount>0)
|
|
space->file_count --;
|
|
|
|
if (!sqlite_code_ok(stepcode = sqlite_exec_uint64_retry(&retry, &space->db_total_pages, "PRAGMA page_count;", END)))
|
|
break;
|
|
if (!sqlite_code_ok(stepcode = sqlite_exec_uint64_retry(&retry, &space->db_available_pages, "PRAGMA freelist_count;", END)))
|
|
break;
|
|
|
|
if (report)
|
|
report->deleted_expired_files++;
|
|
space->content_bytes = space->external_bytes + space->db_page_size * (space->db_total_pages - space->db_available_pages);
|
|
}
|
|
sqlite3_finalize(statement);
|
|
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
|
|
if ((r = sqlite_vacuum(&retry, space)) != RHIZOME_PAYLOAD_STATUS_EMPTY)
|
|
return r;
|
|
|
|
if (space->content_bytes + bytes <= space->content_limit_bytes)
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
|
|
DEBUGF(rhizome, "Not enough space for %"PRIu64". Used; %"PRIu64" = %"PRIu64" + %"PRIu64" * (%"PRIu64" - %"PRIu64"), Limit; %"PRIu64,
|
|
bytes, space->content_bytes, space->external_bytes, space->db_page_size, space->db_total_pages, space->db_available_pages, space->content_limit_bytes);
|
|
|
|
return RHIZOME_PAYLOAD_STATUS_EVICTED;
|
|
}
|
|
|
|
int rhizome_store_cleanup(struct rhizome_cleanup_report *report)
|
|
{
|
|
return store_make_space(0, report);
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length)
|
|
{
|
|
DEBUGF(rhizome_store, "file_length=%"PRIu64, file_length);
|
|
|
|
if (file_length == 0)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
|
|
write->blob_fd=-1;
|
|
write->sql_blob=NULL;
|
|
|
|
if (expectedHashp){
|
|
if (rhizome_exists(expectedHashp) == RHIZOME_PAYLOAD_STATUS_STORED)
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
write->id = *expectedHashp;
|
|
write->id_known=1;
|
|
}else{
|
|
write->id_known=0;
|
|
}
|
|
|
|
if (file_length!=RHIZOME_SIZE_UNSET){
|
|
enum rhizome_payload_status status = store_make_space(file_length, NULL);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
return status;
|
|
}
|
|
|
|
static unsigned id=0;
|
|
write->temp_id = (getpid()<<16) + id++;
|
|
|
|
write->file_length = file_length;
|
|
write->file_offset = 0;
|
|
write->written_offset = 0;
|
|
crypto_hash_sha512_init(&write->sha512_context);
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
}
|
|
|
|
/* blob_open / close will lock the database, this is bad for other processes that might attempt to
|
|
* use it at the same time. However, opening a blob has about O(n^2) performance.
|
|
* */
|
|
|
|
// encrypt and hash data, data buffers must be passed in file order.
|
|
static int prepare_data(struct rhizome_write *write_state, uint8_t *buffer, size_t data_size)
|
|
{
|
|
if (data_size <= 0)
|
|
return WHY("No content supplied");
|
|
|
|
/* Make sure we aren't being asked to write more data than we expected */
|
|
if ( write_state->file_length != RHIZOME_SIZE_UNSET
|
|
&& write_state->file_offset + data_size > write_state->file_length
|
|
)
|
|
return WHYF("Too much content supplied, %"PRIu64" + %zu > %"PRIu64,
|
|
write_state->file_offset, data_size, write_state->file_length);
|
|
|
|
if (write_state->crypt){
|
|
if (rhizome_crypt_xor_block(
|
|
buffer, data_size,
|
|
write_state->file_offset + write_state->tail,
|
|
write_state->key, write_state->nonce))
|
|
return -1;
|
|
}
|
|
|
|
crypto_hash_sha512_update(&write_state->sha512_context, buffer, data_size);
|
|
write_state->file_offset+=data_size;
|
|
|
|
DEBUGF(rhizome_store, "Processed %"PRIu64" of %"PRIu64, write_state->file_offset, write_state->file_length);
|
|
return 0;
|
|
}
|
|
|
|
// open database locks
|
|
static int write_get_lock(struct rhizome_write *write_state)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
|
|
if (write_state->blob_fd != -1 || write_state->sql_blob)
|
|
return 0;
|
|
|
|
if (write_state->file_length == RHIZOME_SIZE_UNSET ||
|
|
write_state->file_length > config.rhizome.max_blob_size){
|
|
char blob_path[1024];
|
|
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write_state->temp_id))
|
|
return -1;
|
|
DEBUGF(rhizome_store, "Attempting to put blob for id='%"PRIu64"' in %s", write_state->temp_id, blob_path);
|
|
if (emkdirsn(blob_path, strrchr(blob_path,'/') - blob_path, 0700) == -1)
|
|
return -1;
|
|
if ((write_state->blob_fd = open(blob_path, O_CREAT | O_TRUNC | O_RDWR, 0664)) == -1) {
|
|
WHYF("Failed to create payload file, id='%"PRIu64"'", write_state->temp_id);
|
|
return -1;
|
|
}
|
|
DEBUGF(rhizome_store, "Writing to new blob file %s (fd=%d)", blob_path, write_state->blob_fd);
|
|
}else{
|
|
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
|
|
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
|
return -1;
|
|
if (write_state->blob_rowid == 0){
|
|
write_state->blob_rowid = rhizome_create_fileblob(&retry, write_state->temp_id, write_state->file_length);
|
|
if (write_state->blob_rowid == 0)
|
|
goto fail;
|
|
}
|
|
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &write_state->sql_blob) == -1)
|
|
goto fail;
|
|
}
|
|
return 0;
|
|
|
|
fail:
|
|
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
|
return -1;
|
|
}
|
|
|
|
// write data to disk
|
|
static int write_data(struct rhizome_write *write_state, uint64_t file_offset, uint8_t *buffer, size_t data_size)
|
|
{
|
|
DEBUGF(rhizome_store, "write_state->file_length=%"PRIu64" file_offset=%"PRIu64, write_state->file_length, file_offset);
|
|
|
|
if (data_size<=0)
|
|
return 0;
|
|
|
|
if (file_offset != write_state->written_offset)
|
|
WARNF("Writing file data out of order! [%"PRId64",%"PRId64"]", file_offset, write_state->written_offset);
|
|
|
|
if (write_state->blob_fd != -1) {
|
|
size_t ofs = 0;
|
|
// keep trying until all of the data is written.
|
|
if (lseek64(write_state->blob_fd, (off64_t) file_offset, SEEK_SET) == -1)
|
|
return WHYF_perror("lseek64(%d,%"PRIu64",SEEK_SET)", write_state->blob_fd, file_offset);
|
|
while (ofs < data_size){
|
|
ssize_t r = write(write_state->blob_fd, buffer + ofs, (size_t)(data_size - ofs));
|
|
if (r == -1)
|
|
return WHY_perror("write");
|
|
DEBUGF(rhizome_store, "Wrote %zd bytes to fd %d", (size_t)r, write_state->blob_fd);
|
|
ofs += (size_t)r;
|
|
}
|
|
}else{
|
|
if (!write_state->sql_blob)
|
|
return WHY("Must call write_get_lock() before write_data()");
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
if (sqlite_blob_write_retry(&retry, write_state->sql_blob, buffer, data_size, file_offset) == -1)
|
|
return -1;
|
|
}
|
|
|
|
write_state->written_offset = file_offset + data_size;
|
|
|
|
DEBUGF(rhizome_store, "Wrote %"PRIu64" of %"PRIu64, file_offset + data_size, write_state->file_length);
|
|
return 0;
|
|
}
|
|
|
|
// close database locks
|
|
static int write_release_lock(struct rhizome_write *write_state)
|
|
{
|
|
int ret=0;
|
|
if (write_state->sql_blob){
|
|
ret = sqlite_blob_close(write_state->sql_blob);
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1){
|
|
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
|
ret=-1;
|
|
}
|
|
write_state->sql_blob=NULL;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Write data buffers in any order, the data will be cached and streamed into the database in file order.
|
|
// Though there is an upper bound on the amount of cached data
|
|
int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uint8_t *buffer, size_t data_size)
|
|
{
|
|
DEBUGF(rhizome_store, "write_state->file_length=%"PRIu64" offset=%"PRIu64, write_state->file_length, offset);
|
|
if ( write_state->file_length != RHIZOME_SIZE_UNSET
|
|
&& offset >= write_state->file_length)
|
|
return 0;
|
|
|
|
if ( write_state->file_length != RHIZOME_SIZE_UNSET
|
|
&& offset + data_size > write_state->file_length)
|
|
data_size = write_state->file_length - offset;
|
|
|
|
struct rhizome_write_buffer **ptr = &write_state->buffer_list;
|
|
int ret=0;
|
|
int should_write = 0;
|
|
|
|
// if we are writing to a file, or already have the sql blob open, or are finishing, write as much
|
|
// as we can.
|
|
if (write_state->blob_fd != -1 ||
|
|
write_state->sql_blob ||
|
|
buffer == NULL ||
|
|
write_state->file_length > config.rhizome.max_blob_size ||
|
|
write_state->file_offset > config.rhizome.max_blob_size) {
|
|
should_write = 1;
|
|
DEBUGF(rhizome_store, "Attempting to write (fd=%d, blob=%p, buffer=%p, len=%"PRId64", offset=%"PRId64")",
|
|
write_state->blob_fd, write_state->sql_blob, buffer,
|
|
write_state->file_length, write_state->file_offset);
|
|
} else {
|
|
// cache up to RHIZOME_BUFFER_MAXIMUM_SIZE or file length before attempting to write everything in one go.
|
|
// (Not perfect if the range overlaps)
|
|
uint64_t new_size = write_state->written_offset + write_state->buffer_size + data_size;
|
|
if ( (write_state->file_length != RHIZOME_SIZE_UNSET && new_size >= write_state->file_length)
|
|
|| new_size >= RHIZOME_BUFFER_MAXIMUM_SIZE
|
|
)
|
|
should_write = 1;
|
|
}
|
|
uint64_t last_offset = write_state->written_offset;
|
|
|
|
while(1){
|
|
|
|
// can we process this existing data block now?
|
|
if (*ptr && (*ptr)->offset == write_state->file_offset){
|
|
if (prepare_data(write_state, (*ptr)->data, (*ptr)->data_size)){
|
|
ret=-1;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// if existing data should be written, do so now
|
|
if (should_write && *ptr && (*ptr)->offset == write_state->written_offset){
|
|
struct rhizome_write_buffer *n=*ptr;
|
|
|
|
if ( write_get_lock(write_state)
|
|
|| write_data(write_state, n->offset, n->data, n->data_size)){
|
|
// ignore transient write errors, eg db locks
|
|
should_write = 0;
|
|
continue;
|
|
}
|
|
|
|
*ptr=n->_next;
|
|
write_state->buffer_size-=n->data_size;
|
|
last_offset = n->offset + n->data_size;
|
|
free(n);
|
|
continue;
|
|
}
|
|
|
|
// skip over incoming data that we've already received
|
|
if (offset < last_offset){
|
|
uint64_t delta = last_offset - offset;
|
|
if (delta >= data_size)
|
|
break;
|
|
data_size -= delta;
|
|
offset+=delta;
|
|
buffer+=delta;
|
|
}
|
|
|
|
// no new data? we can just stop now.
|
|
if (data_size<=0)
|
|
break;
|
|
|
|
if (!*ptr || offset < (*ptr)->offset){
|
|
// found the insert position in the list
|
|
size_t size = data_size;
|
|
|
|
// allow for buffers to overlap, we may need to split the incoming buffer into multiple pieces.
|
|
if (*ptr && offset+size > (*ptr)->offset)
|
|
size = (*ptr)->offset - offset;
|
|
|
|
// should we process the incoming data block now?
|
|
if (offset == write_state->file_offset){
|
|
if (prepare_data(write_state, buffer, size)){
|
|
ret=-1;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (should_write && offset == write_state->written_offset){
|
|
|
|
if ( write_get_lock(write_state)
|
|
|| write_data(write_state, offset, buffer, size)){
|
|
should_write = 0;
|
|
continue;
|
|
}
|
|
|
|
// we need to go around the loop again to re-test if *ptr can now be written
|
|
|
|
}else{
|
|
// impose a limit on the total amount of cached data
|
|
if (write_state->buffer_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE)
|
|
size = RHIZOME_BUFFER_MAXIMUM_SIZE - write_state->buffer_size;
|
|
if (size<=0)
|
|
break;
|
|
|
|
DEBUGF(rhizome_store, "Caching block @%"PRId64", %zu", offset, size);
|
|
struct rhizome_write_buffer *i = emalloc(size + sizeof(struct rhizome_write_buffer));
|
|
if (!i){
|
|
ret=-1;
|
|
break;
|
|
}
|
|
i->offset = offset;
|
|
i->buffer_size = i->data_size = size;
|
|
bcopy(buffer, i->data, size);
|
|
i->_next = *ptr;
|
|
write_state->buffer_size += size;
|
|
*ptr = i;
|
|
// if there's any overlap of this buffer and the current one, we may need to add another buffer.
|
|
ptr = &((*ptr)->_next);
|
|
}
|
|
data_size -= size;
|
|
offset+=size;
|
|
buffer+=size;
|
|
continue;
|
|
}
|
|
|
|
last_offset = (*ptr)->offset + (*ptr)->data_size;
|
|
ptr = &((*ptr)->_next);
|
|
}
|
|
if (write_release_lock(write_state))
|
|
ret=-1;
|
|
return ret;
|
|
}
|
|
|
|
int rhizome_write_buffer(struct rhizome_write *write_state, uint8_t *buffer, size_t data_size)
|
|
{
|
|
return rhizome_random_write(write_state, write_state->file_offset, buffer, data_size);
|
|
}
|
|
|
|
/* If file_length is known, then expects file to be at least file_length in size, ignoring anything
|
|
* longer than that. Returns 0 if successful, -1 if error (logged).
|
|
*/
|
|
int rhizome_write_file(struct rhizome_write *write, const char *filename, off_t offset, uint64_t length)
|
|
{
|
|
int fd = open(filename, O_RDONLY);
|
|
if (fd == -1)
|
|
return WHYF_perror("open(%s,O_RDONLY)", alloca_str_toprint(filename));
|
|
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
|
|
int ret=0;
|
|
if (offset){
|
|
if (lseek(fd, offset, SEEK_SET)==-1)
|
|
return WHYF_perror("lseek(%d,%zu,SEEK_SET)", fd, (unsigned long long)offset);
|
|
}
|
|
if (length == RHIZOME_SIZE_UNSET || length > write->file_length)
|
|
length = write->file_length;
|
|
while (length == RHIZOME_SIZE_UNSET || write->file_offset < length) {
|
|
size_t size = sizeof buffer;
|
|
if (length != RHIZOME_SIZE_UNSET && write->file_offset + size > length)
|
|
size = length - write->file_offset;
|
|
ssize_t r = read(fd, buffer, size);
|
|
if (r == -1) {
|
|
ret = WHYF_perror("read(%d,%p,%zu)", fd, buffer, size);
|
|
break;
|
|
}
|
|
if (length != RHIZOME_SIZE_UNSET && (size_t) r != size) {
|
|
ret = WHYF("file truncated - read(%d,%p,%zu) returned %zu", fd, buffer, size, (size_t) r);
|
|
break;
|
|
}
|
|
if (r && rhizome_write_buffer(write, buffer, (size_t) r)) {
|
|
ret = -1;
|
|
break;
|
|
}
|
|
if ((size_t) r != size)
|
|
break;
|
|
}
|
|
if (write_release_lock(write))
|
|
ret = -1;
|
|
close(fd);
|
|
return ret;
|
|
}
|
|
|
|
int is_rhizome_write_open(const struct rhizome_write *write)
|
|
{
|
|
return write->temp_id ? 1:0;
|
|
}
|
|
|
|
void rhizome_fail_write(struct rhizome_write *write)
|
|
{
|
|
if (write->blob_fd != -1){
|
|
DEBUGF(rhizome_store, "Closing and removing fd %d", write->blob_fd);
|
|
close(write->blob_fd);
|
|
write->blob_fd=-1;
|
|
char blob_path[1024];
|
|
if (FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id)){
|
|
unlink(blob_path);
|
|
}
|
|
}
|
|
write_release_lock(write);
|
|
if (write->blob_rowid){
|
|
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
|
|
INT64, write->blob_rowid, END);
|
|
}
|
|
while(write->buffer_list){
|
|
struct rhizome_write_buffer *n=write->buffer_list;
|
|
write->buffer_list=n->_next;
|
|
free(n);
|
|
}
|
|
write->temp_id=0;
|
|
}
|
|
|
|
static int keep_hash(struct rhizome_write *write_state, struct crypto_hash_sha512_state *hash_state)
|
|
{
|
|
char dest_path[1024];
|
|
// capture the state of writing the file hash
|
|
if (!FORM_BLOB_PATH(dest_path, RHIZOME_HASH_SUBDIR, &write_state->id))
|
|
return WHYF("Path too long?");
|
|
if (emkdirsn(dest_path, strrchr(dest_path,'/') - dest_path, 0700)<0)
|
|
return -1;
|
|
int fd = open(dest_path, O_WRONLY | O_CREAT | O_TRUNC, 0664);
|
|
if (fd < 0)
|
|
return WHYF_perror("Failed to create %s", dest_path);
|
|
ssize_t w = write(fd, hash_state, sizeof *hash_state);
|
|
close(fd);
|
|
if (w != sizeof *hash_state)
|
|
return WHYF("Failed to write hash state");
|
|
DEBUGF(rhizome, "Preserved partial hash to %s", dest_path);
|
|
return 1;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
|
|
{
|
|
DEBUGF(rhizome_store, "blob_fd=%d file_offset=%"PRIu64"", write->blob_fd, write->file_offset);
|
|
|
|
enum rhizome_payload_status status = RHIZOME_PAYLOAD_STATUS_NEW;
|
|
|
|
// Once the whole file has been processed, we should finally know its length
|
|
if (write->file_length == RHIZOME_SIZE_UNSET) {
|
|
DEBUGF(rhizome_store, "Wrote %"PRIu64" bytes, set file_length", write->file_offset);
|
|
write->file_length = write->file_offset;
|
|
if (write->file_length == 0)
|
|
status = RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
else {
|
|
status = store_make_space(write->file_length, NULL);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
goto failure;
|
|
}
|
|
}
|
|
|
|
// flush out any remaining buffered pieces to disk
|
|
if (write->buffer_list){
|
|
if (rhizome_random_write(write, 0, NULL, 0) || write->buffer_list) {
|
|
INFOF("Failed to flush write buffer");
|
|
status = RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
goto failure;
|
|
}
|
|
}
|
|
|
|
if (write->file_offset < write->file_length) {
|
|
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
|
|
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
|
goto failure;
|
|
}
|
|
assert(write->file_offset == write->file_length && write->written_offset == write->file_length);
|
|
|
|
if (write->file_length == 0) {
|
|
// whoops, no payload, don't store anything
|
|
DEBUGF(rhizome_store, "Ignoring empty write");
|
|
goto failure;
|
|
}
|
|
|
|
struct crypto_hash_sha512_state hash_state;
|
|
if (write->journal)
|
|
bcopy(&write->sha512_context, &hash_state, sizeof hash_state);
|
|
|
|
rhizome_filehash_t hash_out;
|
|
crypto_hash_sha512_final(&write->sha512_context, hash_out.binary);
|
|
|
|
if (write->id_known) {
|
|
if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) {
|
|
WARNF("expected filehash=%s, got %s", alloca_tohex_rhizome_filehash_t(write->id), alloca_tohex_rhizome_filehash_t(hash_out));
|
|
write->id = hash_out;
|
|
status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
|
|
goto failure;
|
|
}
|
|
} else
|
|
write->id = hash_out;
|
|
|
|
char blob_path[1024];
|
|
if (!FORMF_RHIZOME_STORE_PATH(blob_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id)) {
|
|
WHYF("Failed to generate external blob path");
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
goto failure;
|
|
}
|
|
// If the payload was written into an external blob (file) but is small enough to fit into a
|
|
// SQLite blob, then copy it into a proper blob (this occurs if rhizome_open_write() was called
|
|
// with file_length == RHIZOME_SIZE_UNSET) and max_blob_size > RHIZOME_BUFFER_MAXIMUM_SIZE.
|
|
int external = 0;
|
|
if (write->blob_fd != -1) {
|
|
external = 1;
|
|
if (write->file_length <= config.rhizome.max_blob_size) {
|
|
DEBUGF(rhizome_store, "Copying %zu bytes from external file %s into blob, id=%"PRIu64, (size_t)write->file_offset, blob_path, write->temp_id);
|
|
int ret = 0;
|
|
if (lseek(write->blob_fd, 0, SEEK_SET) == (off_t) -1)
|
|
ret = WHYF_perror("lseek(%d,0,SEEK_SET)", write->blob_fd);
|
|
else if ((write->blob_rowid = rhizome_copy_file_to_blob(write->blob_fd, write->temp_id, (size_t)write->file_length)) == 0)
|
|
ret = -1;
|
|
if (ret == -1) {
|
|
WHY("Failed to copy external file into blob; keeping external file");
|
|
} else {
|
|
external = 0;
|
|
if (unlink(blob_path) == -1)
|
|
WARNF_perror("unlink(%s)", alloca_str_toprint(blob_path));
|
|
}
|
|
}
|
|
DEBUGF(rhizome_store, "Closing fd=%d", write->blob_fd);
|
|
close(write->blob_fd);
|
|
write->blob_fd = -1;
|
|
}
|
|
if (write_release_lock(write)) {
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
goto failure;
|
|
}
|
|
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
|
|
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
|
goto dbfailure;
|
|
|
|
// attempt the insert first
|
|
time_ms_t now = gettime_ms();
|
|
int rowcount, changes;
|
|
int stepcode = sqlite_exec_changes_retry_loglevel(
|
|
LOG_LEVEL_INFO,
|
|
&retry, &rowcount, &changes,
|
|
"INSERT INTO FILES(id,length,datavalid,inserttime,last_verified) VALUES(?,?,1,?,?);",
|
|
RHIZOME_FILEHASH_T, &write->id,
|
|
INT64, write->file_length,
|
|
INT64, now,
|
|
INT64, now,
|
|
END
|
|
);
|
|
|
|
if (stepcode == SQLITE_CONSTRAINT){
|
|
// we've already got that payload, delete the new copy
|
|
if (write->blob_rowid){
|
|
sqlite_exec_void_retry_loglevel(LOG_LEVEL_WARN, &retry, "DELETE FROM FILEBLOBS WHERE rowid = ?;",
|
|
INT64, write->blob_rowid, END);
|
|
}
|
|
if (external){
|
|
if (unlink(blob_path) == -1)
|
|
WARNF_perror("unlink(%s)", alloca_str_toprint(blob_path));
|
|
}
|
|
DEBUGF(rhizome_store, "Payload id=%s already present, removed id='%"PRIu64"'", alloca_tohex_rhizome_filehash_t(write->id), write->temp_id);
|
|
status = RHIZOME_PAYLOAD_STATUS_STORED;
|
|
|
|
}else if(sqlite_code_ok(stepcode)){
|
|
|
|
if (external) {
|
|
char dest_path[1024];
|
|
if (!FORM_BLOB_PATH(dest_path, RHIZOME_BLOB_SUBDIR, &write->id))
|
|
goto dbfailure;
|
|
if (emkdirsn(dest_path, strrchr(dest_path,'/') - dest_path, 0700)<0)
|
|
goto dbfailure;
|
|
if (rename(blob_path, dest_path) == -1) {
|
|
WHYF_perror("rename(%s, %s)", blob_path, dest_path);
|
|
goto dbfailure;
|
|
}
|
|
DEBUGF(rhizome_store, "Renamed %s to %s", blob_path, dest_path);
|
|
if (write->journal)
|
|
keep_hash(write, &hash_state);
|
|
}else{
|
|
if (sqlite_exec_void_retry(
|
|
&retry,
|
|
"UPDATE FILEBLOBS SET id = ? WHERE rowid = ?",
|
|
RHIZOME_FILEHASH_T, &write->id,
|
|
INT64, write->blob_rowid,
|
|
END
|
|
) == -1
|
|
)
|
|
goto dbfailure;
|
|
}
|
|
}else
|
|
goto dbfailure;
|
|
|
|
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
|
|
goto dbfailure;
|
|
|
|
write->blob_rowid = 0;
|
|
// A test case in tests/rhizomeprotocol depends on this debug message:
|
|
if (status == RHIZOME_PAYLOAD_STATUS_NEW)
|
|
DEBUGF(rhizome_store, "Stored file %s", alloca_tohex_rhizome_filehash_t(write->id));
|
|
|
|
return status;
|
|
|
|
dbfailure:
|
|
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
failure:
|
|
if (status != RHIZOME_PAYLOAD_STATUS_BUSY)
|
|
rhizome_fail_write(write);
|
|
return status;
|
|
}
|
|
|
|
/* Import the payload for an existing manifest with a known file size and hash. Compute the hash of
|
|
* the payload as it is imported, and when finished, check if the size and hash match the manifest.
|
|
* If the import is successful and the size and hash match, return 0. If the size or hash do not
|
|
* match, return 1. If there is an error reading the payload file or writing to the database,
|
|
* return -1.
|
|
*/
|
|
enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath)
|
|
{
|
|
assert(m->filesize != RHIZOME_SIZE_UNSET);
|
|
if (m->filesize == 0)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
|
|
/* Import the file first, checking the hash as we go */
|
|
struct rhizome_write write;
|
|
bzero(&write, sizeof(write));
|
|
|
|
enum rhizome_payload_status status = rhizome_open_write(&write, &m->filehash, m->filesize);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
return status;
|
|
|
|
// file payload is not in the store yet
|
|
if (rhizome_write_file(&write, filepath, 0, RHIZOME_SIZE_UNSET)){
|
|
rhizome_fail_write(&write);
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
|
|
status = rhizome_finish_write(&write);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_BUSY)
|
|
rhizome_fail_write(&write);
|
|
|
|
return status;
|
|
}
|
|
|
|
// store a whole payload from a single buffer
|
|
enum rhizome_payload_status rhizome_import_buffer(rhizome_manifest *m, uint8_t *buffer, size_t length)
|
|
{
|
|
assert(m->filesize != RHIZOME_SIZE_UNSET);
|
|
if (m->filesize == 0)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
|
|
if (length != m->filesize) {
|
|
WHYF("Expected %"PRIu64" bytes, got %zu", m->filesize, length);
|
|
return RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
|
}
|
|
|
|
/* Import the file first, checking the hash as we go */
|
|
struct rhizome_write write;
|
|
bzero(&write, sizeof(write));
|
|
|
|
enum rhizome_payload_status status = rhizome_open_write(&write, &m->filehash, m->filesize);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
return status;
|
|
|
|
// file payload is not in the store yet
|
|
if (rhizome_write_buffer(&write, buffer, length)){
|
|
rhizome_fail_write(&write);
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
|
|
status = rhizome_finish_write(&write);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_BUSY)
|
|
rhizome_fail_write(&write);
|
|
|
|
return status;
|
|
}
|
|
|
|
/* Checks the size of the file with the given path as a candidate payload for an existing manifest.
|
|
* An empty path (zero length) is taken to mean empty payload (size = 0). If the manifest's
|
|
* 'filesize' is not yet set, then sets the manifest's 'filesize' to the size of the file and
|
|
* returns 0. Otherwise, if the file's size equals the 'filesize' in the manifest, return 0. If
|
|
* the file size does not match the manifest's 'filesize', returns 1. If there is an error calling
|
|
* stat(2) on the payload file (eg, file does not exist), returns -1.
|
|
*/
|
|
enum rhizome_payload_status rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath)
|
|
{
|
|
uint64_t size = 0;
|
|
if (filepath[0]) {
|
|
struct stat stat;
|
|
if (lstat(filepath, &stat)) {
|
|
WHYF_perror("lstat(%s)", alloca_str_toprint(filepath));
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
size = stat.st_size;
|
|
}
|
|
if (m->filesize == RHIZOME_SIZE_UNSET)
|
|
rhizome_manifest_set_filesize(m, size);
|
|
else if (size != m->filesize) {
|
|
DEBUGF(rhizome_store, "payload file %s (size=%"PRIu64") does not match manifest %p filesize=%"PRIu64,
|
|
alloca_str_toprint(filepath), size, m, m->filesize);
|
|
return RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
|
}
|
|
return size ? RHIZOME_PAYLOAD_STATUS_NEW : RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
}
|
|
|
|
static enum rhizome_payload_status rhizome_write_derive_key(rhizome_manifest *m, struct rhizome_write *write)
|
|
{
|
|
if (m->payloadEncryption != PAYLOAD_ENCRYPTED)
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
|
|
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt the
|
|
// contents as we go
|
|
if (!rhizome_derive_payload_key(m))
|
|
return RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL;
|
|
|
|
DEBUGF(rhizome_store, "Encrypting payload contents for bid=%s, version=%"PRIu64,
|
|
alloca_tohex_rhizome_bid_t(m->keypair.public_key), m->version);
|
|
|
|
write->crypt=1;
|
|
if (m->is_journal && m->tail > 0)
|
|
write->tail = m->tail;
|
|
|
|
bcopy(m->payloadKey, write->key, sizeof(write->key));
|
|
bcopy(m->payloadNonce, write->nonce, sizeof(write->nonce));
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m)
|
|
{
|
|
enum rhizome_payload_status status = rhizome_open_write(
|
|
write,
|
|
m->has_filehash ? &m->filehash : NULL,
|
|
m->filesize
|
|
);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_NEW)
|
|
status = rhizome_write_derive_key(m, write);
|
|
return status;
|
|
}
|
|
|
|
// import a file for a new bundle with an unknown file hash
|
|
// update the manifest with the details of the file
|
|
enum rhizome_payload_status rhizome_store_payload_file(rhizome_manifest *m, const char *filepath)
|
|
{
|
|
// Stream the file directly into the database, encrypting & hashing as we go.
|
|
struct rhizome_write write;
|
|
bzero(&write, sizeof(write));
|
|
enum rhizome_payload_status status = rhizome_write_open_manifest(&write, m);
|
|
int status_ok = 0;
|
|
switch (status) {
|
|
case RHIZOME_PAYLOAD_STATUS_EMPTY:
|
|
case RHIZOME_PAYLOAD_STATUS_NEW:
|
|
status_ok = 1;
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_BUSY:
|
|
case RHIZOME_PAYLOAD_STATUS_STORED:
|
|
case RHIZOME_PAYLOAD_STATUS_TOO_BIG:
|
|
case RHIZOME_PAYLOAD_STATUS_EVICTED:
|
|
case RHIZOME_PAYLOAD_STATUS_ERROR:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
|
|
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
|
|
return status;
|
|
}
|
|
if (!status_ok)
|
|
FATALF("rhizome_write_open_manifest() returned status = %d", status);
|
|
if (rhizome_write_file(&write, filepath, 0, RHIZOME_SIZE_UNSET) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
else
|
|
status = rhizome_finish_write(&write);
|
|
return rhizome_finish_store(&write, m, status);
|
|
}
|
|
|
|
/* Returns RHIZOME_PAYLOAD_STATUS_STORED if file blob found
|
|
* Returns RHIZOME_PAYLOAD_STATUS_NEW if not found
|
|
* Returns RHIZOME_PAYLOAD_STATUS_ERROR if unexpected error
|
|
*/
|
|
enum rhizome_payload_status rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp)
|
|
{
|
|
read->id = *hashp;
|
|
read->blob_rowid = 0;
|
|
read->blob_fd = -1;
|
|
read->verified = 0;
|
|
read->offset = 0;
|
|
read->hash_offset = 0;
|
|
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
|
|
int stepcode = sqlite_exec_uint64_retry(&retry, &read->length,"SELECT length FROM FILES WHERE id = ?",
|
|
RHIZOME_FILEHASH_T, &read->id, END);
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
if (stepcode != SQLITE_ROW){
|
|
if (sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
|
|
assert(read->length>0);
|
|
crypto_hash_sha512_init(&read->sha512_context);
|
|
|
|
char blob_path[1024];
|
|
if (FORM_BLOB_PATH(blob_path, RHIZOME_BLOB_SUBDIR, &read->id)){
|
|
int fd = open(blob_path, O_RDONLY);
|
|
DEBUGF(rhizome_store, "open(%s) = %d", alloca_str_toprint(blob_path), fd);
|
|
|
|
if (fd == -1 && errno == ENOENT){
|
|
char legacy_path[1024];
|
|
// migrate from flat folder to sub-tree's
|
|
if (FORMF_RHIZOME_STORE_PATH(legacy_path, "%s/%s", RHIZOME_BLOB_SUBDIR, alloca_tohex_rhizome_filehash_t(*hashp))){
|
|
struct stat st;
|
|
if (stat(legacy_path, &st) == 0
|
|
&& emkdirsn(legacy_path, strrchr(legacy_path,'/') - legacy_path, 0700)!=-1
|
|
&& rename(legacy_path, blob_path) != -1){
|
|
INFOF("Moved %s to %s", legacy_path, blob_path);
|
|
fd = open(blob_path, O_RDONLY);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (fd == -1){
|
|
if (errno!=ENOENT)
|
|
WHYF_perror("open(%s)", alloca_str_toprint(blob_path));
|
|
}else{
|
|
off64_t pos = lseek64(fd, 0, SEEK_END);
|
|
if (pos == -1)
|
|
WHYF_perror("lseek64(%s,0,SEEK_END)", alloca_str_toprint(blob_path));
|
|
if (read->length <= (uint64_t)pos){
|
|
read->blob_fd = fd;
|
|
DEBUGF(rhizome_store, "Opened stored file %s as fd %d, len %"PRIu64" (%"PRIu64")", blob_path, read->blob_fd, read->length, pos);
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
DEBUGF(rhizome_store, "Ignoring file? %s fd %d, len %"PRIu64", seek %zd", blob_path, fd, read->length, pos);
|
|
close(fd);
|
|
}
|
|
}
|
|
|
|
stepcode = sqlite_exec_uint64_retry(&retry, &read->blob_rowid,
|
|
"SELECT rowid "
|
|
"FROM FILEBLOBS "
|
|
"WHERE id = ?", RHIZOME_FILEHASH_T, &read->id, END);
|
|
|
|
if (sqlite_code_busy(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_BUSY;
|
|
|
|
if (!sqlite_code_ok(stepcode))
|
|
return RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
|
|
if (stepcode == SQLITE_ROW){
|
|
DEBUGF(rhizome_store, "Opened stored blob, rowid %d", read->blob_rowid);
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
// database is inconsistent, clean it up
|
|
rhizome_delete_file(&read->id);
|
|
return RHIZOME_PAYLOAD_STATUS_NEW;
|
|
}
|
|
|
|
static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read *read_state, unsigned char *buffer, size_t bufsz)
|
|
{
|
|
IN();
|
|
if (read_state->blob_fd != -1) {
|
|
assert(read_state->offset <= read_state->length);
|
|
if (lseek64(read_state->blob_fd, (off64_t) read_state->offset, SEEK_SET) == -1)
|
|
RETURN(WHYF_perror("lseek64(%d,%"PRIu64",SEEK_SET)", read_state->blob_fd, read_state->offset));
|
|
if (bufsz + read_state->offset > read_state->length)
|
|
bufsz = read_state->length - read_state->offset;
|
|
if (bufsz == 0)
|
|
RETURN(0);
|
|
ssize_t rd = read(read_state->blob_fd, buffer, bufsz);
|
|
if (rd == -1)
|
|
RETURN(WHYF_perror("read(%d,%p,%zu)", read_state->blob_fd, buffer, bufsz));
|
|
DEBUGF(rhizome_store, "Read %zu bytes from fd=%d @%"PRIx64, (size_t) rd, read_state->blob_fd, read_state->offset);
|
|
RETURN(rd);
|
|
}
|
|
if (read_state->blob_rowid == 0)
|
|
RETURN(WHY("blob not created"));
|
|
sqlite3_blob *blob = NULL;
|
|
if (sqlite_blob_open_retry(retry, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob) == -1)
|
|
RETURN(WHY("blob open failed"));
|
|
assert(blob != NULL);
|
|
assert(read_state->length == (uint64_t)sqlite3_blob_bytes(blob));
|
|
// A NULL buffer skips the actual sqlite3_blob_read() call, which is useful just to work out
|
|
// the length.
|
|
size_t bytes_read = 0;
|
|
if (buffer && bufsz && read_state->offset < read_state->length) {
|
|
bytes_read = (size_t)(read_state->length - read_state->offset);
|
|
if (bytes_read > bufsz)
|
|
bytes_read = bufsz;
|
|
assert(bytes_read > 0);
|
|
int ret;
|
|
do {
|
|
ret = sqlite3_blob_read(blob, buffer, (int) bytes_read, read_state->offset);
|
|
} while (sqlite_code_busy(ret) && sqlite_retry(retry, "sqlite3_blob_read"));
|
|
if (ret != SQLITE_OK) {
|
|
WHYF("sqlite3_blob_read() failed: %s", sqlite3_errmsg(rhizome_database.db));
|
|
sqlite_blob_close(blob);
|
|
RETURN(-1);
|
|
}
|
|
}
|
|
sqlite_blob_close(blob);
|
|
RETURN(bytes_read);
|
|
OUT();
|
|
}
|
|
|
|
/* Read content from the store, hashing and decrypting as we go.
|
|
Random access is supported, but hashing requires all payload contents to be read sequentially. */
|
|
// returns the number of bytes read
|
|
ssize_t rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, size_t buffer_length)
|
|
{
|
|
IN();
|
|
// hash check failed, just return an error
|
|
if (read_state->verified == -1)
|
|
RETURN(-1);
|
|
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
ssize_t n = rhizome_read_retry(&retry, read_state, buffer, buffer_length);
|
|
if (n == -1)
|
|
RETURN(-1);
|
|
size_t bytes_read = (size_t) n;
|
|
|
|
// hash the payload as we go, but only if we happen to read the payload data in order
|
|
if (read_state->hash_offset == read_state->offset && buffer && bytes_read>0){
|
|
crypto_hash_sha512_update(&read_state->sha512_context, buffer, bytes_read);
|
|
read_state->hash_offset += bytes_read;
|
|
|
|
// if we hash everything and the hash doesn't match, we need to delete the payload
|
|
if (read_state->hash_offset >= read_state->length){
|
|
rhizome_filehash_t hash_out;
|
|
crypto_hash_sha512_final(&read_state->sha512_context, hash_out.binary);
|
|
if (cmp_rhizome_filehash_t(&read_state->id, &hash_out) != 0) {
|
|
// hash failure, mark the payload as invalid
|
|
read_state->verified = -1;
|
|
RETURN(WHYF("Expected hash=%s, got %s", alloca_tohex_rhizome_filehash_t(read_state->id), alloca_tohex_rhizome_filehash_t(hash_out)));
|
|
}else{
|
|
// we read it, and it's good. Lets remember that (not fatal if the database is locked)
|
|
read_state->verified = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (read_state->crypt && buffer && bytes_read>0){
|
|
if(rhizome_crypt_xor_block(
|
|
buffer, bytes_read,
|
|
read_state->offset + read_state->tail,
|
|
read_state->key, read_state->nonce)){
|
|
RETURN(-1);
|
|
}
|
|
}
|
|
read_state->offset += bytes_read;
|
|
DEBUGF(rhizome_store, "read %zu bytes, read_state->offset=%"PRIu64, bytes_read, read_state->offset);
|
|
RETURN(bytes_read);
|
|
OUT();
|
|
}
|
|
|
|
/* Read len bytes from read->offset into data, using *buffer to cache any reads */
|
|
ssize_t rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buffer *buffer, unsigned char *data, size_t len)
|
|
{
|
|
size_t bytes_copied=0;
|
|
|
|
while (len>0){
|
|
//DEBUGF(rhizome_store, "len=%zu read->length=%"PRIu64" read->offset=%"PRIu64" buffer->offset=%"PRIu64"", len, read->length, read->offset, buffer->offset);
|
|
// make sure we only attempt to read data that actually exists
|
|
if (read->length != RHIZOME_SIZE_UNSET && read->offset + len > read->length){
|
|
assert(read->offset <= read->length);
|
|
len = read->length - read->offset;
|
|
}
|
|
|
|
// if we can supply either the beginning or end of the data from cache, do that first.
|
|
if (read->offset >= buffer->offset) {
|
|
assert(read->offset - buffer->offset <= SIZE_MAX);
|
|
size_t ofs = read->offset - buffer->offset;
|
|
if (ofs <= buffer->len){
|
|
size_t size = len;
|
|
if (size > buffer->len - ofs)
|
|
size = buffer->len - ofs;
|
|
if (size > 0){
|
|
// copy into the start of the data buffer
|
|
bcopy(buffer->data + ofs, data, size);
|
|
data+=size;
|
|
len-=size;
|
|
read->offset+=size;
|
|
bytes_copied+=size;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (read->offset + len > buffer->offset) {
|
|
assert(read->offset + len - buffer->offset <= SIZE_MAX);
|
|
size_t ofs = read->offset + len - buffer->offset;
|
|
if (ofs <= buffer->len){
|
|
size_t size = len;
|
|
if (size > ofs)
|
|
size = ofs;
|
|
if (size>0){
|
|
// copy into the end of the data buffer
|
|
bcopy(buffer->data + ofs - size, data + len - size, size);
|
|
len-=size;
|
|
bytes_copied+=size;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// ok, so we need to read at least one buffer to fulfill the request.
|
|
|
|
// remember the requested read offset so we can put it back
|
|
uint64_t original_ofs = read->offset;
|
|
// round down to the previous block boundary
|
|
uint64_t read_offset = original_ofs & ~(RHIZOME_CRYPT_PAGE_SIZE -1);
|
|
|
|
if (read->length != RHIZOME_SIZE_UNSET && original_ofs + len == read->length){
|
|
// if more than one PAGE is being requested, and the end of the requested range lines up with the end of the file
|
|
// we should probably read the last block first.
|
|
// That way, if the reader is scanning a payload backwards,
|
|
// we will end up caching part of the previous block for the next buffered read
|
|
read_offset = (read->length -1) & ~(RHIZOME_CRYPT_PAGE_SIZE -1);
|
|
}
|
|
|
|
buffer->len = 0;
|
|
buffer->offset = read->offset = read_offset;
|
|
ssize_t r = rhizome_read(read, buffer->data, sizeof buffer->data);
|
|
read->offset = original_ofs;
|
|
if (r == -1)
|
|
return -1;
|
|
buffer->len = (size_t) r;
|
|
}
|
|
return bytes_copied;
|
|
}
|
|
|
|
void rhizome_read_close(struct rhizome_read *read)
|
|
{
|
|
if (read->length == 0)
|
|
// bzero'd & never opened, or already closed
|
|
return;
|
|
|
|
if (read->blob_fd != -1) {
|
|
DEBUGF(rhizome_store, "Closing store fd %d", read->blob_fd);
|
|
close(read->blob_fd);
|
|
read->blob_fd = -1;
|
|
}
|
|
|
|
if (read->verified==-1) {
|
|
// delete payload!
|
|
rhizome_delete_file(&read->id);
|
|
}else if(read->verified==1) {
|
|
// remember when we verified the file
|
|
time_ms_t now = gettime_ms();
|
|
sqlite_exec_void_loglevel(LOG_LEVEL_WARN,
|
|
"UPDATE FILES SET last_verified = ? WHERE id=?",
|
|
INT64, now,
|
|
RHIZOME_FILEHASH_T, &read->id,
|
|
END);
|
|
}
|
|
read->length = 0;
|
|
read->offset = 0;
|
|
read->tail = 0;
|
|
}
|
|
|
|
struct cache_entry{
|
|
struct cache_entry *_left;
|
|
struct cache_entry *_right;
|
|
rhizome_bid_t bundle_id;
|
|
uint64_t version;
|
|
struct rhizome_read read_state;
|
|
time_ms_t expires;
|
|
};
|
|
struct cache_entry *root;
|
|
|
|
static struct cache_entry ** find_entry_location(struct cache_entry **ptr, const rhizome_bid_t *bundle_id, uint64_t version)
|
|
{
|
|
while(*ptr){
|
|
struct cache_entry *entry = *ptr;
|
|
int cmp = cmp_rhizome_bid_t(bundle_id, &entry->bundle_id);
|
|
if (cmp==0){
|
|
if (entry->version==version)
|
|
break;
|
|
if (version < entry->version)
|
|
ptr = &entry->_left;
|
|
else
|
|
ptr = &entry->_right;
|
|
continue;
|
|
}
|
|
if (cmp<0)
|
|
ptr = &entry->_left;
|
|
else
|
|
ptr = &entry->_right;
|
|
}
|
|
return ptr;
|
|
}
|
|
|
|
static time_ms_t close_entries(struct cache_entry **entry, time_ms_t timeout)
|
|
{
|
|
if (!*entry)
|
|
return 0;
|
|
|
|
time_ms_t ret = close_entries(&(*entry)->_left, timeout);
|
|
time_ms_t t_right = close_entries(&(*entry)->_right, timeout);
|
|
if (t_right!=0 && (t_right < ret || ret==0))
|
|
ret=t_right;
|
|
|
|
if ((*entry)->expires < timeout || timeout==0){
|
|
rhizome_read_close(&(*entry)->read_state);
|
|
// remember the two children
|
|
struct cache_entry *left=(*entry)->_left;
|
|
struct cache_entry *right=(*entry)->_right;
|
|
// free this entry
|
|
free(*entry);
|
|
// re-add both children to the tree
|
|
*entry=left;
|
|
if (right){
|
|
entry = find_entry_location(entry, &right->bundle_id, right->version);
|
|
*entry=right;
|
|
}
|
|
}else{
|
|
if ((*entry)->expires < ret || ret==0)
|
|
ret=(*entry)->expires;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// close any expired cache entries
|
|
static void rhizome_cache_alarm(struct sched_ent *alarm)
|
|
{
|
|
alarm->alarm = close_entries(&root, gettime_ms());
|
|
if (alarm->alarm){
|
|
alarm->deadline = alarm->alarm + 1000;
|
|
schedule(alarm);
|
|
}
|
|
}
|
|
|
|
static struct profile_total cache_alarm_stats={
|
|
.name="rhizome_cache_alarm",
|
|
};
|
|
static struct sched_ent cache_alarm={
|
|
.function = rhizome_cache_alarm,
|
|
.stats = &cache_alarm_stats,
|
|
};
|
|
|
|
// close all cache entries
|
|
int rhizome_cache_close()
|
|
{
|
|
close_entries(&root, 0);
|
|
unschedule(&cache_alarm);
|
|
return 0;
|
|
}
|
|
|
|
static int _rhizome_cache_count(struct cache_entry *entry)
|
|
{
|
|
if (!entry)
|
|
return 0;
|
|
return 1+_rhizome_cache_count(entry->_left)+_rhizome_cache_count(entry->_right);
|
|
}
|
|
|
|
int rhizome_cache_count()
|
|
{
|
|
return _rhizome_cache_count(root);
|
|
}
|
|
|
|
// read a block of data, caching meta data for reuse
|
|
ssize_t rhizome_read_cached(const rhizome_bid_t *bidp, uint64_t version, time_ms_t timeout, uint64_t fileOffset, unsigned char *buffer, size_t length)
|
|
{
|
|
// look for a cached entry
|
|
struct cache_entry **ptr = find_entry_location(&root, bidp, version);
|
|
struct cache_entry *entry = *ptr;
|
|
|
|
// if we don't have one yet, create one and open it
|
|
if (!entry){
|
|
rhizome_filehash_t filehash;
|
|
if (rhizome_database_filehash_from_id(bidp, version, &filehash) != 0){
|
|
DEBUGF(rhizome_store, "Payload not found for bundle bid=%s version=%"PRIu64,
|
|
alloca_tohex_rhizome_bid_t(*bidp), version);
|
|
return -1;
|
|
}
|
|
entry = emalloc_zero(sizeof(struct cache_entry));
|
|
if (entry == NULL)
|
|
return -1;
|
|
enum rhizome_payload_status status = rhizome_open_read(&entry->read_state, &filehash);
|
|
switch (status) {
|
|
case RHIZOME_PAYLOAD_STATUS_EMPTY:
|
|
case RHIZOME_PAYLOAD_STATUS_STORED:
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_NEW:
|
|
free(entry);
|
|
return WHYF("Payload %s not found", alloca_tohex_rhizome_filehash_t(filehash));
|
|
case RHIZOME_PAYLOAD_STATUS_ERROR:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
|
|
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
|
|
free(entry);
|
|
return WHYF("Error opening payload %s", alloca_tohex_rhizome_filehash_t(filehash));
|
|
default:
|
|
FATALF("status = %d", status);
|
|
}
|
|
entry->bundle_id = *bidp;
|
|
entry->version = version;
|
|
*ptr = entry;
|
|
}
|
|
|
|
entry->read_state.offset = fileOffset;
|
|
if (entry->read_state.length != RHIZOME_SIZE_UNSET && fileOffset >= entry->read_state.length)
|
|
return 0;
|
|
|
|
if (entry->expires < timeout){
|
|
entry->expires = timeout;
|
|
|
|
if (!cache_alarm.alarm){
|
|
cache_alarm.alarm = timeout;
|
|
cache_alarm.deadline = timeout + 1000;
|
|
schedule(&cache_alarm);
|
|
}
|
|
}
|
|
|
|
return rhizome_read(&entry->read_state, buffer, length);
|
|
}
|
|
|
|
/* Returns -1 on error, 0 on success.
|
|
*/
|
|
static int write_file(struct rhizome_read *read, const char *filepath){
|
|
int fd=-1, ret=0;
|
|
|
|
if (filepath&&filepath[0]) {
|
|
fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, 0775);
|
|
if (fd == -1)
|
|
return WHY_perror("open");
|
|
}
|
|
|
|
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
|
|
while((ret=rhizome_read(read, buffer, sizeof(buffer)))>0){
|
|
if (fd!=-1){
|
|
if (write(fd,buffer,ret)!=ret) {
|
|
ret = WHY_perror("Failed to write data to file");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (fd!=-1){
|
|
if (close(fd)==-1)
|
|
ret=WHY_perror("close");
|
|
if (ret<0){
|
|
// TODO delete partial file
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static enum rhizome_payload_status read_derive_key(rhizome_manifest *m, struct rhizome_read *read_state)
|
|
{
|
|
read_state->crypt = m->payloadEncryption == PAYLOAD_ENCRYPTED;
|
|
if (read_state->crypt){
|
|
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt
|
|
// the contents as we go
|
|
if (!rhizome_derive_payload_key(m)) {
|
|
rhizome_read_close(read_state);
|
|
WHY("Unable to decrypt bundle, valid key not found");
|
|
return RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL;
|
|
}
|
|
DEBUGF(rhizome_store, "Decrypting payload contents for bid=%s version=%"PRIu64, alloca_tohex_rhizome_bid_t(m->keypair.public_key), m->version);
|
|
if (m->is_journal && m->tail > 0)
|
|
read_state->tail = m->tail;
|
|
bcopy(m->payloadKey, read_state->key, sizeof(read_state->key));
|
|
bcopy(m->payloadNonce, read_state->nonce, sizeof(read_state->nonce));
|
|
}
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state)
|
|
{
|
|
if (m->filesize == 0 && !m->has_filehash)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
enum rhizome_payload_status status = rhizome_open_read(read_state, &m->filehash);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_STORED)
|
|
status = read_derive_key(m, read_state);
|
|
return status;
|
|
}
|
|
|
|
/* Extract the file related to a manifest to the file system. The file will be de-crypted and
|
|
* verified while reading. If filepath is not supplied, the file will still be checked.
|
|
*/
|
|
enum rhizome_payload_status rhizome_extract_file(rhizome_manifest *m, const char *filepath)
|
|
{
|
|
struct rhizome_read read_state;
|
|
bzero(&read_state, sizeof read_state);
|
|
enum rhizome_payload_status status = rhizome_open_decrypt_read(m, &read_state);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
|
|
if (write_file(&read_state, filepath) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
rhizome_read_close(&read_state);
|
|
return status;
|
|
}
|
|
|
|
/* dump the raw contents of a file
|
|
*/
|
|
enum rhizome_payload_status rhizome_dump_file(const rhizome_filehash_t *hashp, const char *filepath, uint64_t *lengthp)
|
|
{
|
|
struct rhizome_read read_state;
|
|
bzero(&read_state, sizeof read_state);
|
|
enum rhizome_payload_status status = rhizome_open_read(&read_state, hashp);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
|
|
if (write_file(&read_state, filepath) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
else if (lengthp)
|
|
*lengthp = read_state.length;
|
|
}
|
|
rhizome_read_close(&read_state);
|
|
return status;
|
|
}
|
|
|
|
// pipe data from one payload to another
|
|
static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write, uint64_t length)
|
|
{
|
|
assert(write->file_offset <= write->file_length);
|
|
if (length > (uint64_t)(write->file_length - write->file_offset))
|
|
return WHY("Unable to pipe that much data");
|
|
|
|
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
|
|
while(length>0){
|
|
size_t size=sizeof(buffer);
|
|
if (size > length)
|
|
size=length;
|
|
|
|
ssize_t r = rhizome_read(read, buffer, size);
|
|
if (r == -1)
|
|
return r;
|
|
|
|
length -= (size_t) r;
|
|
|
|
if (rhizome_write_buffer(write, buffer, (size_t) r))
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
uint64_t rhizome_copy_file_to_blob(int fd, uint64_t id, size_t size)
|
|
{
|
|
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
|
|
|
|
// use an explicit transaction so we can delay I/O failures until COMMIT so they can be retried.
|
|
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
|
|
return 0;
|
|
|
|
sqlite3_blob *blob = NULL;
|
|
|
|
uint64_t rowid = rhizome_create_fileblob(&retry, id, size);
|
|
if (rowid == 0)
|
|
goto fail;
|
|
|
|
if (sqlite_blob_open_retry(&retry, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob) == -1)
|
|
goto fail;
|
|
|
|
uint8_t buf[16384];
|
|
size_t offset = 0;
|
|
while (offset < size) {
|
|
size_t toread = size - offset;
|
|
if (toread > sizeof buf)
|
|
toread = sizeof buf;
|
|
ssize_t nread = read(fd, buf, toread);
|
|
if (nread == -1) {
|
|
WHYF_perror("read(%d,%p,%zu)", fd, buf, toread);
|
|
goto fail;
|
|
}
|
|
if ((size_t)nread == 0) {
|
|
WHYF("read(%d,%p,%zu) returned 0", fd, buf, toread);
|
|
goto fail;
|
|
}
|
|
if (sqlite_blob_write_retry(&retry, blob, buf, (int)nread, (int)offset) == -1)
|
|
goto fail;
|
|
assert((size_t)nread <= toread);
|
|
offset += (size_t)nread;
|
|
}
|
|
assert(offset == size);
|
|
sqlite_blob_close(blob);
|
|
blob = NULL;
|
|
|
|
if (sqlite_exec_void_retry(&retry, "COMMIT;", END) == -1)
|
|
goto fail;
|
|
|
|
return rowid;
|
|
|
|
fail:
|
|
if (blob)
|
|
sqlite_blob_close(blob);
|
|
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
|
|
return 0;
|
|
}
|
|
|
|
static int append_existing_journal_file(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t length){
|
|
// Try to append directly into the previous journal file, linking them together
|
|
DEBUGF(rhizome, "Attempting to append into journal blob");
|
|
// First, we need to read a previous partial hash state
|
|
char existing_path[1024];
|
|
if (!FORM_BLOB_PATH(existing_path, RHIZOME_BLOB_SUBDIR, hashp))
|
|
return WHYF("existing path too long?");
|
|
|
|
int payloadfd = open(existing_path, O_RDWR, 0664);
|
|
if (payloadfd<0){
|
|
if (errno != ENOENT)
|
|
WHYF_perror("Failed to open existing journal payload %s", existing_path);
|
|
else
|
|
DEBUGF(rhizome, "No existing journal payloadpartial hash state");
|
|
return -1;
|
|
}
|
|
|
|
off64_t pos = lseek64(payloadfd, 0, SEEK_END);
|
|
if ((uint64_t)pos != length){
|
|
DEBUGF(rhizome, "Existing journal file is not the right length");
|
|
close(payloadfd);
|
|
return -1;
|
|
}
|
|
|
|
char hash_path[1024];
|
|
if (!FORM_BLOB_PATH(hash_path, RHIZOME_HASH_SUBDIR, hashp)){
|
|
close(payloadfd);
|
|
return WHYF("hash path too long?");
|
|
}
|
|
|
|
int hashfd = open(hash_path, O_RDONLY);
|
|
if (hashfd < 0){
|
|
if (errno != ENOENT)
|
|
WHYF_perror("Failed to open partial hash state %s", hash_path);
|
|
else
|
|
DEBUGF(rhizome, "No partial hash state");
|
|
close(payloadfd);
|
|
return -1;
|
|
}
|
|
|
|
struct crypto_hash_sha512_state hash_state;
|
|
ssize_t r = read(hashfd, &hash_state, sizeof hash_state);
|
|
close(hashfd);
|
|
|
|
if (r != sizeof hash_state){
|
|
close(payloadfd);
|
|
return WHYF("Expected %u bytes", (unsigned)sizeof hash_state);
|
|
}
|
|
|
|
char new_path[1024];
|
|
if (!FORMF_RHIZOME_STORE_PATH(new_path, "%s/%"PRIu64, RHIZOME_BLOB_SUBDIR, write->temp_id)){
|
|
close(payloadfd);
|
|
return WHYF("Temp path too long?");
|
|
}
|
|
|
|
if (link(existing_path, new_path)==-1){
|
|
close(payloadfd);
|
|
return WHYF_perror("Failed to link journal payloads together");
|
|
}
|
|
|
|
// (write_data always seeks so we don't have to)
|
|
write->written_offset = write->file_offset = length;
|
|
write->blob_fd = payloadfd;
|
|
bcopy(&hash_state, &write->sha512_context, sizeof hash_state);
|
|
|
|
// Used by tests
|
|
DEBUGF(rhizome,"Reusing journal payload file, keeping %"PRIu64" existing bytes", length);
|
|
return 1;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length)
|
|
{
|
|
if (length==0)
|
|
return RHIZOME_PAYLOAD_STATUS_EMPTY;
|
|
|
|
struct rhizome_read read_state;
|
|
bzero(&read_state, sizeof read_state);
|
|
assert(!write->crypt);
|
|
|
|
DEBUGF(rhizome, "Piping journal from %"PRIu64", len %"PRIu64" to %"PRIu64,
|
|
start_offset, length, write->file_offset);
|
|
|
|
if (start_offset == 0 && write->file_offset == 0
|
|
&& append_existing_journal_file(write, hashp, length)!=-1){
|
|
return RHIZOME_PAYLOAD_STATUS_STORED;
|
|
}
|
|
|
|
enum rhizome_payload_status status = rhizome_open_read(&read_state, hashp);
|
|
if (status == RHIZOME_PAYLOAD_STATUS_NEW || status == RHIZOME_PAYLOAD_STATUS_EMPTY)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
|
|
read_state.offset = start_offset;
|
|
if (rhizome_pipe(&read_state, write, length) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
}
|
|
rhizome_read_close(&read_state);
|
|
return status;
|
|
}
|
|
|
|
// open an existing journal bundle, advance the head pointer, duplicate the existing content and get ready to add more.
|
|
enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, uint64_t advance_by, uint64_t append_size)
|
|
{
|
|
assert(m->is_journal);
|
|
assert(m->filesize != RHIZOME_SIZE_UNSET);
|
|
assert(advance_by <= m->filesize);
|
|
uint64_t copy_length = m->filesize - advance_by;
|
|
uint64_t new_filesize = RHIZOME_SIZE_UNSET;
|
|
if (append_size != RHIZOME_SIZE_UNSET) {
|
|
assert(m->filesize + append_size > m->filesize); // no wraparound
|
|
new_filesize = m->filesize + append_size - advance_by;
|
|
}
|
|
if (advance_by > 0)
|
|
rhizome_manifest_set_tail(m, m->tail + advance_by);
|
|
|
|
enum rhizome_payload_status status = rhizome_open_write(write, NULL, new_filesize);
|
|
DEBUGF(rhizome, "rhizome_open_write() returned %d %s", status, rhizome_payload_status_message(status));
|
|
if (status == RHIZOME_PAYLOAD_STATUS_NEW) {
|
|
write->journal=1;
|
|
|
|
if (copy_length > 0){
|
|
// we don't need to bother decrypting the existing journal payload
|
|
enum rhizome_payload_status rstatus = rhizome_journal_pipe(write, &m->filehash, advance_by, copy_length);
|
|
DEBUGF(rhizome, "rhizome_journal_pipe() returned %d %s", rstatus, rhizome_payload_status_message(rstatus));
|
|
int rstatus_valid = 0;
|
|
switch (rstatus) {
|
|
case RHIZOME_PAYLOAD_STATUS_EMPTY:
|
|
case RHIZOME_PAYLOAD_STATUS_NEW:
|
|
case RHIZOME_PAYLOAD_STATUS_STORED:
|
|
rstatus_valid = 1;
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_BUSY:
|
|
case RHIZOME_PAYLOAD_STATUS_ERROR:
|
|
case RHIZOME_PAYLOAD_STATUS_TOO_BIG:
|
|
rstatus_valid = 1;
|
|
status = rstatus;
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
|
|
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
|
|
case RHIZOME_PAYLOAD_STATUS_EVICTED:
|
|
// rhizome_journal_pipe() should not return any of these codes
|
|
FATALF("rhizome_journal_pipe() returned %d %s", rstatus, rhizome_payload_status_message(rstatus));
|
|
}
|
|
if (!rstatus_valid)
|
|
FATALF("rstatus = %d", rstatus);
|
|
}
|
|
}
|
|
if (status == RHIZOME_PAYLOAD_STATUS_NEW) {
|
|
assert(write->file_offset == copy_length);
|
|
status = rhizome_write_derive_key(m, write);
|
|
DEBUGF(rhizome, "rhizome_write_derive_key() returned %d %s", status, rhizome_payload_status_message(status));
|
|
}
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW) {
|
|
rhizome_fail_write(write);
|
|
}
|
|
return status;
|
|
}
|
|
|
|
// Call to finish any payload store operation
|
|
enum rhizome_payload_status rhizome_finish_store(struct rhizome_write *write, rhizome_manifest *m, enum rhizome_payload_status status)
|
|
{
|
|
DEBUGF(rhizome, "write=%p m=manifest %p, status=%d %s", write, m, status, rhizome_payload_status_message_nonnull(status));
|
|
int status_valid = 0;
|
|
switch (status) {
|
|
case RHIZOME_PAYLOAD_STATUS_EMPTY:
|
|
status_valid = 1;
|
|
assert(write->file_length == 0);
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_NEW:
|
|
assert(write->file_length != RHIZOME_SIZE_UNSET);
|
|
status_valid = 1;
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_STORED:
|
|
assert(write->file_length != RHIZOME_SIZE_UNSET);
|
|
status_valid = 1;
|
|
// TODO: check that stored hash matches received payload's hash
|
|
break;
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
|
|
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
|
|
case RHIZOME_PAYLOAD_STATUS_TOO_BIG:
|
|
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
|
|
case RHIZOME_PAYLOAD_STATUS_EVICTED:
|
|
case RHIZOME_PAYLOAD_STATUS_ERROR:
|
|
case RHIZOME_PAYLOAD_STATUS_BUSY:
|
|
status_valid = 1;
|
|
rhizome_fail_write(write);
|
|
return status;
|
|
}
|
|
if (!status_valid)
|
|
FATALF("status = %d", status);
|
|
// Fill in missing manifest fields and check consistency with existing fields.
|
|
if (m->is_journal || m->filesize == RHIZOME_SIZE_UNSET)
|
|
rhizome_manifest_set_filesize(m, write->file_length);
|
|
else if (m->filesize != write->file_length) {
|
|
DEBUGF(rhizome, "m->filesize=%"PRIu64", write->file_length=%"PRIu64, m->filesize, write->file_length);
|
|
return RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
|
|
}
|
|
if (m->is_journal) {
|
|
// TODO ensure new version is greater than previous version
|
|
rhizome_manifest_set_version(m, m->tail + m->filesize);
|
|
}
|
|
if (m->filesize) {
|
|
if (m->is_journal || !m->has_filehash)
|
|
rhizome_manifest_set_filehash(m, &write->id);
|
|
else if (cmp_rhizome_filehash_t(&write->id, &m->filehash) != 0) {
|
|
DEBUGF(rhizome, "m->filehash=%s, write->id=%s", alloca_tohex_rhizome_filehash_t(m->filehash), alloca_tohex_rhizome_filehash_t(write->id));
|
|
return RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
|
|
}
|
|
} else if (m->is_journal)
|
|
rhizome_manifest_del_filehash(m);
|
|
else if (m->has_filehash)
|
|
return RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
|
|
return status;
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_append_journal_buffer(rhizome_manifest *m, uint64_t advance_by, uint8_t *buffer, size_t len)
|
|
{
|
|
struct rhizome_write write;
|
|
bzero(&write, sizeof write);
|
|
assert(advance_by || (buffer && len));
|
|
enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, (uint64_t) len);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
return status;
|
|
if (buffer && len && rhizome_write_buffer(&write, buffer, len) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
else
|
|
status = rhizome_finish_write(&write);
|
|
return rhizome_finish_store(&write, m, status);
|
|
}
|
|
|
|
enum rhizome_payload_status rhizome_append_journal_file(rhizome_manifest *m, uint64_t advance_by, const char *filename)
|
|
{
|
|
struct stat stat;
|
|
if (lstat(filename,&stat))
|
|
return WHYF_perror("stat(%s)", alloca_str_toprint(filename));
|
|
struct rhizome_write write;
|
|
bzero(&write, sizeof write);
|
|
enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, stat.st_size);
|
|
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
|
|
return status;
|
|
if (stat.st_size != 0 && rhizome_write_file(&write, filename, 0, RHIZOME_SIZE_UNSET) == -1)
|
|
status = RHIZOME_PAYLOAD_STATUS_ERROR;
|
|
else
|
|
status = rhizome_finish_write(&write);
|
|
return rhizome_finish_store(&write, m, status);
|
|
}
|