From 43316ade3a08d687f0578271061892b75ad84c34 Mon Sep 17 00:00:00 2001 From: gardners Date: Mon, 3 Dec 2012 21:47:02 +1030 Subject: [PATCH] added buffer when streaming to database so that we don't waste most of our time opening and closing blobs, and don't have to keep the blob open all the time. --- rhizome_fetch.c | 97 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 31 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index cdab7215..2842b737 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -66,6 +66,10 @@ struct rhizome_fetch_slot { int64_t last_write_time; +#define RHIZOME_BLOB_BUFFER_SIZE 32768 + unsigned char blob_buffer[RHIZOME_BLOB_BUFFER_SIZE]; + int blob_buffer_bytes; + /* HTTP transport specific elements */ char request[1024]; int request_len; @@ -504,16 +508,21 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) /* TODO We should stream file straight into the database */ if (create_rhizome_import_dir() == -1) goto bail; - slot->file_len=slot->manifest->fileLength; - slot->rowid= - rhizome_database_create_blob_for(slot->manifest->fileHexHash, - slot->file_len, - RHIZOME_PRIORITY_DEFAULT); - if (slot->rowid<0) { - WHYF_perror("Could not obtain rowid for blob for file '%s'", - alloca_tohex_sid(slot->bid)); - goto bail; + if (slot->manifest) { + slot->file_len=slot->manifest->fileLength; + slot->rowid= + rhizome_database_create_blob_for(slot->manifest->fileHexHash, + slot->file_len, + RHIZOME_PRIORITY_DEFAULT); + if (slot->rowid<0) { + WHYF_perror("Could not obtain rowid for blob for file '%s'", + alloca_tohex_sid(slot->bid)); + goto bail; + } + } else { + slot->rowid=-1; } + SHA512_Init(&slot->sha512_context); if (slot->peer_ipandport.sin_family == AF_INET) { /* Transfer via HTTP over IPv4 */ @@ -550,6 +559,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) slot->state = RHIZOME_FETCH_CONNECTING; slot->file_len = -1; slot->file_ofs = 0; + slot->blob_buffer_bytes = 0; /* Watch for activity on the socket */ slot->alarm.function = rhizome_fetch_poll; fetch_stats.name = "rhizome_fetch_poll"; @@ -1066,7 +1076,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); - if (0) + if (1) DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), slot->file_ofs); @@ -1200,6 +1210,30 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } +int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot) +{ + sqlite3_blob *blob; + int ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob); + if (ret!=SQLITE_OK) return -1; + ret=sqlite3_blob_write(blob, slot->blob_buffer, slot->blob_buffer_bytes, + slot->file_ofs-slot->blob_buffer_bytes); + if (ret!=SQLITE_OK) { + WHYF("sqlite3_blob_write(,,%d,%d) failed, %s", + slot->blob_buffer_bytes,slot->file_ofs-slot->blob_buffer_bytes, + sqlite3_errmsg(rhizome_db)); + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed to write %d bytes to file @ offset %d", + slot->blob_buffer_bytes, + slot->file_ofs - slot->blob_buffer_bytes); + rhizome_fetch_close(slot); + return -1; + } + sqlite3_blob_close(blob); blob=NULL; + slot->file_ofs+=slot->blob_buffer_bytes; + slot->blob_buffer_bytes=0; + return 0; +} + int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { // Truncate to known length of file (handy for reading from journal bundles that @@ -1214,36 +1248,31 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt bcopy(buffer,&slot->manifest_buffer[slot->manifest_bytes],count); slot->manifest_bytes+=count; } else { - /* We are reading a file. Stream it into the database. */ - sqlite3_blob *blob; - int ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob); - if (ret!=SQLITE_OK) return -1; - ret=sqlite3_blob_write(blob, buffer, bytes, slot->file_ofs); - if (ret!=SQLITE_OK) { - WHYF("sqlite3_blob_write(,,%d,%d) failed, %s", - bytes,slot->file_ofs,sqlite3_errmsg(rhizome_db)); - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); - rhizome_fetch_close(slot); - return -1; - } - + /* We are reading a file. Stream it into the database. */ SHA512_Update(&slot->sha512_context,(unsigned char *)buffer,bytes); + + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("slot->blob_buffer_bytes=%d, slot->file_ofs=%d", + slot->blob_buffer_bytes,slot->file_ofs); - sqlite3_blob_close(blob); blob=NULL; + if (slot->blob_buffer_bytes+bytes>RHIZOME_BLOB_BUFFER_SIZE) + rhizome_fetch_flush_blob_buffer(slot); + bcopy(buffer,&slot->blob_buffer[slot->blob_buffer_bytes],bytes); + slot->blob_buffer_bytes+=bytes; } slot->file_ofs+=bytes; slot->last_write_time=gettime_ms(); if (slot->file_ofs>=slot->file_len) { /* got all of file */ - if (debug & DEBUG_RHIZOME_RX) + // if (debug & DEBUG_RHIZOME_RX) DEBUGF("Received all of file via rhizome -- now to import it"); if (slot->manifest) { // Were fetching payload, now we have it. char hash_out[SHA512_DIGEST_STRING_LENGTH+1]; SHA512_End(&slot->sha512_context, (char *)hash_out); + if (slot->blob_buffer_bytes) rhizome_fetch_flush_blob_buffer(slot); sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; if (strcasecmp(hash_out,slot->manifest->fileHexHash)) { @@ -1301,11 +1330,6 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt rhizome_fetch_close(slot); return -1; } - // reset inactivity timeout - unschedule(&slot->alarm); - slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&slot->alarm); // slot is still open return 0; @@ -1371,6 +1395,11 @@ void rhizome_fetch_poll(struct sched_ent *alarm) /* If we got some data, see if we have found the end of the HTTP request */ if (bytes > 0) { rhizome_write_content(slot, buffer, bytes); + // reset inactivity timeout + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); return; } else { if (debug & DEBUG_RHIZOME_RX) @@ -1430,6 +1459,12 @@ void rhizome_fetch_poll(struct sched_ent *alarm) int content_bytes = slot->request + slot->request_len - parts.content_start; if (content_bytes > 0){ rhizome_write_content(slot, parts.content_start, content_bytes); + // reset inactivity timeout + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + return; } }