Refactor all rhizome reading and writing

- The API in rhizome_store.c is used for all reading and writing
- external storage is now usable for all transport options
This commit is contained in:
Jeremy Lakeman 2013-02-20 14:29:08 +10:30
parent a6405fe387
commit 73786bcb5d
8 changed files with 775 additions and 895 deletions

View File

@ -33,12 +33,14 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
{
IN();
uint64_t version=
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]);
uint64_t fileOffset=
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]);
uint32_t bitmap=
read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]);
uint16_t blockLength=
read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]);
read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]);
if (blockLength>1024) RETURN(-1);
struct subscriber *source = find_subscriber(mdp->out.src.sid, SID_SIZE, 0);
@ -55,88 +57,95 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
journal, then the newer version is okay to use to service this request.
*/
rhizome_blob_handle *blob=NULL;
blob=rhizome_database_open_blob_bybid
(alloca_tohex_bid(&mdp->out.payload[0]),
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]), 0 /* read only */);
if (blob->blob_bytes<fileOffset) {
rhizome_database_blob_close(blob); blob=NULL;
char filehash[SHA512_DIGEST_STRING_LENGTH];
if (rhizome_database_filehash_from_id(alloca_tohex_bid(mdp->out.payload), version, filehash)<=0)
RETURN(-1);
struct rhizome_read read;
bzero(&read, sizeof read);
int ret=rhizome_open_read(&read, filehash, 0);
if (!ret){
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
int send_broadcast=1;
if (source){
if (!(source->reachable&REACHABLE_DIRECT))
send_broadcast=0;
if (source->reachable&REACHABLE_UNICAST && source->interface && source->interface->prefer_unicast)
send_broadcast=0;
}
if (send_broadcast){
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
reply.out.ttl=1;
}else{
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(mdp->out.src.sid,reply.out.dst.sid,SID_SIZE);
reply.out.ttl=64;
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_OPPORTUNISTIC;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(&mdp->out.payload[0],&reply.out.payload[1],16);
// and version of manifest
bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES],
&reply.out.payload[1+16],8);
int i;
for(i=0;i<32;i++){
if (bitmap&(1<<(31-i)))
continue;
if (overlay_queue_remaining(reply.out.queue) < 10)
break;
// calculate and set offset of block
read.offset = fileOffset+i*blockLength;
// stop if we passed the length of the file
// (but we may not know the file length until we attempt a read)
if (read.length!=-1 && read.offset>read.length)
break;
write_uint64(&reply.out.payload[1+16+8], read.offset);
int bytes_read = rhizome_read(&read, &reply.out.payload[1+16+8+8], blockLength);
if (bytes_read<=0)
break;
reply.out.payload_length=1+16+8+8+bytes_read;
// Mark the last block of the file, if required
if (read.offset >= read.length)
reply.out.payload[0]='T';
// send packet
if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0))
break;
}
}
rhizome_read_close(&read);
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
reply.out.ttl=1;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
int send_broadcast=1;
if (source){
if (!(source->reachable&REACHABLE_DIRECT))
send_broadcast=0;
if (source->reachable&REACHABLE_UNICAST && source->interface && source->interface->prefer_unicast)
send_broadcast=0;
}
if (send_broadcast){
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
memset(reply.out.dst.sid,0xff,SID_SIZE);
}else{
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(mdp->out.src.sid,reply.out.dst.sid,SID_SIZE);
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_OPPORTUNISTIC;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(&mdp->out.payload[0],&reply.out.payload[1],16);
// and version of manifest
bcopy(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES],
&reply.out.payload[1+16],8);
int i;
for(i=0;i<32;i++)
if (!(bitmap&(1<<(31-i))))
{
// calculate and set offset of block
uint64_t blockOffset=fileOffset+i*blockLength;
write_uint64(&reply.out.payload[1+16+8],blockOffset);
// work out how many bytes to read
int blockBytes=blob->blob_bytes-blockOffset;
if (blockBytes>blockLength) blockBytes=blockLength;
// read data for block
if (blob->blob_bytes>=blockOffset) {
if (overlay_queue_remaining(reply.out.queue) < 10)
break;
rhizome_database_blob_read(blob,&reply.out.payload[1+16+8+8],
blockBytes,blockOffset);
reply.out.payload_length=1+16+8+8+blockBytes;
// Mark terminal block if required
if (blockOffset+blockBytes==blob->blob_bytes) reply.out.payload[0]='T';
// send packet
if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0))
break;
} else break;
}
rhizome_database_blob_close(blob); blob=NULL;
RETURN(-1);
RETURN(ret);
OUT();
}

View File

@ -363,6 +363,45 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len);
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length);
/* Rhizome file storage api */
struct rhizome_write{
char id[SHA512_DIGEST_STRING_LENGTH+1];
char id_known;
unsigned char *buffer;
int buffer_size;
int data_size;
int64_t file_offset;
int64_t file_length;
int crypt;
unsigned char key[RHIZOME_CRYPT_KEY_BYTES];
unsigned char nonce[crypto_stream_xsalsa20_NONCEBYTES];
SHA512_CTX sha512_context;
int64_t blob_rowid;
int blob_fd;
};
struct rhizome_read{
char id[SHA512_DIGEST_STRING_LENGTH+1];
int crypt;
unsigned char key[RHIZOME_CRYPT_KEY_BYTES];
unsigned char nonce[crypto_stream_xsalsa20_NONCEBYTES];
int hash;
SHA512_CTX sha512_context;
int64_t blob_rowid;
int blob_fd;
int64_t offset;
int64_t length;
};
typedef struct rhizome_http_request {
struct sched_ent alarm;
long long initiate_time; /* time connection was initiated */
@ -390,8 +429,8 @@ typedef struct rhizome_http_request {
#define RHIZOME_HTTP_REQUEST_ALLGROUPLIST 8
#define RHIZOME_HTTP_REQUEST_BUNDLESINGROUP 16
// manifests are small enough to send from a buffer
// #define RHIZOME_HTTP_REQUEST_BUNDLEMANIFEST 32
// for anything too big, we can just use a blob
#define RHIZOME_HTTP_REQUEST_STORE 32
#define RHIZOME_HTTP_REQUEST_BLOB 64
#define RHIZOME_HTTP_REQUEST_FAVICON 128
@ -405,6 +444,8 @@ typedef struct rhizome_http_request {
int buffer_length; // number of bytes loaded into buffer
int buffer_offset; // where we are between [0,buffer_length)
struct rhizome_read read_state;
/* Path of request (used by POST multipart form requests where
the actual processing of the request does not occur while the
request headers are still available. */
@ -606,41 +647,7 @@ struct http_response_parts {
int unpack_http_response(char *response, struct http_response_parts *parts);
/* Rhizome file storage api */
struct rhizome_write{
char id[SHA512_DIGEST_STRING_LENGTH+1];
char id_known;
unsigned char *buffer;
int buffer_size;
int data_size;
int64_t file_offset;
int64_t file_length;
int crypt;
unsigned char key[RHIZOME_CRYPT_KEY_BYTES];
unsigned char nonce[crypto_stream_xsalsa20_NONCEBYTES];
SHA512_CTX sha512_context;
int64_t blob_rowid;
};
struct rhizome_read{
char id[SHA512_DIGEST_STRING_LENGTH+1];
int crypt;
unsigned char key[RHIZOME_CRYPT_KEY_BYTES];
unsigned char nonce[crypto_stream_xsalsa20_NONCEBYTES];
int hash;
SHA512_CTX sha512_context;
int64_t blob_rowid;
int64_t offset;
int64_t length;
};
/* rhizome storage methods */
int rhizome_exists(const char *fileHash);
int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority);
@ -656,24 +663,12 @@ int rhizome_crypt_xor_block(unsigned char *buffer, int buffer_size, int64_t stre
const unsigned char *key, const unsigned char *nonce);
int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash);
int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_length);
int rhizome_read_close(struct rhizome_read *read);
int rhizome_store_delete(const char *id);
int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizome_read *read_state, int hash);
int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk);
int rhizome_dump_file(const char *id, const char *filepath, int64_t *length);
char *rhizome_database_get_blob_filename(int64_t fileblob_rowid);
typedef struct rhizome_blob_handle {
uint64_t blob_bytes;
sqlite3_blob *sqlite_blob;
int fd_blob;
} rhizome_blob_handle;
rhizome_blob_handle *rhizome_database_open_blob_bybid(const char *id,
uint64_t version,
int writeP);
rhizome_blob_handle *rhizome_database_open_blob_byrowid(int row_id,int writeP);
int rhizome_database_blob_close(rhizome_blob_handle *blob);
int rhizome_database_blob_read(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset);
int rhizome_database_blob_write(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset);
const char *rhizome_database_blob_errmsg(rhizome_blob_handle *blob);
int rhizome_database_filehash_from_id(const char *id, uint64_t version, char hash[SHA512_DIGEST_STRING_LENGTH]);
#endif //__SERVALDNA__RHIZOME_H

View File

@ -666,157 +666,20 @@ long long rhizome_database_used_bytes()
return db_page_size * (db_page_count - db_free_page_count);
}
char blobfile[1024];
char *rhizome_database_get_blob_filename(int64_t rowid)
{
const char *blobpath=config.rhizome.datastore_path;
if (!blobpath||!blobpath[0]) {
blobpath=serval_instancepath();
}
if (!blobpath) return NULL;
snprintf(blobfile,1024,"%s/blob.%lld",blobpath,rowid);
return blobfile;
}
rhizome_blob_handle *rhizome_database_open_blob_bybid(const char *id,
uint64_t version,
int writeP)
int rhizome_database_filehash_from_id(const char *id, uint64_t version, char hash[SHA512_DIGEST_STRING_LENGTH])
{
IN();
long long row_id=-1;
if (sqlite_exec_int64(&row_id, "SELECT rowid FROM FILEBLOBS WHERE id IN (SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s');",
version,id) < 1)
{
DEBUGF("Couldn't find stored bundle.");
#if 0
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT rowid,id,filehash FROM MANIFESTS");
while(sqlite_step_retry(&retry, statement)==SQLITE_ROW){
sqlite3_int64 rowid = sqlite3_column_int64(statement, 0);
const char *idhex = (const char *) sqlite3_column_text(statement, 1);
const char *filehashhex = (const char *) sqlite3_column_text(statement, 2);
DEBUGF("rowid=%lld, id='%s', filehash='%s'",
rowid,idhex,filehashhex);
}
sqlite3_finalize(statement);
#endif
RETURN(NULL);
}
if (row_id==-1) {
DEBUGF("Couldn't find stored bundle.");
RETURN(NULL);
}
RETURN(rhizome_database_open_blob_byrowid(row_id,writeP));
strbuf hash_sb = strbuf_local(hash, SHA512_DIGEST_STRING_LENGTH);
RETURN(sqlite_exec_strbuf(hash_sb, "SELECT filehash FROM MANIFESTS WHERE manifests.version=%lld AND manifests.id='%s';",
version,id));
OUT();
}
rhizome_blob_handle *rhizome_database_open_blob_byrowid(int row_id,int writeP)
{
IN();
struct rhizome_blob_handle *blob=calloc(sizeof(struct rhizome_blob_handle),1);
if (!blob) RETURN(NULL);
if (config.debug.externalblobs)
DEBUGF("Opening blob for rowid #%d",row_id);
if (!config.rhizome.external_blobs)
{
// Try opening as internal blob.
// If column is not a blob, then this will fail.
int ret=sqlite3_blob_open(rhizome_db, "main", "fileblobs", "data",
row_id, writeP, &blob->sqlite_blob);
if (ret==SQLITE_OK) {
blob->blob_bytes=sqlite3_blob_bytes(blob->sqlite_blob);
RETURN(blob);
}
free(blob);
RETURN(NULL);
}
else
{
// Try opening as an external file
char *blobfile=rhizome_database_get_blob_filename(row_id);
errno=0;
blob->fd_blob=open(blobfile,O_RDWR);
if (blob->fd_blob==-1&&writeP) blob->fd_blob=open(blobfile,O_CREAT|O_RDWR,0664);
if (blob->fd_blob>-1) {
// File is stored externally
blob->blob_bytes=lseek(blob->fd_blob,0,SEEK_END);
DEBUGF("Opened fileblobs blob file '%s' (%lld bytes)",
blobfile,blob->blob_bytes);
RETURN(blob);
}
DEBUGF("Could not open fileblobs blob file '%s', will try sqlite blob",
blobfile);
// WHY_perror("open");
}
// Couldn't open, so fail
free(blob);
RETURN(NULL);
OUT();
}
int rhizome_database_blob_close(rhizome_blob_handle *blob)
{
if (!blob) return 0;
if (blob->sqlite_blob) sqlite3_blob_close(blob->sqlite_blob);
if (blob->fd_blob) close(blob->fd_blob);
bzero(blob,sizeof(struct rhizome_blob_handle));
return 0;
}
int rhizome_database_blob_read(rhizome_blob_handle *blob,unsigned char *out,
uint64_t count,uint64_t offset)
{
IN();
if (!blob) RETURN(-1);
if (blob->sqlite_blob) RETURN(sqlite3_blob_read(blob->sqlite_blob,
out,count,offset));
if (blob->fd_blob) {
lseek(blob->fd_blob,offset,SEEK_SET);
int r=read(blob->fd_blob,out,count);
if (r==count) RETURN(SQLITE_OK);
}
RETURN(-1);
OUT();
}
int rhizome_database_blob_write(rhizome_blob_handle *blob,unsigned char *buffer,
uint64_t count,uint64_t offset)
{
IN();
if (!blob) RETURN(-1);
if (blob->sqlite_blob) RETURN(sqlite3_blob_write(blob->sqlite_blob,
buffer,count,offset));
if (blob->fd_blob) {
DEBUGF("Writing to external file backed blob");
if (lseek(blob->fd_blob,offset,SEEK_SET)<0)
RETURN(WHYF("lseek(fd,%lld,SEEK_SET) failed",offset));
int r=write(blob->fd_blob,buffer,count);
DEBUGF(" wrote %d of %lld bytes",r,count);
if (r==count) RETURN(SQLITE_OK);
}
RETURN(-1);
OUT();
}
const char *rhizome_database_blob_errmsg(rhizome_blob_handle *blob)
{
if (!blob) return "blob is null";
if (blob->sqlite_blob) return sqlite3_errmsg(rhizome_db);
if (blob->fd_blob) return strerror(errno);
return "blob has no open channel";
}
void rhizome_cleanup()
{
IN();
/* FIXME
// clean out unreferenced files
// TODO keep updating inserttime for *very* long transfers?
if (sqlite_exec_void("DELETE FROM FILES WHERE inserttime < %lld AND datavalid=0;", gettime_ms() - 300000)) {
@ -847,6 +710,7 @@ void rhizome_cleanup()
WARNF("delete failed: %s", sqlite3_errmsg(rhizome_db));
}
}
*/
OUT();
}
@ -946,12 +810,7 @@ int rhizome_drop_stored_file(const char *id,int maximum_priority)
sqlite3_finalize(statement);
if (can_drop) {
sqlite_exec_void_retry(&retry, "delete from files where id='%s';",id);
int64_t fileblob_rowid=-1;
sqlite_exec_int64_retry(&retry,&fileblob_rowid,"select rowid from fileblobs where id='%s';",id);
if (fileblob_rowid>-1) {
char *blobfile=rhizome_database_get_blob_filename(fileblob_rowid);
if (blobfile) unlink(blobfile);
}
rhizome_store_delete(id);
sqlite_exec_void_retry(&retry, "delete from fileblobs where id='%s';",id);
}
return 0;
@ -1286,86 +1145,6 @@ cleanup:
OUT();
}
int64_t rhizome_database_create_blob_for(const char *filehashhex_or_tempid,
int64_t fileLength, int priority)
{
IN();
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (config.debug.externalblobs)
DEBUGF("Creating blob for (filehash or temp id)='%s', length=%d, priority=%d",
filehashhex_or_tempid,fileLength,priority);
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK)
RETURN(WHY("Failed to begin transaction"));
/* INSERT INTO FILES(id as text, data blob, length integer, highestpriority integer).
BUT, we have to do this incrementally so that we can handle blobs larger than available memory.
This is possible using:
int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
That binds an all zeroes blob to a field. We can then populate the data by
opening a handle to the blob using:
int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset);
*/
int ret=sqlite_exec_void_retry(&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
filehashhex_or_tempid, (long long)fileLength,
priority, (long long)gettime_ms());
if (ret!=SQLITE_OK) {
DEBUGF("insert or replace into files ... failed: %s",
sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
sqlite3_int64 fileblob_rowid=sqlite3_last_insert_rowid(rhizome_db);
sqlite3_stmt *statement=NULL;
if (!config.rhizome.external_blobs) {
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",filehashhex_or_tempid);
if (!statement)
goto insert_row_fail;
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, fileLength) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
sqlite3_finalize(statement);
goto insert_row_fail;
}
} else {
char *blobfile=rhizome_database_get_blob_filename(fileblob_rowid);
DEBUGF("Attempting to put blob for %s in %s",
filehashhex_or_tempid,blobfile?blobfile:"(null)");
int fd=open(blobfile, O_CREAT | O_TRUNC | O_WRONLY, 0664);
if (fd<0) goto insert_row_fail;
else DEBUGF("Blob file created (fd=%d)",fd);
close(fd);
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',%lld)",filehashhex_or_tempid,fileblob_rowid);
if (!statement)
goto insert_row_fail;
}
/* Do actual insert, and abort if it fails */
if (_sqlite_exec_void_prepared(__WHENCE__, LOG_LEVEL_ERROR, &retry, statement) == -1) {
insert_row_fail:
WHYF("Failed to insert row for fileid=%s", filehashhex_or_tempid);
sqlite_exec_void_retry(&retry, "ROLLBACK;");
RETURN(-1);
}
/* Get rowid for inserted row, so that we can modify the blob */
int64_t rowid = sqlite3_last_insert_rowid(rhizome_db);
ret = sqlite_exec_void_retry(&retry, "COMMIT;");
if (ret!=SQLITE_OK){
sqlite_exec_void_retry(&retry, "ROLLBACK;");
RETURN(WHYF("Failed to commit transaction"));
}
if (config.debug.externalblobs)
DEBUGF("Got rowid %lld for %s", rowid, filehashhex_or_tempid);
RETURN(rowid);
OUT();
}
void rhizome_bytes_to_hex_upper(unsigned const char *in, char *out, int byteCount)
{
(void) tohex(out, in, byteCount);

View File

@ -948,36 +948,42 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
if (r<0) goto closeit;
}
/* send file contents now */
rhizome_blob_handle *blob=rhizome_database_open_blob_bybid(id,version,
0 /* read */);
if (!blob) goto closeit;
int i;
for(i=0;i<filesize;)
{
int count=4096;
if (filesize-i<count) count=filesize-i;
/* send file contents */
{
char filehash[SHA512_DIGEST_STRING_LENGTH];
if (rhizome_database_filehash_from_id(id, version, filehash)<=0)
goto closeit;
struct rhizome_read read;
bzero(&read, sizeof read);
if (rhizome_open_read(&read, filehash, 0))
goto closeit;
int read_ofs;
for(read_ofs=0;read_ofs<filesize;){
unsigned char buffer[4096];
DEBUGF("reading %d bytes @ %d from blob",count,i);
int sr=rhizome_database_blob_read(blob,buffer,count,i);
if (sr==SQLITE_OK||sr==SQLITE_DONE) {
count=write(sock,buffer,count);
if (count<0) {
WHY_perror("write");
rhizome_database_blob_close(blob);
goto closeit;
} else {
i+=count;
DEBUGF("Wrote %d bytes of file",count);
}
} else {
WHYF("sqlite error #%d occurred reading from the blob: %s",sr, sqlite3_errmsg(rhizome_db));
rhizome_database_blob_close(blob);
read.offset=read_ofs;
int bytes_read = rhizome_read(&read, buffer, sizeof buffer);
if (bytes_read<0){
rhizome_read_close(&read);
goto closeit;
}
int write_ofs=0;
while(write_ofs < bytes_read){
int written = write(sock, buffer + write_ofs, bytes_read - write_ofs);
if (written<0){
WHY_perror("write");
rhizome_read_close(&read);
goto closeit;
}
write_ofs+=written;
}
read_ofs+=bytes_read;
}
rhizome_database_blob_close(blob);
rhizome_read_close(&read);
}
/* Send final mime boundary */
len=snprintf(buffer,8192,"\r\n--%s--\r\n",boundary);
sent=0;

View File

@ -60,19 +60,11 @@ struct rhizome_fetch_slot {
#define RHIZOME_FETCH_RXFILEMDP 5
/* Keep track of how much of the file we have read */
SHA512_CTX sha512_context;
int64_t rowid;
int64_t file_len;
int64_t file_ofs;
struct rhizome_write write_state;
int64_t last_write_time;
int64_t start_time;
#define RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE (1024*1024)
int blob_buffer_size;
unsigned char *blob_buffer;
int blob_buffer_bytes;
/* HTTP transport specific elements */
char request[1024];
int request_len;
@ -151,7 +143,7 @@ int rhizome_active_fetch_bytes_received(int q)
if (q<0) return -1;
if (q>=NQUEUES) return -1;
if (rhizome_fetch_queues[q].active.state==RHIZOME_FETCH_FREE) return -1;
return (int)rhizome_fetch_queues[q].active.file_ofs;
return (int)rhizome_fetch_queues[q].active.write_state.file_offset + rhizome_fetch_queues[q].active.write_state.data_size;
}
static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED;
@ -527,37 +519,21 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
IN();
int sock = -1;
/* TODO Don't forget to implement resume */
/* TODO We should stream file straight into the database */
slot->start_time=gettime_ms();
if (create_rhizome_import_dir() == -1)
RETURN(WHY("Unable to create import directory"));
if (slot->manifest) {
slot->file_len=slot->manifest->fileLength;
slot->rowid=
rhizome_database_create_blob_for(slot->manifest->fileHexHash,
slot->file_len,
RHIZOME_PRIORITY_DEFAULT);
if (slot->rowid<0) {
RETURN(WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid)));
}
if (rhizome_open_write(&slot->write_state, slot->manifest->fileHexHash, slot->manifest->fileLength, RHIZOME_PRIORITY_DEFAULT))
RETURN(-1);
} else {
slot->rowid=-1;
slot->write_state.blob_rowid=-1;
slot->write_state.file_offset=0;
slot->write_state.file_length=-1;
}
slot->request_ofs = 0;
slot->state = RHIZOME_FETCH_CONNECTING;
slot->file_len = -1;
slot->file_ofs = 0;
slot->blob_buffer_bytes = 0;
if (slot->blob_buffer) {
free(slot->blob_buffer);
slot->blob_buffer=NULL;
}
slot->blob_buffer_size=0;
SHA512_Init(&slot->sha512_context);
if (slot->peer_ipandport.sin_family == AF_INET && slot->peer_ipandport.sin_port) {
/* Transfer via HTTP over IPv4 */
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -814,7 +790,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
We do need to cache it in the slot structure, though, and then offer it
for inserting into the database, but we can avoid the temporary file in
the process. */
slot->rowid=-1;
slot->write_state.blob_rowid=-1;
slot->manifest_bytes=0;
if (schedule_fetch(slot) == -1) {
@ -1068,9 +1044,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
rhizome_manifest_free(slot->manifest);
slot->manifest = NULL;
if (slot->blob_buffer) free(slot->blob_buffer);
slot->blob_buffer=NULL;
slot->blob_buffer_size=0;
if (slot->write_state.buffer)
rhizome_fail_write(&slot->write_state);
// Release the fetch slot.
slot->state = RHIZOME_FETCH_FREE;
@ -1098,14 +1073,14 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
if (now-slot->last_write_time>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago (read %d of %d bytes)",
now-slot->last_write_time,
slot->file_ofs,slot->file_len);
slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length);
rhizome_fetch_close(slot);
OUT();
return;
}
if (config.debug.rhizome_rx)
DEBUGF("Timeout: Resending request for slot=0x%p (%d of %d received)",
slot,slot->file_ofs,slot->file_len);
slot,slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length);
if (slot->bidP)
rhizome_fetch_mdp_requestblocks(slot);
else
@ -1155,14 +1130,14 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES);
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES],slot->bidVersion);
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8],slot->file_ofs);
write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8],slot->write_state.file_offset + slot->write_state.data_size);
write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8],slot->mdpRXBitmap);
write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4],slot->mdpRXBlockLength);
if (config.debug.rhizome_tx)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->file_ofs);
slot->write_state.file_offset + slot->write_state.data_size);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
@ -1235,7 +1210,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
if (config.debug.rhizome_rx)
DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p (%d bytes)",
slot,slot->file_len);
slot,slot->write_state.file_length);
/* close socket and stop watching it */
if (slot->alarm.poll.fd>=0) {
@ -1267,8 +1242,6 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
down too much. Much careful thought is required to optimise this
transport.
*/
slot->file_len=slot->manifest->fileLength;
slot->mdpIdleTimeout=config.rhizome.idle_timeout; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size
@ -1317,146 +1290,62 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
return;
}
int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
rhizome_blob_handle *blob
= rhizome_database_open_blob_byrowid(slot->rowid,1 /* read/write */);
if (!blob) goto again;
int ret=rhizome_database_blob_write(blob, slot->blob_buffer,
slot->blob_buffer_bytes,
slot->file_ofs-slot->blob_buffer_bytes);
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED)
goto again;
else if (ret!=SQLITE_OK) {
WHYF("rhizome_database_blob_write(,,%d,%lld) failed, possibly due to %s",
slot->blob_buffer_bytes,slot->file_ofs-slot->blob_buffer_bytes,
rhizome_database_blob_errmsg(blob));
goto failed;
}
ret=rhizome_database_blob_close(blob);
blob=NULL;
if (ret==SQLITE_BUSY || ret==SQLITE_LOCKED)
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_close() failed, %s",
sqlite3_errmsg(rhizome_db));
goto failed;
}
slot->blob_buffer_bytes=0;
return 0;
failed:
if (blob) rhizome_database_blob_close(blob);
rhizome_fetch_close(slot);
return -1;
again:
if (blob)
rhizome_database_blob_close(blob);
blob=NULL;
if (_sqlite_retry(__WHENCE__, &retry, "rhizome_database_blob_write")==0)
return -1;
}while(1);
}
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
{
IN();
if (bytes<=0)
RETURN(0);
// Truncate to known length of file (handy for reading from journal bundles that
// might grow while we are reading from them).
if (bytes>(slot->file_len-slot->file_ofs))
bytes=slot->file_len-slot->file_ofs;
if (bytes>(slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size))){
bytes=slot->write_state.file_length-(slot->write_state.file_offset+slot->write_state.data_size);
}
if (slot->rowid==-1) {
if (slot->write_state.blob_rowid==-1) {
/* We are reading a manifest. Read it into a buffer. */
int count=bytes;
if (count+slot->manifest_bytes>1024) count=1024-slot->manifest_bytes;
bcopy(buffer,&slot->manifest_buffer[slot->manifest_bytes],count);
slot->manifest_bytes+=count;
slot->file_ofs+=count;
slot->write_state.file_offset+=count;
} else {
/* We are reading a file. Stream it into the database. */
if (bytes>0)
SHA512_Update(&slot->sha512_context,(unsigned char *)buffer,bytes);
if (config.debug.rhizome_rx) {
DEBUGF("slot->blob_buffer_bytes=%d, slot->file_ofs=%d",
slot->blob_buffer_bytes,slot->file_ofs);
// dump("buffer",buffer,bytes);
}
if (!slot->blob_buffer_size) {
/* Allocate an appropriately sized buffer so that we don't have to pay
the cost of blob_open() and blob_close() too often. */
slot->blob_buffer_size=slot->file_len;
if (slot->blob_buffer_size>RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE)
slot->blob_buffer_size=RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE;
slot->blob_buffer=malloc(slot->blob_buffer_size);
assert(slot->blob_buffer);
}
assert(slot->blob_buffer_size);
int bytesRemaining=bytes;
while(bytesRemaining>0) {
int count=slot->blob_buffer_size-slot->blob_buffer_bytes;
if (count>bytes) count=bytesRemaining;
// DEBUGF("copying %d bytes to &blob_buffer[0x%x]",
// count,slot->blob_buffer_bytes);
// dump("buffer",buffer,count);
bcopy(buffer,&slot->blob_buffer[slot->blob_buffer_bytes],count);
// dump("first bytes into slot->blob_buffer",slot->blob_buffer,256);
slot->blob_buffer_bytes+=count;
slot->file_ofs+=count;
buffer+=count; bytesRemaining-=count;
if (slot->blob_buffer_bytes==slot->blob_buffer_size){
if (rhizome_fetch_flush_blob_buffer(slot))
RETURN (-1);
/* We are reading a file. Stream it into the database. */
int ofs=0;
while (ofs<bytes){
int block_size = bytes - ofs;
if (block_size > slot->write_state.buffer_size - slot->write_state.data_size)
block_size = slot->write_state.buffer_size - slot->write_state.data_size;
if (block_size>0){
bcopy(buffer+ofs, slot->write_state.buffer + slot->write_state.data_size, block_size);
slot->write_state.data_size+=block_size;
ofs+=block_size;
}
if (slot->write_state.data_size>=slot->write_state.buffer_size){
int ret = rhizome_flush(&slot->write_state);
if (ret!=0){
rhizome_fetch_close(slot);
RETURN(-1);
}
}
}
}
slot->last_write_time=gettime_ms();
if (slot->file_ofs>=slot->file_len) {
if (slot->write_state.file_offset + slot->write_state.data_size>=slot->write_state.file_length) {
/* got all of file */
if (config.debug.rhizome_rx)
DEBUGF("Received all of file via rhizome -- now to import it");
if (slot->manifest) {
// Were fetching payload, now we have it.
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&slot->sha512_context, (char *)hash_out);
if (slot->blob_buffer_bytes){
if (rhizome_fetch_flush_blob_buffer(slot))
RETURN (-1);
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (strcasecmp(hash_out,slot->manifest->fileHexHash)) {
if (config.debug.rhizome_rx)
DEBUGF("Hash mismatch -- dropping row from table.");
WARNF("Expected hash=%s, got %s",
slot->manifest->fileHexHash,hash_out);
sqlite_exec_void_retry(&retry,
"DELETE FROM FILEBLOBS WHERE rowid=%lld",slot->rowid);
sqlite_exec_void_retry(&retry,
"DELETE FROM FILES WHERE id='%s'",
slot->manifest->fileHexHash);
if (rhizome_finish_write(&slot->write_state)){
rhizome_fetch_close(slot);
RETURN(-1);
} else {
int ret=sqlite_exec_void_retry(&retry,
"UPDATE FILES SET inserttime=%lld, datavalid=1 WHERE id='%s'",
gettime_ms(), slot->manifest->fileHexHash);
if (ret!=SQLITE_OK)
if (config.debug.rhizome_rx)
DEBUGF("error marking row valid: %s",sqlite3_errmsg(rhizome_db));
}
if (!rhizome_import_received_bundle(slot->manifest)){
@ -1499,9 +1388,10 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
}
if (config.debug.rhizome_rx)
DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec). Buffer size = %d",
slot,(long long)slot->file_ofs,(long long)gettime_ms()-slot->start_time,
(long long)slot->file_ofs/(gettime_ms()-slot->start_time),
slot->blob_buffer_size);
slot,(long long)slot->write_state.file_offset+slot->write_state.data_size,
(long long)gettime_ms()-slot->start_time,
(long long)(slot->write_state.file_offset+slot->write_state.data_size)/(gettime_ms()-slot->start_time),
slot->write_state.buffer_size);
rhizome_fetch_close(slot);
RETURN(-1);
}
@ -1522,7 +1412,7 @@ int rhizome_received_content(unsigned char *bidprefix,
if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) {
if (!memcmp(slot->bid,bidprefix,16))
{
if (slot->file_ofs==offset) {
if (slot->write_state.file_offset + slot->write_state.data_size==offset) {
if (!rhizome_write_content(slot,(char *)bytes,count))
{
rhizome_fetch_mdp_touch_timeout(slot);
@ -1578,7 +1468,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
} else {
if (config.debug.rhizome_rx)
DEBUGF("Empty read, closing connection: received %lld of %lld bytes",
slot->file_ofs,slot->file_len);
slot->write_state.file_offset + slot->write_state.data_size,slot->write_state.file_length);
rhizome_fetch_switch_to_mdp(slot);
return;
}
@ -1626,7 +1516,10 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
rhizome_fetch_switch_to_mdp(slot);
return;
}
slot->file_len = parts.content_length;
if (slot->write_state.file_length==-1)
slot->write_state.file_length=parts.content_length;
else if (parts.content_length != slot->write_state.file_length)
WARNF("Expected content length %lld, got %lld", slot->write_state.file_length, parts.content_length);
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/

View File

@ -316,6 +316,7 @@ int rhizome_server_free_http_request(rhizome_http_request *r)
close(r->alarm.poll.fd);
if (r->buffer)
free(r->buffer);
rhizome_read_close(&r->read_state);
free(r);
return 0;
}
@ -552,84 +553,85 @@ int rhizome_server_parse_http_request(rhizome_http_request *r)
rhizome_active_fetch_bytes_received(4)
);
rhizome_server_simple_http_response(r, 200, temp);
} else if (is_rhizome_http_enabled()&&(strcmp(path, "/rhizome/groups") == 0)) {
/* Return the list of known groups */
rhizome_server_sql_query_http_response(r, "id", "groups", "from groups", 32, 1);
} else if (is_rhizome_http_enabled()&&(strcmp(path, "/rhizome/files") == 0)) {
/* Return the list of known files */
rhizome_server_sql_query_http_response(r, "id", "files", "from files", 32, 1);
} else if (is_rhizome_http_enabled()&&(strcmp(path, "/rhizome/bars") == 0)) {
/* Return the list of known BARs */
rhizome_server_sql_query_http_response(r, "bar", "manifests", "from manifests", 32, 0);
} else if (is_rhizome_http_enabled()
&&(str_startswith(path, "/rhizome/file/", (const char **)&id))) {
/* Stream the specified payload */
if (!rhizome_str_is_file_hash(id)) {
rhizome_server_simple_http_response(r, 400, "<html><h1>Invalid payload ID</h1></html>\r\n");
} else {
// TODO: Check for Range: header and return 206 if returning partial content
str_toupper_inplace(id);
r->rowid=-1;
} else if (is_rhizome_http_enabled()){
if (strcmp(path, "/rhizome/groups") == 0) {
/* Return the list of known groups */
rhizome_server_sql_query_http_response(r, "id", "groups", "from groups", 32, 1);
} else if (strcmp(path, "/rhizome/files") == 0) {
/* Return the list of known files */
rhizome_server_sql_query_http_response(r, "id", "files", "from files", 32, 1);
} else if (strcmp(path, "/rhizome/bars") == 0) {
/* Return the list of known BARs */
rhizome_server_sql_query_http_response(r, "bar", "manifests", "from manifests", 32, 0);
} else if (str_startswith(path, "/rhizome/file/", (const char **)&id)) {
/* Stream the specified payload */
if (!rhizome_str_is_file_hash(id)) {
rhizome_server_simple_http_response(r, 400, "<html><h1>Invalid payload ID</h1></html>\r\n");
} else {
// TODO: Check for Range: header and return 206 if returning partial content
str_toupper_inplace(id);
bzero(&r->read_state, sizeof(r->read_state));
if (rhizome_open_read(&r->read_state, id, 1))
rhizome_server_simple_http_response(r, 404, "<html><h1>Payload not found</h1></html>\r\n");
else{
if (r->read_state.length==-1){
if (rhizome_read(&r->read_state, NULL, 0)){
rhizome_server_simple_http_response(r, 404, "<html><h1>Unknown length</h1></html>\r\n");
}
}
r->read_state.offset = r->source_index = 0;
if (r->read_state.length - r->read_state.offset>0){
rhizome_server_http_response_header(r, 200, "application/binary", r->read_state.length - r->read_state.offset);
r->request_type |= RHIZOME_HTTP_REQUEST_STORE;
}
}
}
} else if (str_startswith(path, "/rhizome/manifest/", (const char **)&id)) {
// TODO: Stream the specified manifest
rhizome_server_simple_http_response(r, 500, "<html><h1>Not implemented</h1></html>\r\n");
} else if (str_startswith(path, "/rhizome/manifestbyprefix/", (const char **)&id)) {
/* Manifest by prefix */
char bid_low[RHIZOME_MANIFEST_ID_STRLEN+1];
char bid_high[RHIZOME_MANIFEST_ID_STRLEN+1];
int i;
for (i=0;i<RHIZOME_MANIFEST_ID_STRLEN
&&path[strlen("/rhizome/manifestbyprefix/")+i];i++) {
bid_low[i]=path[strlen("/rhizome/manifestbyprefix/")+i];
bid_high[i]=path[strlen("/rhizome/manifestbyprefix/")+i];
}
for(;i<RHIZOME_MANIFEST_ID_STRLEN;i++) {
bid_low[i]='0';
bid_high[i]='f';
}
bid_low[RHIZOME_MANIFEST_ID_STRLEN]=0;
bid_high[RHIZOME_MANIFEST_ID_STRLEN]=0;
DEBUGF("Looking for manifest between %s and %s",
bid_low,bid_high);
r->rowid = -1;
sqlite3_blob *blob=NULL;
sqlite_exec_int64(&r->rowid, "select rowid from fileblobs where id='%s';", id);
r->sql_table="fileblobs";
r->sql_row="data";
r->sql_table="manifests";
r->sql_row="manifest";
sqlite_exec_int64(&r->rowid, "select rowid from manifests where id between '%s' and '%s';", bid_low,bid_high);
if (r->rowid >= 0 && sqlite3_blob_open(rhizome_db, "main", r->sql_table, r->sql_row, r->rowid, 0, &blob) != SQLITE_OK)
r->rowid = -1;
if (r->rowid == -1) {
DEBUGF("Row not found");
rhizome_server_simple_http_response(r, 404, "<html><h1>Payload not found</h1></html>\r\n");
} else {
DEBUGF("row id = %d",r->rowid);
r->source_index = 0;
r->blob_end = sqlite3_blob_bytes(blob);
rhizome_server_http_response_header(r, 200, "application/binary", r->blob_end - r->source_index);
r->request_type |= RHIZOME_HTTP_REQUEST_BLOB;
}
if (blob) sqlite3_blob_close(blob);
}
} else if (is_rhizome_http_enabled()&&
(str_startswith(path, "/rhizome/manifest/", (const char **)&id))) {
// TODO: Stream the specified manifest
rhizome_server_simple_http_response(r, 500, "<html><h1>Not implemented</h1></html>\r\n");
} else if (str_startswith(path, "/rhizome/manifestbyprefix/", (const char **)&id)) {
/* Manifest by prefix */
char bid_low[RHIZOME_MANIFEST_ID_STRLEN+1];
char bid_high[RHIZOME_MANIFEST_ID_STRLEN+1];
int i;
for (i=0;i<RHIZOME_MANIFEST_ID_STRLEN
&&path[strlen("/rhizome/manifestbyprefix/")+i];i++) {
bid_low[i]=path[strlen("/rhizome/manifestbyprefix/")+i];
bid_high[i]=path[strlen("/rhizome/manifestbyprefix/")+i];
}
for(;i<RHIZOME_MANIFEST_ID_STRLEN;i++) {
bid_low[i]='0';
bid_high[i]='f';
}
bid_low[RHIZOME_MANIFEST_ID_STRLEN]=0;
bid_high[RHIZOME_MANIFEST_ID_STRLEN]=0;
DEBUGF("Looking for manifest between %s and %s",
bid_low,bid_high);
r->rowid = -1;
sqlite3_blob *blob=NULL;
r->sql_table="manifests";
r->sql_row="manifest";
sqlite_exec_int64(&r->rowid, "select rowid from manifests where id between '%s' and '%s';", bid_low,bid_high);
if (r->rowid >= 0 && sqlite3_blob_open(rhizome_db, "main", r->sql_table, r->sql_row, r->rowid, 0, &blob) != SQLITE_OK)
r->rowid = -1;
if (r->rowid == -1) {
DEBUGF("Row not found");
rhizome_server_simple_http_response(r, 404, "<html><h1>Payload not found</h1></html>\r\n");
if (blob)
sqlite3_blob_close(blob);
} else {
DEBUGF("row id = %d",r->rowid);
r->source_index = 0;
r->blob_end = sqlite3_blob_bytes(blob);
rhizome_server_http_response_header(r, 200, "application/binary", r->blob_end - r->source_index);
r->request_type |= RHIZOME_HTTP_REQUEST_BLOB;
rhizome_server_simple_http_response(r, 404, "<html><h1>Not found</h1></html>\r\n");
DEBUGF("Sending 404 not found for '%s'",path);
}
if (blob)
sqlite3_blob_close(blob);
}else {
} else {
rhizome_server_simple_http_response(r, 404, "<html><h1>Not found</h1></html>\r\n");
DEBUGF("Sending 404 not found for '%s'",path);
}
@ -749,8 +751,6 @@ int rhizome_server_http_send_bytes(rhizome_http_request *r)
return 1;
}
if (0)
dump("bytes written",&r->buffer[r->buffer_offset],bytes);
r->buffer_offset+=bytes;
// reset inactivity timer
@ -790,6 +790,34 @@ int rhizome_server_http_send_bytes(rhizome_http_request *r)
r->request_type=RHIZOME_HTTP_REQUEST_FROMBUFFER;
}
break;
case RHIZOME_HTTP_REQUEST_STORE:
{
r->request_type=0;
int suggested_size=65536;
if (suggested_size > r->read_state.length - r->read_state.offset)
suggested_size = r->read_state.length - r->read_state.offset;
if (r->buffer_size < suggested_size){
r->buffer_size = suggested_size;
if (r->buffer)
free(r->buffer);
r->buffer = malloc(r->buffer_size);
if (!r->buffer){
r->buffer_size=0;
break;
}
}
r->buffer_length = rhizome_read(&r->read_state, r->buffer, r->buffer_size);
if (r->buffer_length>0)
r->request_type|=RHIZOME_HTTP_REQUEST_FROMBUFFER;
if (r->read_state.offset < r->read_state.length)
r->request_type|=RHIZOME_HTTP_REQUEST_STORE;
break;
}
case RHIZOME_HTTP_REQUEST_BLOB:
{
/* Get more data from the file and put it in the buffer */

View File

@ -9,7 +9,7 @@ int rhizome_exists(const char *fileHash){
long long gotfile = 0;
if (sqlite_exec_int64(&gotfile,
"SELECT COUNT(*) FROM FILES, FILEBLOBS WHERE FILES.ID='%s' and FILES.datavalid=1 and FILES.ID=FILEBLOBS.ID;",
"SELECT COUNT(*) FROM FILES WHERE ID='%s' and datavalid=1;",
fileHash) != 1){
return 0;
}
@ -27,10 +27,98 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
write->id_known=0;
}
write->blob_rowid=rhizome_database_create_blob_for(write->id,file_length,priority);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") != SQLITE_OK)
return WHY("Failed to begin transaction");
/*
we have to write incrementally so that we can handle blobs larger than available memory.
This is possible using:
int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
That binds an all zeroes blob to a field. We can then populate the data by
opening a handle to the blob using:
int sqlite3_blob_write(sqlite3_blob *, const void *z, int n, int iOffset);
*/
sqlite3_stmt *statement = NULL;
int ret=sqlite_exec_void_retry(&retry,
"INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
write->id, (long long)file_length, priority, (long long)gettime_ms());
if (ret!=SQLITE_OK) {
WHYF("Failed to insert into files: %s", sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
char blob_path[1024];
if (config.rhizome.external_blobs) {
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){
WHY("Invalid path");
goto insert_row_fail;
}
if (config.debug.externalblobs)
DEBUGF("Attempting to put blob for %s in %s",
write->id,blob_path);
write->blob_fd=open(blob_path, O_CREAT | O_TRUNC | O_WRONLY, 0664);
if (write->blob_fd<0)
goto insert_row_fail;
if (config.debug.externalblobs)
DEBUGF("Blob file created (fd=%d)", write->blob_fd);
}else{
statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",write->id);
if (!statement) {
WHYF("Failed to insert into fileblobs: %s", sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, file_length) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
goto insert_row_fail;
}
/* Do actual insert, and abort if it fails */
int rowcount = 0;
int stepcode;
while ((stepcode = _sqlite_step_retry(__WHENCE__, LOG_LEVEL_ERROR, &retry, statement)) == SQLITE_ROW)
++rowcount;
if (rowcount)
WARNF("void query unexpectedly returned %d row%s", rowcount, rowcount == 1 ? "" : "s");
if (!sqlite_code_ok(stepcode)){
insert_row_fail:
WHYF("Failed to insert row for fileid=%s", write->id);
if (statement) sqlite3_finalize(statement);
sqlite_exec_void_retry(&retry, "ROLLBACK;");
return -1;
}
sqlite3_finalize(statement);
statement=NULL;
/* Get rowid for inserted row, so that we can modify the blob */
write->blob_rowid = sqlite3_last_insert_rowid(rhizome_db);
if (config.debug.rhizome_rx)
DEBUGF("Got rowid %lld for %s", write->blob_rowid, write->id);
}
if (sqlite_exec_void_retry(&retry, "COMMIT;")!=SQLITE_OK){
if (write->blob_fd>0){
close(write->blob_fd);
unlink(blob_path);
}
return WHYF("Failed to commit transaction: %s", sqlite3_errmsg(rhizome_db));
}
write->file_length = file_length;
write->file_offset = 0;
SHA512_Init(&write->sha512_context);
write->buffer_size=write->file_length;
@ -39,69 +127,100 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
write->buffer_size=RHIZOME_BUFFER_MAXIMUM_SIZE;
write->buffer=malloc(write->buffer_size);
if (!write->buffer)
return WHY("Unable to allocate write buffer");
return 0;
}
/* Write write->buffer into the database blob */
int rhizome_flush(struct rhizome_write *write){
/* Write write_state->buffer into the store
Note that we don't support random writes as the contents must be hashed in order
But we don't enforce linear writes yet. */
int rhizome_flush(struct rhizome_write *write_state){
IN();
/* Just in case we're reading in a file that is still being written to. */
if (write->file_offset + write->data_size > write->file_length)
RETURN(WHY("Too much content supplied"));
/* Make sure we aren't being asked to write more data than we expected */
if (write_state->file_offset + write_state->data_size > write_state->file_length)
RETURN(WHYF("Too much content supplied, %d + %d > %d",
write_state->file_offset, write_state->data_size, write_state->file_length));
if (write->data_size<=0)
if (write_state->data_size<=0)
RETURN(WHY("No content supplied"));
if (write->crypt){
if (rhizome_crypt_xor_block(write->buffer, write->data_size, write->file_offset, write->key, write->nonce))
if (write_state->crypt){
if (rhizome_crypt_xor_block(write_state->buffer, write_state->data_size,
write_state->file_offset, write_state->key, write_state->nonce))
RETURN(-1);
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
rhizome_blob_handle *blob=
rhizome_database_open_blob_byrowid(write->blob_rowid,1 /* write */);
if (!blob) goto again;
int ret=rhizome_database_blob_write(blob, write->buffer, write->data_size,
write->file_offset);
if (sqlite_code_busy(ret))
goto again;
else if (ret!=SQLITE_OK) {
WHYF("rhizome_database_blob_write() failed: %s",
rhizome_database_blob_errmsg(blob));
if (blob) rhizome_database_blob_close(blob);
RETURN(-1);
if (config.rhizome.external_blobs) {
int ofs=0;
// keep trying until all of the data is written.
while(ofs < write_state->data_size){
int r=write(write_state->blob_fd, write_state->buffer + ofs, write_state->data_size - ofs);
if (r<0)
RETURN(WHY_perror("write"));
DEBUGF("Wrote %d bytes into external blob", r);
ofs+=r;
}
}else{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
ret = rhizome_database_blob_close(blob);
blob=NULL;
if (sqlite_code_busy(ret))
goto again;
else if (ret==SQLITE_OK)
break;
WHYF("sqlite3_blob_close() failed: %s", sqlite3_errmsg(rhizome_db));
RETURN(-1);
again:
if (blob) rhizome_database_blob_close(blob);
if (sqlite_retry(&retry, "rhizome_database_blob_write")==0)
RETURN(-1);
}while(1);
do{
sqlite3_blob *blob=NULL;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", write_state->blob_rowid, 1 /* read/write */, &blob);
if (sqlite_code_busy(ret))
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_open() failed: %s",
sqlite3_errmsg(rhizome_db));
if (blob) sqlite3_blob_close(blob);
RETURN(-1);
}
ret=sqlite3_blob_write(blob, write_state->buffer, write_state->data_size,
write_state->file_offset);
if (sqlite_code_busy(ret))
goto again;
else if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_write() failed: %s",
sqlite3_errmsg(rhizome_db));
if (blob) sqlite3_blob_close(blob);
RETURN(-1);
}
ret = sqlite3_blob_close(blob);
blob=NULL;
if (sqlite_code_busy(ret))
goto again;
else if (ret==SQLITE_OK){
DEBUGF("Success");
break;
}
RETURN(WHYF("sqlite3_blob_close() failed: %s", sqlite3_errmsg(rhizome_db)));
again:
if (blob) sqlite3_blob_close(blob);
if (sqlite_retry(&retry, "sqlite3_blob_write")==0)
RETURN(1);
}while(1);
}
SHA512_Update(&write->sha512_context, write->buffer, write->data_size);
write->file_offset+=write->data_size;
DEBUGF("Wrote %d bytes", write_state->data_size);
SHA512_Update(&write_state->sha512_context, write_state->buffer, write_state->data_size);
write_state->file_offset+=write_state->data_size;
if (config.debug.rhizome)
DEBUGF("Written %lld of %lld", write->file_offset, write->file_length);
write->data_size=0;
DEBUGF("Written %lld of %lld", write_state->file_offset, write_state->file_length);
write_state->data_size=0;
RETURN(0);
OUT();
}
/* Expects file to be at least file_length in size */
/* Expects file to be at least file_length in size, ignoring anything longer than that */
int rhizome_write_file(struct rhizome_write *write, const char *filename){
FILE *f = fopen(filename, "r");
if (!f)
@ -131,15 +250,28 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){
return 0;
}
int rhizome_store_delete(const char *id){
char blob_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, id))
return -1;
return unlink(blob_path)?-1:0;
}
int rhizome_fail_write(struct rhizome_write *write){
if (write->buffer)
free(write->buffer);
write->buffer=NULL;
if (write->blob_fd){
close(write->blob_fd);
rhizome_store_delete(write->id);
}
// don't worry too much about sql failures.
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite_exec_void_retry(&retry,
"DELETE FROM FILEBLOBS WHERE rowid=%lld",write->blob_rowid);
if (!config.rhizome.external_blobs)
sqlite_exec_void_retry(&retry,
"DELETE FROM FILEBLOBS WHERE rowid=%lld",write->blob_rowid);
sqlite_exec_void_retry(&retry,
"DELETE FROM FILES WHERE id='%s'",
write->id);
@ -150,7 +282,9 @@ int rhizome_finish_write(struct rhizome_write *write){
if (write->data_size>0){
if (rhizome_flush(write))
return -1;
}
}
if (write->blob_fd)
close(write->blob_fd);
if (write->buffer)
free(write->buffer);
write->buffer=NULL;
@ -192,11 +326,33 @@ int rhizome_finish_write(struct rhizome_write *write){
WHYF("Failed to update files: %s", sqlite3_errmsg(rhizome_db));
goto failure;
}
if (sqlite_exec_void_retry(&retry,
"UPDATE FILEBLOBS SET id='%s' WHERE rowid=%lld",
hash_out, write->blob_rowid)!=SQLITE_OK){
WHYF("Failed to update files: %s", sqlite3_errmsg(rhizome_db));
goto failure;
if (config.rhizome.external_blobs){
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){
WHYF("Failed to generate file path");
goto failure;
}
if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, hash_out)){
WHYF("Failed to generate file path");
goto failure;
}
if (link(blob_path, dest_path)){
WHY_perror("link");
goto failure;
}
if (unlink(blob_path))
WHY_perror("unlink");
}else{
if (sqlite_exec_void_retry(&retry,
"UPDATE FILEBLOBS SET id='%s' WHERE rowid=%lld",
hash_out, write->blob_rowid)!=SQLITE_OK){
WHYF("Failed to update files: %s", sqlite3_errmsg(rhizome_db));
goto failure;
}
}
}
strlcpy(write->id, hash_out, SHA512_DIGEST_STRING_LENGTH);
@ -317,31 +473,43 @@ int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash){
read->id[RHIZOME_FILEHASH_STRLEN] = '\0';
str_toupper_inplace(read->id);
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT FILEBLOBS.rowid FROM FILEBLOBS, FILES WHERE FILEBLOBS.id = FILES.id AND FILES.id = ? AND FILES.datavalid != 0");
if (!statement)
return WHYF("Failed to prepare statement: %s", sqlite3_errmsg(rhizome_db));
sqlite3_bind_text(statement, 1, read->id, -1, SQLITE_STATIC);
int ret = sqlite_step_retry(&retry, statement);
if (ret != SQLITE_ROW){
WHYF("Failed to open file blob: %s", sqlite3_errmsg(rhizome_db));
if (config.rhizome.external_blobs){
// Don't even bother checking the FILES table...
char blob_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, read->id))
return WHYF("Failed to generate file path");
read->blob_fd = open(blob_path, O_RDONLY);
if (read->blob_fd<0)
return WHY_perror("Failed to open blob file");
read->length=lseek(read->blob_fd,0,SEEK_END);
}else{
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT FILEBLOBS.rowid FROM FILEBLOBS, FILES WHERE FILEBLOBS.id = FILES.id AND FILES.id = ? AND FILES.datavalid != 0");
if (!statement)
return WHYF("Failed to prepare statement: %s", sqlite3_errmsg(rhizome_db));
sqlite3_bind_text(statement, 1, read->id, -1, SQLITE_STATIC);
int ret = sqlite_step_retry(&retry, statement);
if (ret != SQLITE_ROW){
WHYF("Failed to open file blob: %s", sqlite3_errmsg(rhizome_db));
sqlite3_finalize(statement);
return -1;
}
if (!(sqlite3_column_count(statement) == 1
&& sqlite3_column_type(statement, 0) == SQLITE_INTEGER)) {
sqlite3_finalize(statement);
return WHY("Incorrect statement column");
}
read->blob_rowid = sqlite3_column_int64(statement, 0);
sqlite3_finalize(statement);
return -1;
read->length=-1;
}
if (!(sqlite3_column_count(statement) == 1
&& sqlite3_column_type(statement, 0) == SQLITE_INTEGER)) {
sqlite3_finalize(statement);
return WHY("Incorrect statement column");
}
read->blob_rowid = sqlite3_column_int64(statement, 0);
read->hash=hash;
read->offset=0;
read->length=-1;
sqlite3_finalize(statement);
if (hash)
SHA512_Init(&read->sha512_context);
@ -349,76 +517,96 @@ int rhizome_open_read(struct rhizome_read *read, const char *fileid, int hash){
return 0;
}
/* Read content from the store, hashing and decrypting as we go.
Random access is supported, but hashing requires reads to be sequential though we don't enforce this. */
// returns the number of bytes read
int rhizome_read(struct rhizome_read *read, unsigned char *buffer, int buffer_length){
int rhizome_read(struct rhizome_read *read_state, unsigned char *buffer, int buffer_length){
IN();
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
int bytes_read=0;
do{
rhizome_blob_handle *blob =
rhizome_database_open_blob_byrowid(read->blob_rowid,0 /* read only */);
if (!blob) goto again;
if (read->length==-1)
read->length=blob->blob_bytes;
if (!buffer){
rhizome_database_blob_close(blob);
RETURN(0);
}
int count = read->length - read->offset;
if (count>buffer_length)
count=buffer_length;
if (count>0){
int ret = rhizome_database_blob_read(blob, buffer, count, read->offset);
if (config.rhizome.external_blobs){
if (lseek(read_state->blob_fd, read_state->offset, SEEK_SET)<0)
RETURN(WHY_perror("lseek"));
bytes_read = read(read_state->blob_fd, buffer, buffer_length);
if (bytes_read<0)
RETURN(WHY_perror("read"));
}else{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
do{
sqlite3_blob *blob = NULL;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", read_state->blob_rowid, 0 /* read only */, &blob);
if (sqlite_code_busy(ret))
goto again;
else if(ret!=SQLITE_OK){
WHYF("rhizome_database_blob_read failed: %s",
rhizome_database_blob_errmsg(blob));
rhizome_database_blob_close(blob);
RETURN(-1);
}
else if(ret!=SQLITE_OK)
RETURN(WHYF("sqlite3_blob_open failed: %s",sqlite3_errmsg(rhizome_db)));
if (read->hash){
SHA512_Update(&read->sha512_context, buffer, count);
if (read->offset + count>=read->length){
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&read->sha512_context, hash_out);
if (strcasecmp(read->id, hash_out)){
rhizome_database_blob_close(blob);
WHYF("Expected hash=%s, got %s", read->id, hash_out);
}
if (read_state->length==-1)
read_state->length=sqlite3_blob_bytes(blob);
bytes_read = read_state->length - read_state->offset;
if (bytes_read>buffer_length)
bytes_read=buffer_length;
// allow the caller to do a dummy read, just to work out the length
if (!buffer)
bytes_read=0;
if (bytes_read>0){
ret = sqlite3_blob_read(blob, buffer, bytes_read, read_state->offset);
if (sqlite_code_busy(ret))
goto again;
else if(ret!=SQLITE_OK){
WHYF("sqlite3_blob_read failed: %s",sqlite3_errmsg(rhizome_db));
sqlite3_blob_close(blob);
return -1;
}
}
if (read->crypt){
if(rhizome_crypt_xor_block(buffer, count, read->offset, read->key, read->nonce)){
rhizome_database_blob_close(blob);
RETURN(-1);
}
sqlite3_blob_close(blob);
break;
again:
if (blob) sqlite3_blob_close(blob);
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return -1;
}while (1);
}
if (read_state->hash){
if (buffer && bytes_read>0)
SHA512_Update(&read_state->sha512_context, buffer, bytes_read);
if (read_state->offset + bytes_read>=read_state->length){
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&read_state->sha512_context, hash_out);
if (strcasecmp(read_state->id, hash_out)){
WHYF("Expected hash=%s, got %s", read_state->id, hash_out);
}
read->offset+=count;
read_state->hash=0;
}
rhizome_database_blob_close(blob);
DEBUGF("Read and returned %d",count);
RETURN(count);
again:
if (blob) rhizome_database_blob_close(blob);
if (sqlite_retry(&retry, "rhizome_database_blob_open")==0)
}
if (read_state->crypt && buffer && bytes_read>0){
if(rhizome_crypt_xor_block(buffer, bytes_read, read_state->offset, read_state->key, read_state->nonce)){
RETURN(-1);
}while (1);
}
}
read_state->offset+=bytes_read;
RETURN(bytes_read);
OUT();
}
int rhizome_read_close(struct rhizome_read *read){
if (read->blob_fd)
close(read->blob_fd);
read->blob_fd=0;
return 0;
}
static int write_file(struct rhizome_read *read, const char *filepath){
int fd=-1, ret=0;
@ -449,20 +637,14 @@ static int write_file(struct rhizome_read *read, const char *filepath){
return ret;
}
/* Extract the file related to a manifest to the file system.
* The file will be de-crypted and verified while reading.
* If filepath is not supplied, the file will still be checked.
*/
int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk){
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
int rhizome_open_decrypt_read(rhizome_manifest *m, rhizome_bk_t *bsk, struct rhizome_read *read_state, int hash){
// for now, always hash the file
if (rhizome_open_read(&read_state, m->fileHexHash, 1))
if (rhizome_open_read(read_state, m->fileHexHash, hash))
return -1;
read_state.crypt=m->payloadEncryption;
if (read_state.crypt){
read_state->crypt=m->payloadEncryption;
if (read_state->crypt){
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt the contents as we go
if (rhizome_derive_key(m, bsk))
return -1;
@ -470,11 +652,24 @@ int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t
if (config.debug.rhizome)
DEBUGF("Decrypting file contents");
bcopy(m->payloadKey, read_state.key, sizeof(read_state.key));
bcopy(m->payloadNonce, read_state.nonce, sizeof(read_state.nonce));
bcopy(m->payloadKey, read_state->key, sizeof(read_state->key));
bcopy(m->payloadNonce, read_state->nonce, sizeof(read_state->nonce));
}
return write_file(&read_state, filepath);
return 0;
}
/* Extract the file related to a manifest to the file system.
* The file will be de-crypted and verified while reading.
* If filepath is not supplied, the file will still be checked.
*/
int rhizome_extract_file(rhizome_manifest *m, const char *filepath, rhizome_bk_t *bsk){
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
int ret = rhizome_open_decrypt_read(m, bsk, &read_state, 1);
if (!ret)
ret = write_file(&read_state, filepath);
rhizome_read_close(&read_state);
return ret;
}
/* dump the raw contents of a file */
@ -482,11 +677,14 @@ int rhizome_dump_file(const char *id, const char *filepath, int64_t *length){
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
if (rhizome_open_read(&read_state, id, 1))
return -1;
int ret = rhizome_open_read(&read_state, id, 1);
if (length)
*length = read_state.length;
return write_file(&read_state, filepath);
if (!ret){
ret=write_file(&read_state, filepath);
if (length)
*length = read_state.length;
}
rhizome_read_close(&read_state);
return ret;
}

View File

@ -175,13 +175,9 @@ test_MDPTransport() {
assert_rhizome_received file2
}
doc_FileTransferBigMDP="Big new bundle transfers to one node via MDP"
setup_FileTransferBigMDP() {
setup_common
set_instance +B
executeOk_servald config set rhizome.http.enable 0
#common setup and test routines for transferring a 1MB file
setup_bigfile_common() {
set_instance +A
executeOk_servald config set rhizome.http.enable 0
dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1
echo x >>file1
ls -l file1
@ -190,151 +186,79 @@ setup_FileTransferBigMDP() {
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_FileTransferBigMDP() {
wait_until bundle_received_by $BID:$VERSION +B
bigfile_common_test() {
set_instance +B
wait_until bundle_received_by $BID:$VERSION +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
}
doc_FileTransferBigMDP="Big new bundle transfers to one node via MDP"
setup_FileTransferBigMDP() {
setup_common
foreach_instance +A +B \
executeOk_servald config set rhizome.http.enable 0
setup_bigfile_common
}
test_FileTransferBigMDP() {
bigfile_common_test
}
doc_FileTransferBig="Big new bundle transfers to one node via HTTP"
setup_FileTransferBig() {
setup_common
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
dd if=/dev/urandom of=file1 bs=1k count=1k 2>&1
echo x >>file1
ls -l file1
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
foreach_instance +A +B \
executeOk_servald config set rhizome.mdp.enable 0
setup_bigfile_common
}
test_FileTransferBig() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
}
doc_FileTransferMulti="New bundle transfers to four nodes via HTTP"
setup_FileTransferMulti() {
setup_common
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
set_instance +B
executeOk_servald config set rhizome.mdp.enable 0
set_instance +C
executeOk_servald config set rhizome.mdp.enable 0
set_instance +D
executeOk_servald config set rhizome.mdp.enable 0
set_instance +E
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
foreach_instance +B assert_peers_are_instances +A +C +D +E
foreach_instance +C assert_peers_are_instances +A +B +D +E
foreach_instance +D assert_peers_are_instances +A +B +C +E
}
test_FileTransferMulti() {
wait_until bundle_received_by $BID:$VERSION +B +C +D +E
for i in B C D E; do
set_instance +$i
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
done
}
doc_FileTransferMultiMDP="New bundle transfers to four nodes via MDP"
setup_FileTransferMultiMDP() {
setup_common
set_instance +A
executeOk_servald config set rhizome.http.enable 0
set_instance +B
executeOk_servald config set rhizome.http.enable 0
set_instance +C
executeOk_servald config set rhizome.http.enable 0
set_instance +D
executeOk_servald config set rhizome.http.enable 0
set_instance +E
executeOk_servald config set rhizome.http.enable 0
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
foreach_instance +B assert_peers_are_instances +A +C +D +E
foreach_instance +C assert_peers_are_instances +A +B +D +E
foreach_instance +D assert_peers_are_instances +A +B +C +E
}
test_FileTransferMultiMDP() {
wait_until bundle_received_by $BID:$VERSION +B +C +D +E
for i in B C D E; do
set_instance +$i
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
done
bigfile_common_test
}
doc_FileTransferBigMDPExtBlob="Big new bundle transfers to one node via MDP, external blob file"
setup_FileTransferBigMDPExtBlob() {
setup_common
set_instance +B
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
set_instance +A
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
dd if=/dev/urandom of=file1 bs=1k count=4k 2>&1
echo x >>file1
ls -l file1
rhizome_add_file file1
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
foreach_instance +A +B \
executeOk_servald config \
set rhizome.http.enable 0 \
set rhizome.external_blobs 1
setup_bigfile_common
}
test_FileTransferBigMDPExtBlob() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +B
executeOk_servald rhizome list
assert_rhizome_list --fromhere=0 file1
assert_rhizome_received file1
bigfile_common_test
}
doc_FileTransferMultiMDPExtBlob="New bundle transfers to four nodes via MDP, external blob files"
setup_FileTransferMultiMDPExtBlob() {
doc_FileTransferBigHTTPExtBlob="Big new bundle transfers to one node via HTTP, external blob file"
setup_FileTransferBigHTTPExtBlob() {
setup_common
set_instance +A
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
set_instance +B
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
set_instance +C
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
set_instance +D
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
set_instance +E
executeOk_servald config set rhizome.http.enable 0
executeOk_servald config set rhizome.external_blobs 1
foreach_instance +A +B \
executeOk_servald config \
set rhizome.mdp.enable 0 \
set rhizome.external_blobs 1
setup_bigfile_common
}
test_FileTransferBigHTTPExtBlob() {
bigfile_common_test
}
# common setup and test routines for transfers to 4 nodes
setup_multitransfer_common() {
set_instance +A
rhizome_add_file file1
start_servald_instances +A +B +C +D +E
foreach_instance +A assert_peers_are_instances +B +C +D +E
foreach_instance +B assert_peers_are_instances +A +C +D +E
foreach_instance +C assert_peers_are_instances +A +B +D +E
foreach_instance +D assert_peers_are_instances +A +B +C +E
set_instance +A
assert_peers_are_instances +B +C +D +E
set_instance +B
assert_peers_are_instances +A +C +D +E
set_instance +C
assert_peers_are_instances +A +B +D +E
set_instance +D
assert_peers_are_instances +A +B +C +E
set_instance +E
assert_peers_are_instances +A +B +C +D
}
test_FileTransferMultiMDPExtBlob() {
multitransfer_common_test() {
wait_until bundle_received_by $BID:$VERSION +B +C +D +E
for i in B C D E; do
set_instance +$i
@ -344,6 +268,54 @@ test_FileTransferMultiMDPExtBlob() {
done
}
doc_FileTransferMulti="New bundle transfers to four nodes via HTTP"
setup_FileTransferMulti() {
setup_common
foreach_instance +A +B +C +D +E \
executeOk_servald config set rhizome.mdp.enable 0
setup_multitransfer_common
}
test_FileTransferMulti() {
multitransfer_common_test
}
doc_FileTransferMultiMDP="New bundle transfers to four nodes via MDP"
setup_FileTransferMultiMDP() {
setup_common
foreach_instance +A +B +C +D +E \
executeOk_servald config set rhizome.http.enable 0
setup_multitransfer_common
}
test_FileTransferMultiMDP() {
multitransfer_common_test
}
doc_FileTransferMultiMDPExtBlob="New bundle transfers to four nodes via MDP, external blob files"
setup_FileTransferMultiMDPExtBlob() {
setup_common
foreach_instance +A +B +C +D +E \
executeOk_servald config \
set rhizome.http.enable 0 \
set rhizome.external_blobs 1
setup_multitransfer_common
}
test_FileTransferMultiMDPExtBlob() {
multitransfer_common_test
}
doc_FileTransferMultiHTTPExtBlob="New bundle transfers to four nodes via HTTP, external blob files"
setup_FileTransferMultiHTTPExtBlob() {
setup_common
foreach_instance +A +B +C +D +E \
executeOk_servald config \
set rhizome.mdp.enable 0 \
set rhizome.external_blobs 1
setup_multitransfer_common
}
test_FileTransferMultiHTTPExtBlob() {
multitransfer_common_test
}
doc_FileTransferDelete="Payload deletion transfers to one node"
setup_FileTransferDelete() {
setup_common