Added a buffer when streaming to the database so that we don't waste

most of our time opening and closing blobs, and don't have to keep
the blob open all the time.
This commit is contained in:
gardners 2012-12-03 21:47:02 +10:30
parent de7a4ce2e3
commit 43316ade3a

View File

@ -66,6 +66,10 @@ struct rhizome_fetch_slot {
int64_t last_write_time;
#define RHIZOME_BLOB_BUFFER_SIZE 32768
unsigned char blob_buffer[RHIZOME_BLOB_BUFFER_SIZE];
int blob_buffer_bytes;
/* HTTP transport specific elements */
char request[1024];
int request_len;
@ -504,16 +508,21 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
/* TODO We should stream file straight into the database */
if (create_rhizome_import_dir() == -1)
goto bail;
slot->file_len=slot->manifest->fileLength;
slot->rowid=
rhizome_database_create_blob_for(slot->manifest->fileHexHash,
slot->file_len,
RHIZOME_PRIORITY_DEFAULT);
if (slot->rowid<0) {
WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid));
goto bail;
if (slot->manifest) {
slot->file_len=slot->manifest->fileLength;
slot->rowid=
rhizome_database_create_blob_for(slot->manifest->fileHexHash,
slot->file_len,
RHIZOME_PRIORITY_DEFAULT);
if (slot->rowid<0) {
WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid));
goto bail;
}
} else {
slot->rowid=-1;
}
SHA512_Init(&slot->sha512_context);
if (slot->peer_ipandport.sin_family == AF_INET) {
/* Transfer via HTTP over IPv4 */
@ -550,6 +559,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->state = RHIZOME_FETCH_CONNECTING;
slot->file_len = -1;
slot->file_ofs = 0;
slot->blob_buffer_bytes = 0;
/* Watch for activity on the socket */
slot->alarm.function = rhizome_fetch_poll;
fetch_stats.name = "rhizome_fetch_poll";
@ -1066,7 +1076,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap);
write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength);
if (0)
if (1)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->file_ofs);
@ -1200,6 +1210,30 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
}
}
/* Write the bytes currently held in slot->blob_buffer into the "data" column
 * of the FILES row identified by slot->rowid, then mark the buffer as empty.
 *
 * The blob handle is opened and closed within this call, so no handle is kept
 * open between flushes.
 *
 * slot->file_ofs is advanced by rhizome_write_content() as bytes are placed in
 * the buffer, so it already counts the bytes being flushed here.  The buffered
 * bytes therefore belong at blob offset (file_ofs - blob_buffer_bytes), and
 * file_ofs must NOT be advanced a second time by this function.
 *
 * Returns 0 on success (including when there is nothing to flush), or -1 if
 * the blob could not be opened or written.  On a write failure the slot is
 * closed with rhizome_fetch_close() before returning.
 */
int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot)
{
  sqlite3_blob *blob = NULL;
  int write_offset;
  int ret;

  /* Nothing buffered: do not open the blob just to write zero bytes. */
  if (slot->blob_buffer_bytes == 0)
    return 0;

  /* sqlite3_blob_write() takes an int offset, so compute it once as an int
   * and use the same value for the write and for the diagnostics. */
  write_offset = (int)(slot->file_ofs - slot->blob_buffer_bytes);

  ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob);
  if (ret != SQLITE_OK)
    return -1;

  ret = sqlite3_blob_write(blob, slot->blob_buffer, slot->blob_buffer_bytes, write_offset);
  if (ret != SQLITE_OK) {
    /* Report first: sqlite3_errmsg() must be read before any further SQLite
     * call on this connection can replace the error state. */
    WHYF("sqlite3_blob_write(,,%d,%d) failed, %s",
	 slot->blob_buffer_bytes, write_offset,
	 sqlite3_errmsg(rhizome_db));
    if (debug & DEBUG_RHIZOME_RX)
      DEBUGF("Failed to write %d bytes to file @ offset %d",
	     slot->blob_buffer_bytes, write_offset);
    /* Release the handle on the failure path too, otherwise it leaks. */
    sqlite3_blob_close(blob);
    blob = NULL;
    rhizome_fetch_close(slot);
    return -1;
  }

  sqlite3_blob_close(blob);
  blob = NULL;

  /* The buffer is now empty; file_ofs is left untouched (see header comment). */
  slot->blob_buffer_bytes = 0;
  return 0;
}
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
{
// Truncate to known length of file (handy for reading from journal bundles that
@ -1214,36 +1248,31 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
bcopy(buffer,&slot->manifest_buffer[slot->manifest_bytes],count);
slot->manifest_bytes+=count;
} else {
/* We are reading a file. Stream it into the database. */
sqlite3_blob *blob;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob);
if (ret!=SQLITE_OK) return -1;
ret=sqlite3_blob_write(blob, buffer, bytes, slot->file_ofs);
if (ret!=SQLITE_OK) {
WHYF("sqlite3_blob_write(,,%d,%d) failed, %s",
bytes,slot->file_ofs,sqlite3_errmsg(rhizome_db));
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs);
rhizome_fetch_close(slot);
return -1;
}
/* We are reading a file. Stream it into the database. */
SHA512_Update(&slot->sha512_context,(unsigned char *)buffer,bytes);
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("slot->blob_buffer_bytes=%d, slot->file_ofs=%d",
slot->blob_buffer_bytes,slot->file_ofs);
sqlite3_blob_close(blob); blob=NULL;
if (slot->blob_buffer_bytes+bytes>RHIZOME_BLOB_BUFFER_SIZE)
rhizome_fetch_flush_blob_buffer(slot);
bcopy(buffer,&slot->blob_buffer[slot->blob_buffer_bytes],bytes);
slot->blob_buffer_bytes+=bytes;
}
slot->file_ofs+=bytes;
slot->last_write_time=gettime_ms();
if (slot->file_ofs>=slot->file_len) {
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
if (slot->manifest) {
// Were fetching payload, now we have it.
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&slot->sha512_context, (char *)hash_out);
if (slot->blob_buffer_bytes) rhizome_fetch_flush_blob_buffer(slot);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (strcasecmp(hash_out,slot->manifest->fileHexHash)) {
@ -1301,11 +1330,6 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
rhizome_fetch_close(slot);
return -1;
}
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
// slot is still open
return 0;
@ -1371,6 +1395,11 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(slot, buffer, bytes);
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
return;
} else {
if (debug & DEBUG_RHIZOME_RX)
@ -1430,6 +1459,12 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes);
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
return;
}
}