Implicitly buffer all writes to database blobs

This commit is contained in:
Jeremy Lakeman 2013-07-18 17:04:12 +09:30
parent 7d1714d398
commit 15ad333195
4 changed files with 110 additions and 47 deletions

View File

@ -397,7 +397,10 @@ struct rhizome_write{
char id_known;
int64_t file_offset;
int64_t written_offset;
int64_t file_length;
struct rhizome_write_buffer *buffer_list;
int buffer_size;
int crypt;
unsigned char key[RHIZOME_CRYPT_KEY_BYTES];
@ -407,9 +410,6 @@ struct rhizome_write{
int64_t blob_rowid;
int blob_fd;
sqlite3_blob *sql_blob;
struct rhizome_write_buffer *out_of_order;
int total_data_size;
};
struct rhizome_read{

View File

@ -1012,7 +1012,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
uint32_t bitmap=0;
int requests=32;
int i;
struct rhizome_write_buffer *p = slot->write_state.out_of_order;
struct rhizome_write_buffer *p = slot->write_state.buffer_list;
uint64_t offset = slot->write_state.file_offset;
for (i=0;i<32;i++){
while(p && p->offset + p->data_size < offset)

View File

@ -18,6 +18,7 @@ int rhizome_exists(const char *fileHash){
int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int64_t file_length, int priority){
write->blob_fd=-1;
if (expectedFileHash){
if (rhizome_exists(expectedFileHash))
return 1;
@ -51,7 +52,7 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
char blob_path[1024];
if (config.rhizome.external_blobs || file_length > 640*1024) {
if (config.rhizome.external_blobs || file_length > 128*1024) {
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){
WHY("Invalid path");
goto insert_row_fail;
@ -120,6 +121,7 @@ int rhizome_open_write(struct rhizome_write *write, char *expectedFileHash, int6
write->file_length = file_length;
write->file_offset = 0;
write->written_offset = 0;
SHA512_Init(&write->sha512_context);
@ -147,6 +149,9 @@ static int prepare_data(struct rhizome_write *write_state, unsigned char *buffer
SHA512_Update(&write_state->sha512_context, buffer, data_size);
write_state->file_offset+=data_size;
if (config.debug.rhizome)
DEBUGF("Processesd %"PRId64" of %"PRId64, write_state->file_offset, write_state->file_length);
return 0;
}
@ -172,6 +177,9 @@ static int write_get_lock(struct rhizome_write *write_state){
static int write_data(struct rhizome_write *write_state, uint64_t file_offset, unsigned char *buffer, int data_size){
if (data_size<=0)
return 0;
if (file_offset != write_state->written_offset)
WARNF("Writing file data out of order! [%"PRId64",%"PRId64"]", file_offset, write_state->written_offset);
if (write_state->blob_fd>=0) {
int ofs=0;
@ -200,8 +208,11 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
return -1;
}
}
write_state->written_offset=file_offset + data_size;
if (config.debug.rhizome)
DEBUGF("Written %"PRId64" of %"PRId64, file_offset, write_state->file_length);
DEBUGF("Wrote %"PRId64" of %"PRId64, file_offset + data_size, write_state->file_length);
return 0;
}
@ -224,54 +235,74 @@ static int write_release_lock(struct rhizome_write *write_state){
return 0;
}
// Write a single buffer in file order
// Should be avoided or eliminated due to database locking overhead
int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, int data_size){
IN();
if (write_get_lock(write_state))
RETURN(-1);
int ret = stream_data(write_state, buffer, data_size);
write_release_lock(write_state);
RETURN(ret);
OUT();
}
// Write data buffers in any order, the data will be cached and streamed into the database in file order.
// Though there is an upper bound on the amount of cached data
int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsigned char *buffer, int data_size){
// search the out of order buffer list for the insert position
struct rhizome_write_buffer **ptr = &write_state->out_of_order;
if (offset + data_size > write_state->file_length)
data_size = write_state->file_length - offset;
if (data_size<=0)
return 0;
int64_t last_offset = write_state->file_offset;
struct rhizome_write_buffer **ptr = &write_state->buffer_list;
int ret=0;
int should_write = 0;
if (write_state->blob_fd>=0){
should_write = 1;
}else{
int64_t new_size = write_state->written_offset + write_state->buffer_size + data_size;
if (new_size>=write_state->file_length || new_size>=RHIZOME_BUFFER_MAXIMUM_SIZE)
should_write = 1;
}
int64_t last_offset = write_state->written_offset;
while(1){
// if existing data can be written, write it now
// can we process this existing data block now?
if (*ptr && (*ptr)->offset == write_state->file_offset){
if (prepare_data(write_state, (*ptr)->data, (*ptr)->data_size)){
ret=-1;
break;
}
continue;
}
// if existing data should be written, do so now
if (should_write && *ptr && (*ptr)->offset == write_state->written_offset){
struct rhizome_write_buffer *n=*ptr;
if (config.debug.rhizome)
DEBUGF("Writing caching block %"PRId64", %d", n->offset, n->data_size);
if (rhizome_write_buffer(write_state, n->data, n->data_size))
return -1;
last_offset = write_state->file_offset;
if (write_get_lock(write_state)){
ret=-1;
break;
}
if (write_data(write_state, n->offset, n->data, n->data_size)){
ret=-1;
break;
}
write_state->buffer_size-=n->data_size;
last_offset = n->offset + n->data_size;
*ptr=n->_next;
free(n);
continue;
}
// skip over incoming data that we've already received
if (offset < last_offset){
int64_t delta = last_offset - offset;
if (delta >= data_size)
return 0;
break;
data_size -= delta;
offset+=delta;
buffer+=delta;
}
if (data_size<=0)
return 0;
break;
// can we process the incoming data block now?
if (data_size>0 && offset == write_state->file_offset){
if (prepare_data(write_state, buffer, data_size)){
ret=-1;
break;
}
continue;
}
if (!*ptr || offset < (*ptr)->offset){
// found the insert position in the list
@ -281,27 +312,35 @@ int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsi
if (*ptr && offset+size > (*ptr)->offset)
size = (*ptr)->offset - offset;
if (offset == write_state->file_offset){
if (rhizome_write_buffer(write_state, buffer, size))
return -1;
if (should_write && offset == write_state->file_offset){
if (write_get_lock(write_state)){
ret=-1;
break;
}
if (write_data(write_state, offset, buffer, size)){
ret=-1;
break;
}
// we need to go around the loop again to re-test if this buffer can now be written
}else{
// impose a limit on the total amount of cached data
if (write_state->total_data_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE)
size = RHIZOME_BUFFER_MAXIMUM_SIZE - write_state->total_data_size;
if (write_state->buffer_size + size > RHIZOME_BUFFER_MAXIMUM_SIZE)
size = RHIZOME_BUFFER_MAXIMUM_SIZE - write_state->buffer_size;
if (size<=0)
return 0;
break;
if (config.debug.rhizome)
DEBUGF("Caching block @%"PRId64", %"PRId64" received out of order", offset, size);
DEBUGF("Caching block @%"PRId64", %"PRId64, offset, size);
struct rhizome_write_buffer *i = emalloc(size + sizeof(struct rhizome_write_buffer));
if (!i)
return -1;
if (!i){
ret=-1;
break;
}
i->offset = offset;
i->buffer_size = i->data_size = size;
bcopy(buffer, i->data, size);
i->_next = *ptr;
write_state->total_data_size += size;
write_state->buffer_size += size;
*ptr = i;
// if there's any overlap of this buffer and the current one, we may need to add another buffer.
ptr = &((*ptr)->_next);
@ -315,7 +354,12 @@ int rhizome_random_write(struct rhizome_write *write_state, int64_t offset, unsi
last_offset = (*ptr)->offset + (*ptr)->data_size;
ptr = &((*ptr)->_next);
}
return 0;
write_release_lock(write_state);
return ret;
}
int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, int data_size){
return rhizome_random_write(write_state, write_state->file_offset, buffer, data_size);
}
/* Expects file to be at least file_length in size, ignoring anything longer than that */
@ -371,6 +415,11 @@ int rhizome_fail_write(struct rhizome_write *write){
rhizome_store_delete(write->id);
}
write_release_lock(write);
while(write->buffer_list){
struct rhizome_write_buffer *n=write->buffer_list;
write->buffer_list=n->_next;
free(n);
}
// don't worry too much about sql failures.
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (write->blob_rowid>=0){
@ -385,6 +434,19 @@ int rhizome_fail_write(struct rhizome_write *write){
}
int rhizome_finish_write(struct rhizome_write *write){
if (write->buffer_list){
if (rhizome_random_write(write, 0, NULL, 0))
goto failure;
if (write->buffer_list){
WHYF("Buffer was not cleared");
goto failure;
}
}
if (write->file_offset < write->file_length){
WHYF("Only processed %"PRId64" bytes, expected %"PRId64, write->file_offset, write->file_length);
}
int fd = write->blob_fd;
if (fd>=0){
if (config.debug.externalblobs)

View File

@ -29,7 +29,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MSG_TYPE_REQ 1
#define CACHE_BARS 60
#define BARS_PER_RESPONSE (400/RHIZOME_BAR_BYTES)
#define BARS_PER_RESPONSE ((int)400/RHIZOME_BAR_BYTES)
#define HEAD_FLAG INT64_MAX
@ -187,7 +187,8 @@ int rhizome_sync_bundle_inserted(const unsigned char *bar)
static int sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64_t token)
{
int ret=0;
if (state->bar_count>=CACHE_BARS)
return 0;
// check the database before adding the BAR to the list
if (token!=0 && rhizome_is_bar_interesting(bar)!=0){
bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES);
@ -272,11 +273,11 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
// we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range
// TODO stop if we are taking too much CPU time.
int added=0;
for (i=mid_point; i<bar_count && state->bar_count < CACHE_BARS; i++){
for (i=mid_point; i<bar_count; i++){
if (sync_cache_bar(state, bars[i], bar_tokens[i]))
added=1;
}
for (i=mid_point -1; i>=0 && state->bar_count < CACHE_BARS; i--){
for (i=mid_point -1; i>=0; i--){
if (sync_cache_bar(state, bars[i], bar_tokens[i]))
added=1;
}