Create files for large rhizome bundles

- configurable size threashold
This commit is contained in:
gardners 2013-02-16 20:25:26 +10:30 committed by Jeremy Lakeman
parent 0ac403717d
commit 641d749ab4
10 changed files with 373 additions and 191 deletions

View File

@ -310,6 +310,7 @@ STRUCT(rhizome)
ATOM(int, enable, 1, cf_opt_int_boolean,, "If true, server opens Rhizome database when starting")
STRING(256, datastore_path, "", cf_opt_absolute_path,, "Path of rhizome storage directory, absolute or relative to instance directory")
ATOM(uint64_t, database_size, 1000000, cf_opt_uint64_scaled,, "Size of database in bytes")
ATOM(uint64_t, max_internal_blob_size, 10000000000, cf_opt_uint64_scaled,, "Size of largest bundle to store internally in database.")
ATOM(uint32_t, fetch_delay_ms, 50, cf_opt_uint32_nonzero,, "Delay from receiving first bundle advert to initiating fetch")
SUB_STRUCT(rhizome_direct, direct,)
SUB_STRUCT(rhizome_api, api,)

View File

@ -54,26 +54,14 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
TODO: If we have a newer version of the manifest, and the manifest is a
journal, then the newer version is okay to use to service this request.
*/
long long row_id=-1;
if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILEBLOBS WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');",
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]),
alloca_tohex_bid(&mdp->out.payload[0])) < 1)
{
DEBUGF("Couldn't find stored file.");
RETURN(-1);
}
sqlite3_blob *blob=NULL;
int ret=sqlite3_blob_open(rhizome_db, "main", "fileblobs", "data",
row_id, 0 /* read only */, &blob);
if (ret!=SQLITE_OK)
{
DEBUGF("Failed to open blob: %s",sqlite3_errmsg(rhizome_db));
RETURN(-1);
}
int blob_bytes=sqlite3_blob_bytes(blob);
if (blob_bytes<fileOffset) {
sqlite3_blob_close(blob); blob=NULL;
rhizome_blob_handle *blob=NULL;
blob=rhizome_database_open_blob_bybid
(alloca_tohex_bid(&mdp->out.payload[0]),
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]), 0 /* read only */);
if (blob->blob_bytes<fileOffset) {
rhizome_database_blob_close(blob); blob=NULL;
RETURN(-1);
}
@ -127,26 +115,26 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
uint64_t blockOffset=fileOffset+i*blockLength;
write_uint64(&reply.out.payload[1+16+8],blockOffset);
// work out how many bytes to read
int blockBytes=blob_bytes-blockOffset;
int blockBytes=blob->blob_bytes-blockOffset;
if (blockBytes>blockLength) blockBytes=blockLength;
// read data for block
if (blob_bytes>=blockOffset) {
if (blob->blob_bytes>=blockOffset) {
if (overlay_queue_remaining(reply.out.queue) < 10)
break;
sqlite3_blob_read(blob,&reply.out.payload[1+16+8+8],
blockBytes,blockOffset);
rhizome_database_blob_read(blob,&reply.out.payload[1+16+8+8],
blockBytes,blockOffset);
reply.out.payload_length=1+16+8+8+blockBytes;
// Mark terminal block if required
if (blockOffset+blockBytes==blob_bytes) reply.out.payload[0]='T';
if (blockOffset+blockBytes==blob->blob_bytes) reply.out.payload[0]='T';
// send packet
if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0))
break;
} else break;
}
sqlite3_blob_close(blob); blob=NULL;
rhizome_database_blob_close(blob); blob=NULL;
RETURN(-1);
}

View File

@ -460,8 +460,8 @@ struct http_response {
int rhizome_received_content(unsigned char *bidprefix,uint64_t version,
uint64_t offset,int count,unsigned char *bytes,
int type);
int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
int priority);
int64_t rhizome_database_create_blob_for(const char *filehashhex_or_tempid,
int64_t fileLength,int priority);
int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h);
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_http_send_bytes(rhizome_http_request *r);
@ -658,5 +658,22 @@ int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash);
int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_length);
int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk);
int rhizome_dump_file(const char *id, const char *filepath, int64_t *length);
char *rhizome_database_get_blob_filename(int64_t fileblob_rowid);
typedef struct rhizome_blob_handle {
uint64_t blob_bytes;
sqlite3_blob *sqlite_blob;
int fd_blob;
} rhizome_blob_handle;
rhizome_blob_handle *rhizome_database_open_blob_bybid(const char *id,
uint64_t version,
int writeP);
rhizome_blob_handle *rhizome_database_open_blob_byrowid(int row_id,int writeP);
int rhizome_database_blob_close(rhizome_blob_handle *blob);
int rhizome_database_blob_read(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset);
int rhizome_database_blob_write(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset);
const char *rhizome_database_blob_errmsg(rhizome_blob_handle *blob);
#endif //__SERVALDNA__RHIZOME_H

View File

@ -300,8 +300,6 @@ int rhizome_opendb()
All changes should attempt to preserve any existing data */
// We can't delete a file that is being transferred in another process at this very moment...
// TODO don't cleanup before every command line operation...
rhizome_cleanup();
RETURN(0);
}
@ -666,6 +664,144 @@ long long rhizome_database_used_bytes()
return db_page_size * (db_page_count - db_free_page_count);
}
char blobfile[1024];
char *rhizome_database_get_blob_filename(int64_t rowid)
{
const char *blobpath=config.rhizome.datastore_path;
if (!blobpath||!blobpath[0]) {
blobpath=serval_instancepath();
}
if (!blobpath) return NULL;
snprintf(blobfile,1024,"%s/blob.%lld",blobpath,rowid);
return blobfile;
}
rhizome_blob_handle *rhizome_database_open_blob_bybid(const char *id,
uint64_t version,
int writeP)
{
IN();
long long row_id=-1;
if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILEBLOBS WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');",
version,id) < 1)
{
DEBUGF("Couldn't find stored bundle.");
#if 0
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT rowid,id,filehash FROM MANIFESTS");
while(sqlite_step_retry(&retry, statement)==SQLITE_ROW){
sqlite3_int64 rowid = sqlite3_column_int64(statement, 0);
const char *idhex = (const char *) sqlite3_column_text(statement, 1);
const char *filehashhex = (const char *) sqlite3_column_text(statement, 2);
DEBUGF("rowid=%lld, id='%s', filehash='%s'",
rowid,idhex,filehashhex);
}
sqlite3_finalize(statement);
#endif
RETURN(NULL);
}
if (row_id==-1) {
DEBUGF("Couldn't find stored bundle.");
RETURN(NULL);
}
RETURN(rhizome_database_open_blob_byrowid(row_id,writeP));
}
rhizome_blob_handle *rhizome_database_open_blob_byrowid(int row_id,int writeP)
{
IN();
struct rhizome_blob_handle *blob=calloc(sizeof(struct rhizome_blob_handle),1);
if (!blob) RETURN(NULL);
DEBUGF("Opening blob for rowid #%d",row_id);
// Try opening as internal blob.
// If column is not a blob, then this will fail.
int ret=sqlite3_blob_open(rhizome_db, "main", "fileblobs", "data",
row_id, writeP, &blob->sqlite_blob);
if (ret==SQLITE_OK) {
blob->blob_bytes=sqlite3_blob_bytes(blob->sqlite_blob);
RETURN(blob);
}
// Try opening as an external file
{
char *blobfile=rhizome_database_get_blob_filename(row_id);
errno=0;
blob->fd_blob=open(blobfile,O_RDWR);
if (blob->fd_blob==-1&&writeP) blob->fd_blob=open(blobfile,O_CREAT|O_RDWR,0664);
if (blob->fd_blob>-1) {
// File is stored externally
blob->blob_bytes=lseek(blob->fd_blob,0,SEEK_END);
DEBUGF("Opened fileblobs blob file '%s' (%lld bytes)",
blobfile,blob->blob_bytes);
RETURN(blob);
}
DEBUGF("Could not open fileblobs blob file '%s', will try sqlite blob",
blobfile);
// WHY_perror("open");
}
// Couldn't open, so fail
free(blob);
RETURN(NULL);
}
int rhizome_database_blob_close(rhizome_blob_handle *blob)
{
if (!blob) return 0;
if (blob->sqlite_blob) sqlite3_blob_close(blob->sqlite_blob);
if (blob->fd_blob) close(blob->fd_blob);
bzero(blob,sizeof(struct rhizome_blob_handle));
return 0;
}
int rhizome_database_blob_read(rhizome_blob_handle *blob,unsigned char *out,
uint64_t count,uint64_t offset)
{
IN();
if (!blob) RETURN(-1);
if (blob->sqlite_blob) RETURN(sqlite3_blob_read(blob->sqlite_blob,
out,count,offset));
if (blob->fd_blob) {
lseek(blob->fd_blob,offset,SEEK_SET);
int r=read(blob->fd_blob,out,count);
if (r==count) RETURN(SQLITE_OK);
}
RETURN(-1);
}
int rhizome_database_blob_write(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset)
{
IN();
if (!blob) RETURN(-1);
if (blob->sqlite_blob) RETURN(sqlite3_blob_write(blob->sqlite_blob,
buffer,count,offset));
if (blob->fd_blob) {
DEBUGF("Writing to external file backed blob");
if (lseek(blob->fd_blob,offset,SEEK_SET)<0)
RETURN(WHYF("lseek(fd,%lld,SEEK_SET) failed",offset));
int r=write(blob->fd_blob,buffer,count);
DEBUGF(" wrote %d of %lld bytes",r,count);
if (r==count) RETURN(SQLITE_OK);
}
RETURN(-1);
}
const char *rhizome_database_blob_errmsg(rhizome_blob_handle *blob)
{
if (!blob) return "blob is null";
if (blob->sqlite_blob) return sqlite3_errmsg(rhizome_db);
if (blob->fd_blob) return strerror(errno);
return "blob has no open channel";
}
void rhizome_cleanup()
{
IN();
@ -677,8 +813,27 @@ void rhizome_cleanup()
if (sqlite_exec_void("DELETE FROM FILES WHERE inserttime < %lld AND datavalid=1 AND NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE MANIFESTS.filehash = FILES.id);", gettime_ms() - 1000)) {
WARNF("delete failed: %s", sqlite3_errmsg(rhizome_db));
}
if (sqlite_exec_void("DELETE FROM FILEBLOBS WHERE NOT EXISTS ( SELECT 1 FROM FILES WHERE FILES.id = FILEBLOBS.id );")) {
WARNF("delete failed: %s", sqlite3_errmsg(rhizome_db));
// Clean up unreferenced blobs and files
{
char sqlcmd[1024];
strbuf b = strbuf_local(sqlcmd, sizeof sqlcmd);
strbuf_puts(b, "SELECT rowid FROM fileblobs WHERE TYPEOF(data)<>\"blob\" AND NOT EXISTS ( SELECT 1 FROM FILES WHERE FILES.id = FILEBLOBS.id );");
if (!strbuf_overrun(b)) {
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "%s", strbuf_str(b));
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
int64_t rowid = sqlite3_column_int64(statement, 0);
char *blobfile=rhizome_database_get_blob_filename(rowid);
if (blobfile) {
INFOF("Cleaning up orphaned blob file %s",blobfile);
unlink(blobfile);
}
}
sqlite3_finalize(statement);
}
if (sqlite_exec_void("DELETE FROM FILEBLOBS WHERE NOT EXISTS ( SELECT 1 FROM FILES WHERE FILES.id = FILEBLOBS.id );")) {
WARNF("delete failed: %s", sqlite3_errmsg(rhizome_db));
}
}
OUT();
}
@ -779,6 +934,12 @@ int rhizome_drop_stored_file(const char *id,int maximum_priority)
sqlite3_finalize(statement);
if (can_drop) {
sqlite_exec_void_retry(&retry, "delete from files where id='%s';",id);
int64_t fileblob_rowid=-1;
sqlite_exec_int64_retry(&retry,&fileblob_rowid,"select rowid from fileblobs where id='%s';",id);
if (fileblob_rowid>-1) {
char *blobfile=rhizome_database_get_blob_filename(fileblob_rowid);
if (blobfile) unlink(blobfile);
}
sqlite_exec_void_retry(&retry, "delete from fileblobs where id='%s';",id);
}
return 0;
@ -1112,12 +1273,15 @@ cleanup:
RETURN(ret);
}
int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
int priority)
int64_t rhizome_database_create_blob_for(const char *filehashhex_or_tempid,
int64_t fileLength, int priority)
{
IN();
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
DEBUGF("Creating blob for (filehash or temp id)='%s', length=%d, priority=%d",
filehashhex_or_tempid,fileLength,priority);
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK)
RETURN(WHY("Failed to begin transaction"));
@ -1132,28 +1296,44 @@ int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
int ret=sqlite_exec_void_retry(&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
hashhex, (long long)fileLength, priority, (long long)gettime_ms()
);
filehashhex_or_tempid, (long long)fileLength,
priority, (long long)gettime_ms());
if (ret!=SQLITE_OK) {
DEBUGF("insert or replace into files ... failed: %s",
sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
sqlite3_stmt *statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",hashhex);
if (!statement)
goto insert_row_fail;
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, fileLength) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
sqlite3_finalize(statement);
goto insert_row_fail;
sqlite3_int64 fileblob_rowid=sqlite3_last_insert_rowid(rhizome_db);
sqlite3_stmt *statement=NULL;
if (fileLength<config.rhizome.max_internal_blob_size) {
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",filehashhex_or_tempid);
if (!statement)
goto insert_row_fail;
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, fileLength) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
sqlite3_finalize(statement);
goto insert_row_fail;
}
} else {
char *blobfile=rhizome_database_get_blob_filename(fileblob_rowid);
DEBUGF("Attempting to put blob for %s in %s",
filehashhex_or_tempid,blobfile?blobfile:"(null)");
int fd=open(blobfile, O_CREAT | O_TRUNC | O_WRONLY, 0664);
if (fd<0) goto insert_row_fail;
else DEBUGF("Blob file created (fd=%d)",fd);
close(fd);
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',%lld)",filehashhex_or_tempid,fileblob_rowid);
if (!statement)
goto insert_row_fail;
}
/* Do actual insert, and abort if it fails */
if (_sqlite_exec_void_prepared(__WHENCE__, LOG_LEVEL_ERROR, &retry, statement) == -1) {
insert_row_fail:
WHYF("Failed to insert row for fileid=%s", hashhex);
insert_row_fail:
WHYF("Failed to insert row for fileid=%s", filehashhex_or_tempid);
sqlite_exec_void_retry(&retry, "ROLLBACK;");
RETURN(-1);
}
@ -1166,7 +1346,7 @@ insert_row_fail:
sqlite_exec_void_retry(&retry, "ROLLBACK;");
RETURN(WHYF("Failed to commit transaction"));
}
DEBUGF("Got rowid %lld for %s", rowid, hashhex);
DEBUGF("Got rowid %lld for %s", rowid, filehashhex_or_tempid);
RETURN(rowid);
}

View File

@ -885,6 +885,8 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
DEBUGF("bundle file hash = '%s'",hash);
long long filesize = rhizome_manifest_get_ll(m, "filesize");
DEBUGF("file size = %lld",filesize);
long long version = rhizome_manifest_get_ll(m, "version");
DEBUGF("version = %lld",version);
/* We now have everything we need to compose the POST request and send it.
*/
@ -947,13 +949,9 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
}
/* send file contents now */
long long rowid = -1;
sqlite3_blob *blob=NULL;
sqlite_exec_int64(&rowid, "select rowid from fileblobs where id='%s';", hash);
DEBUGF("Reading from rowid #%lld filehash='%s'",rowid,hash?hash:"(null)");
if (rowid >= 0 && sqlite3_blob_open(rhizome_db, "main", "fileblobs", "data",
rowid, 0, &blob) != SQLITE_OK)
goto closeit;
rhizome_blob_handle *blob=rhizome_database_open_blob_bybid(id,version,
0 /* read */);
if (!blob) goto closeit;
int i;
for(i=0;i<filesize;)
{
@ -961,12 +959,12 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
if (filesize-i<count) count=filesize-i;
unsigned char buffer[4096];
DEBUGF("reading %d bytes @ %d from blob",count,i);
int sr=sqlite3_blob_read(blob,buffer,count,i);
int sr=rhizome_database_blob_read(blob,buffer,count,i);
if (sr==SQLITE_OK||sr==SQLITE_DONE) {
count=write(sock,buffer,count);
if (count<0) {
WHY_perror("write");
sqlite3_blob_close(blob);
rhizome_database_blob_close(blob);
goto closeit;
} else {
i+=count;
@ -974,12 +972,12 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
}
} else {
WHYF("sqlite error #%d occurred reading from the blob: %s",sr, sqlite3_errmsg(rhizome_db));
sqlite3_blob_close(blob);
rhizome_database_blob_close(blob);
goto closeit;
}
}
sqlite3_blob_close(blob);
rhizome_database_blob_close(blob);
/* Send final mime boundary */
len=snprintf(buffer,8192,"\r\n--%s--\r\n",boundary);
sent=0;

View File

@ -530,7 +530,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
/* TODO We should stream file straight into the database */
slot->start_time=gettime_ms();
if (create_rhizome_import_dir() == -1)
return WHY("Unable to create import directory");
RETURN(WHY("Unable to create import directory"));
if (slot->manifest) {
slot->file_len=slot->manifest->fileLength;
slot->rowid=
@ -538,8 +538,8 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->file_len,
RHIZOME_PRIORITY_DEFAULT);
if (slot->rowid<0) {
return WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid));
RETURN(WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid)));
}
} else {
slot->rowid=-1;
@ -1316,30 +1316,24 @@ int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
sqlite3_blob *blob=NULL;
rhizome_blob_handle *blob
= rhizome_database_open_blob_byrowid(slot->rowid,1 /* read/write */);
if (!blob) goto again;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", slot->rowid, 1 /* read/write */, &blob);
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED)
goto again;
else if (ret!=SQLITE_OK){
WHYF("sqlite3_blob_open() failed, %s",
sqlite3_errmsg(rhizome_db));
goto failed;
}
ret=sqlite3_blob_write(blob, slot->blob_buffer, slot->blob_buffer_bytes,
slot->file_ofs-slot->blob_buffer_bytes);
int ret=rhizome_database_blob_write(blob, slot->blob_buffer,
slot->blob_buffer_bytes,
slot->file_ofs-slot->blob_buffer_bytes);
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED)
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_write(,,%d,%lld) failed, %s",
WHYF("rhizome_database_blob_write(,,%d,%lld) failed, possibly due to %s",
slot->blob_buffer_bytes,slot->file_ofs-slot->blob_buffer_bytes,
sqlite3_errmsg(rhizome_db));
rhizome_database_blob_errmsg(blob));
goto failed;
}
ret=sqlite3_blob_close(blob);
ret=rhizome_database_blob_close(blob);
blob=NULL;
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED)
goto again;
@ -1353,16 +1347,16 @@ int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot)
return 0;
failed:
if (blob) sqlite3_blob_close(blob);
if (blob) rhizome_database_blob_close(blob);
rhizome_fetch_close(slot);
return -1;
again:
if (blob)
sqlite3_blob_close(blob);
rhizome_database_blob_close(blob);
blob=NULL;
if (_sqlite_retry(__WHENCE__, &retry, "sqlite3_blob_write")==0)
if (_sqlite_retry(__WHENCE__, &retry, "rhizome_database_blob_write")==0)
return -1;
}while(1);

View File

@ -27,69 +27,8 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
write->id_known=0;
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK)
return WHY("Failed to begin transaction");
/* INSERT INTO FILES(id as text, data blob, length integer, highestpriority integer).
BUT, we have to do this incrementally so that we can handle blobs larger than available memory.
This is possible using:
int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
That binds an all zeroes blob to a field. We can then populate the data by
opening a handle to the blob using:
int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset);
*/
write->blob_rowid=rhizome_database_create_blob_for(write->id,file_length,priority);
sqlite3_stmt *statement = NULL;
int ret=sqlite_exec_void_retry(&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
write->id, (long long)file_length, priority, (long long)gettime_ms());
if (ret!=SQLITE_OK) {
WHYF("Failed to insert into files: %s", sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",write->id);
if (!statement) {
WHYF("Failed to insert into fileblobs: %s", sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, file_length) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
goto insert_row_fail;
}
/* Do actual insert, and abort if it fails */
int rowcount = 0;
int stepcode;
while ((stepcode = _sqlite_step_retry(__WHENCE__, LOG_LEVEL_ERROR, &retry, statement)) == SQLITE_ROW)
++rowcount;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
if (!sqlite_code_ok(stepcode)){
insert_row_fail:
WHYF("Failed to insert row for fileid=%s", write->id);
if (statement) sqlite3_finalize(statement);
sqlite_exec_void_retry(&retry, "ROLLBACK;");
return -1;
}
sqlite3_finalize(statement);
statement=NULL;
/* Get rowid for inserted row, so that we can modify the blob */
write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db);
if (config.debug.rhizome_rx)
DEBUGF("Got rowid %lld for %s", write->blob_rowid, write->id);
if (sqlite_exec_void_retry(&retry, "COMMIT;")!=SQLITE_OK){
return WHYF("Failed to commit transaction: %s", sqlite3_errmsg(rhizome_db));
}
write->file_length = file_length;
write->file_offset = 0;
SHA512_Init(&write->sha512_context);
@ -105,45 +44,38 @@ insert_row_fail:
/* Write write->buffer into the database blob */
int rhizome_flush(struct rhizome_write *write){
IN();
/* Just in case we're reading in a file that is still being written to. */
if (write->file_offset + write->data_size > write->file_length)
return WHY("Too much content supplied");
RETURN(WHY("Too much content supplied"));
if (write->data_size<=0)
return WHY("No content supplied");
RETURN(WHY("No content supplied"));
if (write->crypt){
if (rhizome_crypt_xor_block(write->buffer, write->data_size, write->file_offset, write->key, write->nonce))
return -1;
RETURN(-1);
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
sqlite3_blob *blob=NULL;
rhizome_blob_handle *blob=
rhizome_database_open_blob_byrowid(write->blob_rowid,1 /* write */);
if (!blob) goto again;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", write->blob_rowid, 1 /* read/write */, &blob);
int ret=rhizome_database_blob_write(blob, write->buffer, write->data_size,
write->file_offset);
if (sqlite_code_busy(ret))
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_open() failed: %s",
sqlite3_errmsg(rhizome_db));
if (blob) sqlite3_blob_close(blob);
return -1;
WHYF("rhizome_database_blob_write() failed: %s",
rhizome_database_blob_errmsg(blob));
if (blob) rhizome_database_blob_close(blob);
RETURN(-1);
}
ret=sqlite3_blob_write(blob, write->buffer, write->data_size,
write->file_offset);
if (sqlite_code_busy(ret))
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (blob) sqlite3_blob_close(blob);
return -1;
}
ret = sqlite3_blob_close(blob);
ret = rhizome_database_blob_close(blob);
blob=NULL;
if (sqlite_code_busy(ret))
goto again;
@ -151,12 +83,12 @@ int rhizome_flush(struct rhizome_write *write){
break;
WHYF("sqlite3_blob_close() failed: %s", sqlite3_errmsg(rhizome_db));
return -1;
RETURN(-1);
again:
if (blob) sqlite3_blob_close(blob);
if (sqlite_retry(&retry, "sqlite3_blob_write")==0)
return -1;
if (blob) rhizome_database_blob_close(blob);
if (sqlite_retry(&retry, "rhizome_database_blob_write")==0)
RETURN(-1);
}while(1);
@ -165,7 +97,7 @@ int rhizome_flush(struct rhizome_write *write){
if (config.debug.rhizome)
DEBUGF("Written %lld of %lld", write->file_offset, write->file_length);
write->data_size=0;
return 0;
RETURN(0);
}
/* Expects file to be at least file_length in size */
@ -418,23 +350,20 @@ int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash){
// returns the number of bytes read
int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_length){
IN();
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
sqlite3_blob *blob = NULL;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", read->blob_rowid, 0 /* read only */, &blob);
if (sqlite_code_busy(ret))
goto again;
else if(ret!=SQLITE_OK)
return WHYF("sqlite3_blob_open failed: %s",sqlite3_errmsg(rhizome_db));
rhizome_blob_handle *blob =
rhizome_database_open_blob_byrowid(read->blob_rowid,0 /* read only */);
if (!blob) goto again;
if (read->length==-1)
read->length=sqlite3_blob_bytes(blob);
read->length=blob->blob_bytes;
if (!buffer){
sqlite3_blob_close(blob);
return 0;
rhizome_database_blob_close(blob);
RETURN(0);
}
int count = read->length - read->offset;
@ -442,13 +371,14 @@ int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_le
count=buffer_length;
if (count>0){
ret = sqlite3_blob_read(blob, buffer, count, read->offset);
int ret = rhizome_database_blob_read(blob, buffer, count, read->offset);
if (sqlite_code_busy(ret))
goto again;
else if(ret!=SQLITE_OK){
WHYF("sqlite3_blob_read failed: %s",sqlite3_errmsg(rhizome_db));
sqlite3_blob_close(blob);
return -1;
WHYF("rhizome_database_blob_read failed: %s",
rhizome_database_blob_errmsg(blob));
rhizome_database_blob_close(blob);
RETURN(-1);
}
if (read->hash){
@ -459,7 +389,7 @@ int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_le
SHA512_End(&read->sha512_context, hash_out);
if (strcasecmp(read->id, hash_out)){
sqlite3_blob_close(blob);
rhizome_database_blob_close(blob);
WHYF("Expected hash=%s, got %s", read->id, hash_out);
}
}
@ -467,8 +397,8 @@ int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_le
if (read->crypt){
if(rhizome_crypt_xor_block(buffer, count, read->offset, read->key, read->nonce)){
sqlite3_blob_close(blob);
return -1;
rhizome_database_blob_close(blob);
RETURN(-1);
}
}
@ -476,14 +406,16 @@ int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_le
}
sqlite3_blob_close(blob);
return count;
rhizome_database_blob_close(blob);
DEBUGF("Read and returned %d",count);
RETURN(count);
again:
if (blob) sqlite3_blob_close(blob);
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return -1;
if (blob) rhizome_database_blob_close(blob);
if (sqlite_retry(&retry, "rhizome_database_blob_open")==0)
RETURN(-1);
}while (1);
OUT();
}
static int write_file(struct rhizome_read *read, const char *filepath){

View File

@ -131,6 +131,10 @@ int server(char *backing_file)
server_getpid = getpid();
fprintf(f,"%d\n", server_getpid);
fclose(f);
/* Open Rhizome database and clean out any cruft */
rhizome_opendb();
rhizome_cleanup();
overlayServerMode();

View File

@ -239,6 +239,39 @@ test_ExtractManifestFileAfterAdd() {
assert diff file1 file1x
}
doc_ExtractManifestFileFromExtBlob="Extract manifest and file from external blob"
setup_ExtractManifestFileFromExtBlob() {
setup_servald
setup_rhizome
executeOk_servald config set rhizome.max_internal_blob_size 1
echo "A test file" >file1
executeOk_servald rhizome add file $SIDB1 file1 file1.manifest
executeOk_servald rhizome list
assert_rhizome_list --fromhere=1 --author=$SIDB1 file1
extract_manifest_id manifestid file1.manifest
extract_manifest_version version file1.manifest
extract_manifest_filehash filehash file1.manifest
}
test_ExtractManifestFileFromExtBlob() {
executeOk_servald rhizome extract bundle $manifestid file1x.manifest file1x
tfw_cat --stdout --stderr
assertStdoutLineCount '==' 8
local size=$(( $(cat file1 | wc -c) + 0 ))
assertStdoutGrep --matches=1 "^service:file$"
assertStdoutGrep --matches=1 "^manifestid:$manifestid\$"
assertStdoutGrep --matches=1 "^version:$version\$"
assertStdoutGrep --matches=1 "^inserttime:$rexp_date\$"
assertStdoutGrep --matches=1 "^filehash:$filehash\$"
assertStdoutGrep --matches=1 "^filesize:$size\$"
assertStdoutGrep --matches=1 "^\.author:$SIDB1\$"
assertStdoutGrep --matches=1 "^\.readonly:0\$"
assert [ -e file1x.manifest ]
assert diff file1.manifest file1x.manifest
assert [ -e file1x ]
tfw_cat file1 file1x
assert diff file1 file1x
}
doc_ExtractManifestToStdout="Extract manifest to output field"
setup_ExtractManifestToStdout() {
setup_servald

View File

@ -283,6 +283,41 @@ test_FileTransferMultiMDP() {
done
}
doc_FileTransferMultiMDPExtBlob="New bundle transfers to four nodes via MDP, external blob files"
setup_FileTransferMultiMDPExtBlob() {
setup_common
set_instance +A
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.max_internal_blob_size 0
set_instance +B
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.max_internal_blob_size 0
set_instance +C
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.max_internal_blob_size 0
set_instance +D
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.max_internal_blob_size 0
set_instance +E
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.max_internal_blob_size 0
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
foreach_instance +B assert_peers_are_instances +A +C +D +E
foreach_instance +C assert_peers_are_instances +A +B +D +E
foreach_instance +D assert_peers_are_instances +A +B +C +E
}
test_FileTransferMultiMDPExtBlob() {
wait_until bundle_received_by $BID:$VERSION +B +C +D +E
for i in B C D E; do
set_instance +$i
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
done
}
doc_FileTransferDelete="Payload deletion transfers to one node"
setup_FileTransferDelete() {