added speed indication on completion of rhizome transfer.

split files table into files and fileblobs to avoid pathological
delays when updating datavalid flag in files table (sqlite copies
the whole table row, including possibly large blob).
This commit is contained in:
gardners 2012-12-04 16:03:56 +10:30
parent 23cace8455
commit fc0c134cbf
2 changed files with 88 additions and 32 deletions

View File

@ -206,8 +206,9 @@ int rhizome_opendb()
/* Create tables as required */
sqlite_exec_void_loglevel(loglevel, "PRAGMA auto_vacuum=2;");
if ( sqlite_exec_void("CREATE TABLE IF NOT EXISTS GROUPLIST(id text not null primary key, closed integer,ciphered integer,priority integer);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS MANIFESTS(id text not null primary key, manifest blob, version integer,inserttime integer, bar blob, filesize integer, filehash text, author text);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS FILES(id text not null primary key, data blob, length integer, highestpriority integer, datavalid integer, inserttime integer);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS MANIFESTS(id text not null primary key, version integer,inserttime integer, filesize integer, filehash text, author text, bar blob, manifest blob);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS FILES(id text not null primary key, length integer, highestpriority integer, datavalid integer, inserttime integer);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS FILEBLOBS(id text not null primary key, data blob);") == -1
|| sqlite_exec_void("DROP TABLE IF EXISTS FILEMANIFESTS;") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS GROUPMEMBERSHIPS(manifestid text not null, groupid text not null);") == -1
|| sqlite_exec_void("CREATE TABLE IF NOT EXISTS VERIFICATIONS(sid text not null, did text, name text, starttime integer, endtime integer, signature blob);") == -1
@ -222,6 +223,7 @@ int rhizome_opendb()
/* Clean out half-finished entries from the database */
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM MANIFESTS WHERE filehash IS NULL;");
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM FILES WHERE NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE MANIFESTS.filehash = FILES.id);");
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM FILEBLOBS WHERE NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE FILEBLOBS.id = FILES.id);");
sqlite_exec_void_loglevel(LOG_LEVEL_WARN, "DELETE FROM MANIFESTS WHERE filehash != '' AND NOT EXISTS( SELECT 1 FROM FILES WHERE MANIFESTS.filehash = FILES.id);");
RETURN(0);
sqlite_exec_void("DELETE FROM FILES WHERE datavalid=0;");
@ -673,8 +675,10 @@ int rhizome_drop_stored_file(const char *id,int maximum_priority)
}
}
sqlite3_finalize(statement);
if (can_drop)
if (can_drop) {
sqlite_exec_void_retry(&retry, "delete from files where id='%s';",id);
sqlite_exec_void_retry(&retry, "delete from fileblobs where id='%s';",id);
}
return 0;
}
@ -770,16 +774,14 @@ int rhizome_store_bundle(rhizome_manifest *m)
// we might need to leave the old file around for a bit
// clean out unreferenced files first
if ((stmt = sqlite_prepare(&retry, "DELETE FROM FILES WHERE inserttime < ? AND NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE MANIFESTS.filehash = FILES.id);")) == NULL)
goto rollback;
if (!sqlite_code_ok(sqlite3_bind_int64(stmt, 1, (long long)(gettime_ms() - 60000)))) {
WHYF("query failed, %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(stmt));
if (sqlite_exec_void("DELETE FROM FILES WHERE inserttime < ? AND NOT EXISTS( SELECT 1 FROM MANIFESTS WHERE MANIFESTS.filehash = FILES.id);")) {
WHYF("delete failed, %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(stmt));
goto rollback;
}
if (sqlite_step_retry(&retry, stmt) == -1)
if (sqlite_exec_void("DELETE FROM FILEBLOBS WHERE NOT EXISTS ( SELECT 1 FROM FILES WHERE FILES.id = FILEBLOBS.id );")) {
WHYF("delete failed, %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(stmt));
goto rollback;
sqlite3_finalize(stmt);
stmt = NULL;
}
if (rhizome_manifest_get(m,"isagroup",NULL,0)!=NULL) {
int closed=rhizome_manifest_get_ll(m,"closedgroup");
@ -952,6 +954,7 @@ int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
/* Okay, so there are no records that match, but we should delete any half-baked record (with datavalid=0) so that the insert below doesn't fail.
Don't worry about the return result, since it might not delete any records. */
sqlite_exec_void("DELETE FROM FILES WHERE id='%s' AND datavalid=0;",hashhex);
sqlite_exec_void("DELETE FROM FILEBLOBS WHERE id='%s';",hashhex);
/* INSERT INTO FILES(id as text, data blob, length integer, highestpriority integer).
BUT, we have to do this incrementally so that we can handle blobs larger than available memory.
@ -963,15 +966,24 @@ int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
*/
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "INSERT OR REPLACE INTO FILES(id,data,length,highestpriority,datavalid,inserttime) VALUES('%s',?,%lld,%d,0,%lld);",
hashhex, (long long)fileLength, priority, (long long)gettime_ms()
);
DEBUGF("INSERT OR REPLACE INTO FILES(id,data,length,highestpriority,datavalid,inserttime) VALUES('%s',?,%lld,%d,0,%lld);",
int ret=sqlite_exec_void("INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
hashhex, (long long)fileLength, priority, (long long)gettime_ms()
);
if (ret!=SQLITE_OK) {
DEBUGF("insert or replace into files ... failed: %s",
sqlite3_errmsg(rhizome_db));
goto insert_row_fail;
}
DEBUGF("INSERT OR REPLACE INTO FILES(id,length,highestpriority,datavalid,inserttime) VALUES('%s',%lld,%d,0,%lld);",
hashhex, (long long)fileLength, priority, (long long)gettime_ms()
);
sqlite3_stmt *statement = sqlite_prepare(&retry,"INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",hashhex);
DEBUGF("INSERT OR REPLACE INTO FILEBLOBS(id,data) VALUES('%s',?)",hashhex);
if (!statement)
goto insert_row_fail;
/* Bind appropriate sized zero-filled blob to data field */
if (sqlite3_bind_zeroblob(statement, 1, fileLength) != SQLITE_OK) {
WHYF("sqlite3_bind_zeroblob() failed: %s: %s", sqlite3_errmsg(rhizome_db), sqlite3_sql(statement));
@ -1078,7 +1090,7 @@ int rhizome_store_file(rhizome_manifest *m,const unsigned char *key)
goto error;
sqlite3_blob *blob;
int ret;
do ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", rowid, 1 /* read/write */, &blob);
do ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", rowid, 1 /* read/write */, &blob);
while (sqlite_code_busy(ret) && sqlite_retry(&retry, "sqlite3_blob_open"));
if (ret != SQLITE_OK) {
WHYF("sqlite3_blob_open() failed, %s", sqlite3_errmsg(rhizome_db));
@ -1529,7 +1541,7 @@ int rhizome_retrieve_file(const char *fileid, const char *filepath, const unsign
return 0;
}
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT id, rowid, length FROM files WHERE id = ? AND datavalid != 0");
sqlite3_stmt *statement = sqlite_prepare(&retry, "SELECT id FROM fileblobs WHERE (SELECT 1 FROM files WHERE FILEBLOBS.id = FILES.id AND id = ? AND datavalid != 0)");
if (!statement)
return -1;
int ret = 0;
@ -1552,7 +1564,7 @@ int rhizome_retrieve_file(const char *fileid, const char *filepath, const unsign
int64_t rowid = sqlite3_column_int64(statement, 1);
sqlite3_blob *blob = NULL;
int code;
do code = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", rowid, 0 /* read only */, &blob);
do code = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", rowid, 0 /* read only */, &blob);
while (sqlite_code_busy(code) && sqlite_retry(&retry, "sqlite3_blob_open"));
if (!sqlite_code_ok(code)) {
ret = WHY("Could not open blob for reading");

View File

@ -65,9 +65,11 @@ struct rhizome_fetch_slot {
int64_t file_ofs;
int64_t last_write_time;
int64_t start_time;
#define RHIZOME_BLOB_BUFFER_SIZE 32768
unsigned char blob_buffer[RHIZOME_BLOB_BUFFER_SIZE];
#define RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE (2*1024)
int blob_buffer_size;
unsigned char *blob_buffer;
int blob_buffer_bytes;
/* HTTP transport specific elements */
@ -495,7 +497,8 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
m->finalised = 1;
m->manifest_bytes = m->manifest_all_bytes; // store the signatures too
if (debug & DEBUG_RHIZOME_RX) {
DEBUGF("manifest len=%d has %d signatories", m->manifest_bytes, m->sig_count);
DEBUGF("manifest len=%d has %d signatories. Associated file = %lld bytes",
m->manifest_bytes, m->sig_count,(long long)m->fileLength);
dump("manifest", m->manifestdata, m->manifest_all_bytes);
}
return rhizome_bundle_import(m, m->ttl - 1 /* TTL */);
@ -506,6 +509,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
int sock = -1;
/* TODO Don't forget to implement resume */
/* TODO We should stream file straight into the database */
slot->start_time=gettime_ms();
if (create_rhizome_import_dir() == -1)
goto bail;
if (slot->manifest) {
@ -559,6 +563,12 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->file_len = -1;
slot->file_ofs = 0;
slot->blob_buffer_bytes = 0;
if (slot->blob_buffer) {
free(slot->blob_buffer);
slot->blob_buffer=NULL;
}
slot->blob_buffer_size=0;
SHA512_Init(&slot->sha512_context);
/* Watch for activity on the socket */
slot->alarm.function = rhizome_fetch_poll;
@ -1015,6 +1025,10 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
rhizome_manifest_free(slot->manifest);
slot->manifest = NULL;
if (slot->blob_buffer) free(slot->blob_buffer);
slot->blob_buffer=NULL;
slot->blob_buffer_size=0;
// Release the fetch slot.
slot->state = RHIZOME_FETCH_FREE;
@ -1168,7 +1182,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
which is the block size we will use. 200bytes allows for several blocks
to fit into a packet, and probably fit at least one any any outgoing packet
that is not otherwise full. */
slot->file_len=slot->manifest->fileLength;
slot->file_len=slot->manifest->fileLength;
slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=500; // 200;
@ -1215,7 +1230,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
int rhizome_fetch_flush_blob_buffer(struct rhizome_fetch_slot *slot)
{
sqlite3_blob *blob;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob);
int ret = sqlite3_blob_open(rhizome_db, "main", "FILEBLOBS", "data", slot->rowid, 1 /* read/write */, &blob);
if (ret!=SQLITE_OK) {
if (blob) sqlite3_blob_close(blob);
return -1;
@ -1263,10 +1278,26 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
dump("buffer",buffer,bytes);
}
if (slot->blob_buffer_bytes+bytes>RHIZOME_BLOB_BUFFER_SIZE)
rhizome_fetch_flush_blob_buffer(slot);
bcopy(buffer,&slot->blob_buffer[slot->blob_buffer_bytes],bytes);
slot->blob_buffer_bytes+=bytes;
if (!slot->blob_buffer_size) {
/* Allocate an appropriately sized buffer so that we don't have to pay
the cost of blob_open() and blob_close() too often. */
slot->blob_buffer_size=slot->file_len;
if (slot->blob_buffer_size>RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE)
slot->blob_buffer_size=RHIZOME_BLOB_BUFFER_MAXIMUM_SIZE;
slot->blob_buffer=malloc(slot->blob_buffer_size);
assert(slot->blob_buffer);
}
assert(slot->blob_buffer_size);
int bytesRemaining=bytes;
while(bytesRemaining>0) {
int count=slot->blob_buffer_size-slot->blob_buffer_bytes;
if (count>bytes) count=bytesRemaining;
bcopy(buffer,&slot->blob_buffer[slot->blob_buffer_bytes],count);
buffer+=count; bytesRemaining-=count;
if (slot->blob_buffer_bytes==slot->blob_buffer_size)
rhizome_fetch_flush_blob_buffer(slot);
}
}
slot->file_ofs+=bytes;
@ -1274,12 +1305,12 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
if (slot->file_ofs>=slot->file_len) {
/* got all of file */
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
DEBUGF("Received all of file via rhizome -- now to import it");
if (slot->manifest) {
// Were fetching payload, now we have it.
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&slot->sha512_context, (char *)hash_out);
SHA512_End(&slot->sha512_context, (char *)hash_out);
if (slot->blob_buffer_bytes) rhizome_fetch_flush_blob_buffer(slot);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
@ -1289,18 +1320,28 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
DEBUGF("Expected hash=%s, got %s",
slot->manifest->fileHexHash,hash_out);
sqlite_exec_void_retry(&retry,
"DELETE FROM FILES WHERE rowid=%lld",slot->rowid);
"DELETE FROM FILEBLOBS WHERE rowid=%lld",slot->rowid);
sqlite_exec_void_retry(&retry,
"DELETE FROM FILES WHERE id='%s'",
slot->manifest->fileHexHash);
rhizome_fetch_close(slot);
return -1;
} else {
INFOF("Updating row status: UPDATE FILES SET datavalid=1 WHERE id='%s'",
slot->manifest->fileHexHash);
time_ms_t start=gettime_ms();
int ret=sqlite_exec_void_retry(&retry,
"UPDATE FILES SET datavalid=1 WHERE rowid=%lld",
slot->rowid);
"UPDATE FILES SET datavalid=1 WHERE id='%s'",
slot->manifest->fileHexHash);
if (ret!=SQLITE_OK)
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("error marking row valid: %s",sqlite3_errmsg(rhizome_db));
INFOF("Updated row status (took %lldms)",(long long)gettime_ms()-start);
}
INFOF("Calling rhizome_import_received_bundle() m->fileLength=%lld",
slot->manifest->fileLength);
if (!rhizome_import_received_bundle(slot->manifest)){
if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN];
@ -1336,7 +1377,10 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
}
}
}
DEBUGF("Closing rhizome fetch slot = 0x%p",slot);
DEBUGF("Closing rhizome fetch slot = 0x%p. Received %lld bytes in %lldms (%lldKB/sec). Buffer size = %d",
slot,(long long)slot->file_ofs,(long long)gettime_ms()-slot->start_time,
(long long)slot->file_ofs/(gettime_ms()-slot->start_time),
slot->blob_buffer_size);
rhizome_fetch_close(slot);
return -1;
}