stream manifest to buffer, and files to blob in database instead

of via files.
NOT COMPLETE - needs to calculate progressive hash and decide
if file is ok at end of import.
This commit is contained in:
gardners 2012-12-03 16:52:08 +10:30
parent b39c71b206
commit e08593b466
2 changed files with 48 additions and 40 deletions

View File

@ -432,6 +432,8 @@ struct http_response {
int rhizome_received_content(unsigned char *bidprefix,uint64_t version,
uint64_t offset,int count,unsigned char *bytes,
int type);
int64_t rhizome_database_create_blob_for(const char *hashhex,int64_t fileLength,
int priority);
int rhizome_server_set_response(rhizome_http_request *r, const struct http_response *h);
int rhizome_server_free_http_request(rhizome_http_request *r);
int rhizome_server_http_send_bytes(rhizome_http_request *r);

View File

@ -57,8 +57,7 @@ struct rhizome_fetch_slot {
#define RHIZOME_FETCH_RXHTTPHEADERS 3
#define RHIZOME_FETCH_RXFILE 4
#define RHIZOME_FETCH_RXFILEMDP 5
FILE *file;
char filename[1024];
int64_t rowid;
int64_t file_len;
int64_t file_ofs;
int64_t last_write_time;
@ -68,6 +67,10 @@ struct rhizome_fetch_slot {
int request_len;
int request_ofs;
/* HTTP streaming reception of manifests */
char manifest_buffer[1024];
int manifest_bytes;
/* MDP transport specific elements */
unsigned char bid[RHIZOME_MANIFEST_ID_BYTES];
int64_t bidVersion;
@ -493,13 +496,17 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
static int schedule_fetch(struct rhizome_fetch_slot *slot)
{
int sock = -1;
FILE *file = NULL;
/* TODO Don't forget to implement resume */
/* TODO We should stream file straight into the database */
if (create_rhizome_import_dir() == -1)
goto bail;
if ((file = fopen(slot->filename, "w")) == NULL) {
WHYF_perror("fopen(`%s`, \"w\")", slot->filename);
slot->file_len=slot->manifest->fileLength;
slot->rowid=rhizome_database_create_blob_for(alloca_tohex_sid(slot->bid),
slot->file_len,
RHIZOME_PRIORITY_DEFAULT);
if (slot->rowid<0) {
WHYF_perror("Could not obtain rowid for blob for file '%s'",
alloca_tohex_sid(slot->bid));
goto bail;
}
if (slot->peer_ipandport.sin_family == AF_INET) {
@ -535,7 +542,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->alarm.poll.fd = sock;
slot->request_ofs = 0;
slot->state = RHIZOME_FETCH_CONNECTING;
slot->file = file;
slot->file_len = -1;
slot->file_ofs = 0;
/* Watch for activity on the socket */
@ -562,10 +568,6 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
bail:
if (sock != -1)
close(sock);
if (file != NULL) {
fclose(file);
unlink(slot->filename);
}
return -1;
}
@ -728,13 +730,12 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
alloca_tohex_bid(slot->bid),slot->bidVersion);
slot->bidP=1;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid))
return -1;
m->dataFileName = strdup(slot->filename);
/* Don't provide a filename, because we will stream the file straight into
the database. */
m->dataFileName = NULL;
m->dataFileUnlinkOnFree = 0;
slot->manifest = m;
if (schedule_fetch(slot) == -1) {
slot->filename[0] = '\0';
return -1;
}
if (debug & DEBUG_RHIZOME_RX)
@ -771,10 +772,14 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
slot->prefix_length=prefix_length;
slot->bidP=0;
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
return -1;
/* Don't stream into a file blob in the database, because it is a manifest.
We do need to cache it in the slot structure, though, and then offer it
for inserting into the database, but we can avoid the temporary file in
the process. */
slot->rowid=-1;
slot->manifest_bytes=0;
if (schedule_fetch(slot) == -1) {
slot->filename[0] = '\0';
return -1;
}
return STARTED;
@ -988,15 +993,9 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
slot->alarm.function=NULL;
/* Free ephemeral data */
if (slot->file)
fclose(slot->file);
slot->file = NULL;
if (slot->manifest)
rhizome_manifest_free(slot->manifest);
slot->manifest = NULL;
if (slot->filename[0])
unlink(slot->filename);
slot->filename[0] = '\0';
// Release the fetch slot.
slot->state = RHIZOME_FETCH_FREE;
@ -1198,20 +1197,35 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
{
if (bytes>(slot->file_len-slot->file_ofs))
bytes=slot->file_len-slot->file_ofs;
if (fwrite(buffer,bytes,1,slot->file) != 1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs);
rhizome_fetch_close(slot);
if (slot->rowid==-1) {
/* We are reading a manifest. Read it into a buffer. */
int count=bytes;
if (count+slot->manifest_bytes>1024) count=1024-slot->manifest_bytes;
bcopy(buffer,&slot->manifest_buffer[slot->manifest_bytes],count);
slot->manifest_bytes+=count;
} else {
/* We are reading a file. Stream it into the database. */
sqlite3_blob *blob;
int ret = sqlite3_blob_open(rhizome_db, "main", "FILES", "data", slot->rowid, 1 /* read/write */, &blob);
if (ret!=SQLITE_OK) return -1;
ret=sqlite3_blob_write(blob, buffer, bytes, slot->file_ofs);
if (ret) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs);
rhizome_fetch_close(slot);
return -1;
}
DEBUGF("XXX - Do progressive hash of content here");
return -1;
}
slot->file_ofs+=bytes;
slot->last_write_time=gettime_ms();
if (slot->file_ofs>=slot->file_len) {
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
fclose(slot->file);
slot->file = NULL;
if (slot->manifest) {
// Were fetching payload, now we have it.
if (!rhizome_import_received_bundle(slot->manifest)){
@ -1236,8 +1250,9 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int byt
call schedule queued items. */
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
if (rhizome_read_manifest_file(m, slot->filename, 0) == -1) {
DEBUGF("Couldn't read manifest from %s",slot->filename);
if (rhizome_read_manifest_file(m, slot->manifest_buffer,
slot->manifest_bytes) == -1) {
DEBUGF("Couldn't read manifest");
rhizome_manifest_free(m);
} else {
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
@ -1274,15 +1289,6 @@ int rhizome_received_content(unsigned char *bidprefix,
if (!bcmp(slot->bid,bidprefix,16))
{
if (slot->file_ofs==offset) {
/* We don't know the file length until we receive the last
block. If it isn't the last block, lie, and claim the end of
file is yet to come. */
if (type=='T') slot->file_len=offset+count;
else {
slot->file_len=offset+count+1;
}
if (!rhizome_write_content(slot,(char *)bytes,count))
{
slot->mdpResponsesOutstanding--;