first cut at progressive hashing and checking of hash when streaming

file into database via rhizome.
This commit is contained in:
gardners 2012-12-03 17:05:12 +10:30
parent e08593b466
commit 5d24ba7db9

View File

@ -57,9 +57,13 @@ struct rhizome_fetch_slot {
#define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXHTTPHEADERS 3
#define RHIZOME_FETCH_RXFILE 4 #define RHIZOME_FETCH_RXFILE 4
#define RHIZOME_FETCH_RXFILEMDP 5 #define RHIZOME_FETCH_RXFILEMDP 5
/* Keep track of how much of the file we have read */
SHA512_CTX sha512_context;
int64_t rowid; int64_t rowid;
int64_t file_len; int64_t file_len;
int64_t file_ofs; int64_t file_ofs;
int64_t last_write_time; int64_t last_write_time;
/* HTTP transport specific elements */ /* HTTP transport specific elements */
@ -509,6 +513,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
alloca_tohex_sid(slot->bid)); alloca_tohex_sid(slot->bid));
goto bail; goto bail;
} }
SHA512_Init(&slot->sha512_context);
if (slot->peer_ipandport.sin_family == AF_INET) { if (slot->peer_ipandport.sin_family == AF_INET) {
/* Transfer via HTTP over IPv4 */ /* Transfer via HTTP over IPv4 */
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -1195,6 +1200,8 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
{ {
// Truncate to known length of file (handy for reading from journal bundles that
// might grow while we are reading from them).
if (bytes>(slot->file_len-slot->file_ofs)) if (bytes>(slot->file_len-slot->file_ofs))
bytes=slot->file_len-slot->file_ofs; bytes=slot->file_len-slot->file_ofs;
@ -1216,8 +1223,10 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
rhizome_fetch_close(slot); rhizome_fetch_close(slot);
return -1; return -1;
} }
DEBUGF("XXX - Do progressive hash of content here");
return -1; SHA512_Update(&slot->sha512_context,(unsigned char *)buffer,bytes);
sqlite3_blob_close(blob); blob=NULL;
} }
slot->file_ofs+=bytes; slot->file_ofs+=bytes;
@ -1228,6 +1237,27 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
DEBUGF("Received all of file via rhizome -- now to import it"); DEBUGF("Received all of file via rhizome -- now to import it");
if (slot->manifest) { if (slot->manifest) {
// Were fetching payload, now we have it. // Were fetching payload, now we have it.
char hash_out[SHA512_DIGEST_STRING_LENGTH+1];
SHA512_End(&slot->sha512_context, (char *)hash_out);
DEBUGF("Hash of received file = %s",hash_out);
DEBUGF("Hash I was looking for = %s",slot->manifest->fileHexHash);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (strcasecmp(hash_out,slot->manifest->fileHexHash)) {
DEBUGF("Hash mismatch -- dropping row from table.");
sqlite_exec_void_retry(&retry,
"DROP FROM FILES WHERE rowid=%lld",slot->rowid);
rhizome_fetch_close(slot);
return -1;
} else {
sqlite_exec_void_retry(&retry,
"UPDATE FILES SET datavalid=1 WHERE rowid=%lld",
slot->rowid);
DEBUGF("Marked row valid");
}
if (!rhizome_import_received_bundle(slot->manifest)){ if (!rhizome_import_received_bundle(slot->manifest)){
if (slot->state==RHIZOME_FETCH_RXFILE) { if (slot->state==RHIZOME_FETCH_RXFILE) {
char buf[INET_ADDRSTRLEN]; char buf[INET_ADDRSTRLEN];