Fix bugs revealed by Rhizome stress test

Overhauled the file fetch queue logic in rhizome_fetch.c.

Now the 'rhizomeprotocol' stress test passes in approximately 5 minutes on my
2009-vintage Dell laptop.

Added a call to rhizome_enqueue_suggestions() in rhizome_fetch_close() so that
a new Rhizome GET request is sent as soon as a fetch slot becomes free, instead
of waiting for the (default 5 second) timer to trigger the next GET.
This commit is contained in:
Andrew Bettison 2012-10-18 17:30:22 +10:30
parent 877f76557a
commit 65ea612e27
4 changed files with 274 additions and 291 deletions

View File

@ -117,6 +117,8 @@ typedef struct rhizome_manifest {
int fileHighestPriority;
/* Absolute path of the file associated with the manifest */
char *dataFileName;
/* If set, unlink(2) the associated file when freeing the manifest */
int dataFileUnlinkOnFree;
/* Whether the paylaod is encrypted or not */
int payloadEncryption;
@ -281,7 +283,7 @@ int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found,
int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar);
long long rhizome_bar_version(unsigned char *bar);
unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar);
int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept);
int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept);
int rhizome_list_manifests(const char *service, const char *sender_sid, const char *recipient_sid, int limit, int offset);
int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest **mp);
int rhizome_retrieve_file(const char *fileid, const char *filepath,
@ -520,11 +522,8 @@ extern unsigned char favicon_bytes[];
extern int favicon_len;
int rhizome_import_from_files(const char *manifestpath,const char *filepath);
int rhizome_fetch_request_manifest_by_prefix(struct sockaddr_in *peerip,
unsigned char *prefix,
int prefix_length,
int importP);
extern int rhizome_file_fetch_queue_count;
int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
int rhizome_count_queued_imports();
struct http_response_parts {
int code;

View File

@ -525,8 +525,12 @@ void _rhizome_manifest_free(struct __sourceloc __whence, rhizome_manifest *m)
m->signatories[i]=NULL;
}
if (m->dataFileName) free(m->dataFileName);
m->dataFileName=NULL;
if (m->dataFileName) {
if (m->dataFileUnlinkOnFree && unlink(m->dataFileName) == -1)
WARNF_perror("unlink(%s)", alloca_str_toprint(m->dataFileName));
free(m->dataFileName);
m->dataFileName = NULL;
}
manifest_free[mid]=1;
manifest_free_whence[mid]=__whence;

View File

@ -876,14 +876,13 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
Then as noted above, we can use that to pull the file down using
existing routines.
*/
if (!rhizome_fetch_request_manifest_by_prefix
(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES,
1 /* import, getting file if needed */))
if (!rhizome_fetch_request_manifest_by_prefix(&addr,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
{
/* Fetching the manifest, and then using it to see if we want to
fetch the file for import is all handled asynchronously, so just
wait for it to finish. */
while(rhizome_file_fetch_queue_count) fd_poll();
while (rhizome_count_queued_imports())
fd_poll();
}
} else if (type==1&&r->pushP) {

View File

@ -31,7 +31,6 @@ extern int sigIoFlag;
typedef struct rhizome_file_fetch_record {
struct sched_ent alarm;
rhizome_manifest *manifest;
char fileid[RHIZOME_FILEHASH_STRLEN + 1];
FILE *file;
char filename[1024];
char request[1024];
@ -40,6 +39,7 @@ typedef struct rhizome_file_fetch_record {
long long file_len;
long long file_ofs;
int state;
#define RHIZOME_FETCH_FREE 0
#define RHIZOME_FETCH_CONNECTING 1
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
#define RHIZOME_FETCH_RXHTTPHEADERS 3
@ -47,12 +47,41 @@ typedef struct rhizome_file_fetch_record {
struct sockaddr_in peer;
} rhizome_file_fetch_record;
struct profile_total fetch_stats;
/* List of queued transfers */
#define MAX_QUEUED_FILES 4
rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES];
#define QUEUED (0)
#define QSAMEPAYLOAD (1)
#define QDATED (2)
#define QBUSY (3)
#define QSAME (4)
#define QOLDER (5)
#define QNEWER (6)
#define QIMPORTED (7)
int rhizome_count_queued_imports()
{
int count = 0;
int i;
for (i = 0; i < MAX_QUEUED_FILES; ++i)
if (file_fetch_queue[i].state != RHIZOME_FETCH_FREE)
++count;
return count;
}
static rhizome_file_fetch_record *rhizome_find_import_slot()
{
// Find a free fetch queue slot.
int i;
for (i = 0; i < MAX_QUEUED_FILES; ++i)
if (file_fetch_queue[i].state == RHIZOME_FETCH_FREE)
return &file_fetch_queue[i];
return NULL;
}
struct profile_total fetch_stats;
/*
Queue a manifest for importing.
@ -443,7 +472,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i
if (list_version >= m->version) {
/* this version is older than the one in the list, so don't list this one */
rhizome_manifest_free(m);
RETURN(0);
RETURN(0);
} else {
/* replace listed version with this newer version */
if (rhizome_manifest_verify(m)) {
@ -518,21 +547,20 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i
void rhizome_enqueue_suggestions(struct sched_ent *alarm)
{
int i;
for(i=0;i<candidate_count;i++)
{
int manifest_kept = 0;
if (rhizome_queue_manifest_import(candidates[i].manifest,&candidates[i].peer, &manifest_kept) == )
break;
if (!manifest_kept) {
rhizome_manifest_free(candidates[i].manifest);
candidates[i].manifest = NULL;
}
for (i = 0; i < candidate_count; ++i) {
int manifest_kept = 0;
int result = rhizome_queue_manifest_import(candidates[i].manifest, &candidates[i].peer, &manifest_kept);
if (result == QBUSY)
break;
if (!manifest_kept) {
rhizome_manifest_free(candidates[i].manifest);
candidates[i].manifest = NULL;
}
}
if (i) {
/* now shuffle up */
int bytes=(candidate_count-i)*sizeof(rhizome_candidates);
if (0) DEBUGF("Moving slot %d to slot 0 (%d bytes = %d slots)",
i,bytes,bytes/sizeof(rhizome_candidates));
if (0) DEBUGF("Moving slot %d to slot 0 (%d bytes = %d slots)", i,bytes,bytes/sizeof(rhizome_candidates));
bcopy(&candidates[i],&candidates[0],bytes);
candidate_count-=i;
}
@ -544,22 +572,95 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm)
return;
}
/* Returns 0 if the fetch was queued.
* Returns 1 if the fetch was not queued because a newer version of the same bundle is already
static int rhizome_queue_import(rhizome_file_fetch_record *q)
{
int sock = -1;
FILE *file = NULL;
/* TODO Don't forget to implement resume */
/* TODO We should stream file straight into the database */
if (create_rhizome_import_dir() == -1)
goto bail;
if ((file = fopen(q->filename, "w")) == NULL) {
WHYF_perror("fopen(`%s`, \"w\")", q->filename);
goto bail;
}
if (q->peer.sin_family == AF_INET) {
/* Transfer via HTTP over IPv4 */
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
WHY_perror("socket");
goto bail;
}
if (set_nonblock(sock) == -1)
goto bail;
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &q->peer.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
if (connect(sock, (struct sockaddr*)&q->peer, sizeof q->peer) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(q->peer.sin_port));
goto bail;
}
}
INFOF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s",
q->peer.sin_family, buf, ntohs(q->peer.sin_port), alloca_str_toprint(q->request)
);
q->alarm.poll.fd = sock;
q->request_ofs=0;
q->state=RHIZOME_FETCH_CONNECTING;
q->file = file;
q->file_len=-1;
q->file_ofs=0;
/* Watch for activity on the socket */
q->alarm.function=rhizome_fetch_poll;
fetch_stats.name="rhizome_fetch_poll";
q->alarm.stats=&fetch_stats;
q->alarm.poll.events=POLLIN|POLLOUT;
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
return 0;
} else {
/* TODO: Fetch via overlay */
WHY("Rhizome fetching via overlay not implemented");
}
bail:
if (sock != -1)
close(sock);
if (file != NULL) {
fclose(file);
unlink(q->filename);
}
return -1;
}
/* Returns QUEUED (0) if the fetch was queued.
* Returns QSAMEPAYLOAD if a fetch of the same payload (file ID) is already queued.
* Returns QDATED if the fetch was not queued because a newer version of the same bundle is already
* present.
* Returns 2 if the fetch was not queued because the queue is full.
* Returns 3 if the same fetch was already queued.
* Returns QBUSY if the fetch was not queued because the queue is full.
* Returns QSAME if the same fetch is already queued.
* Returns QOLDER if a fetch of an older version of the same bundle is already queued.
* Returns QNEWER if a fetch of a newer version of the same bundle is already queued.
* Returns QIMPORTED if a fetch was not queued because the payload is nil or already held, so the
* import was performed instead.
* Returns -1 on error.
*/
int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept)
int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept)
{
*manifest_kept = 0;
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
long long filesize = rhizome_manifest_get_ll(m, "filesize");
/* Do the quick rejection tests first, before the more expensive once,
like querying the database for manifests.
/* Do the quick rejection tests first, before the more expensive ones,
like querying the database for manifests.
We probably need a cache of recently rejected manifestid:versionid
pairs so that we can avoid database lookups in most cases. Probably
@ -573,39 +674,55 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
*/
if (1||(debug & DEBUG_RHIZOME_RX))
DEBUGF("Fetching manifest bid=%s version=%lld size=%lld:", bid, m->version, filesize);
DEBUGF("Fetching bundle bid=%s version=%lld size=%lld:", bid, m->version, filesize);
if (rhizome_manifest_version_cache_lookup(m)) {
/* We already have this version or newer */
// If the payload is empty, no need to fetch, so import now.
if (filesize == 0) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" already have that version or newer");
return 1;
DEBUGF(" manifest fetch not started -- nil payload, so importing instead");
if (rhizome_bundle_import(m, m->ttl-1) == -1)
return WHY("bundle import failed");
return QIMPORTED;
}
// Ensure there is a slot available before doing more expensive checks.
rhizome_file_fetch_record *q = rhizome_find_import_slot();
if (q == NULL) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" fetch not started - all slots full");
return QBUSY;
}
// If we already have this version or newer, do not fetch.
if (rhizome_manifest_version_cache_lookup(m)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" fetch not started -- already have that version or newer");
return QDATED;
}
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" is new");
#if 0
/* Don't queue if queue slots already full */
if (rhizome_file_fetch_queue_count >= MAX_QUEUED_FILES) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" all fetch queue slots full");
return 2;
}
#endif
/* Don't queue if already queued. If a fetch of an older version is already queued, then this
* logic will let it run to completion before the fetch of the newer version is queued. This
* avoids the problem of indefinite postponement of fetching if new versions are constantly being
* published faster than we can fetch them.
/* Don't fetch if already in progress. If a fetch of an older version is already in progress,
* then this logic will let it run to completion before the fetch of the newer version is queued.
* This avoids the problem of indefinite postponement of fetching if new versions are constantly
* being published faster than we can fetch them.
*/
int i;
for (i = 0; i < MAX_QUEUED_FILES; ++i) {
if ( file_fetch_queue[i].manifest
&& memcmp(m->cryptoSignPublic, file_fetch_queue[i].manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0
) {
rhizome_manifest *qm = file_fetch_queue[i].manifest;
if (qm && memcmp(m->cryptoSignPublic, qm->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) {
if (qm->version < m->version) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" manifest fetch already queued");
return 3;
DEBUGF(" fetch already in progress -- older version");
return QOLDER;
} else if (qm->version > m->version) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" fetch already in progress -- newer version");
return QNEWER;
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" fetch already in progress -- same version");
return QSAME;
}
}
}
@ -617,149 +734,106 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
str_toupper_inplace(m->fileHexHash);
m->fileHashedP = 1;
if (filesize > 0 && m->fileHexHash[0]) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" Getting ready to fetch filehash=%s for bid=%s", m->fileHexHash, bid);
// If the payload is already available, no need to fetch, so import now.
long long gotfile = 0;
if (sqlite_exec_int64(&gotfile, "SELECT COUNT(*) FROM FILES WHERE ID='%s' and datavalid=1;", m->fileHexHash) != 1)
return WHY("select failed");
if (gotfile) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" fetch not started - payload already present, so importing instead");
if (rhizome_bundle_import(m, m->ttl-1) == -1)
return WHY("bundle import failed");
return QIMPORTED;
}
long long gotfile = 0;
if (sqlite_exec_int64(&gotfile, "SELECT COUNT(*) FROM FILES WHERE ID='%s' and datavalid=1;", m->fileHexHash) != 1)
return WHY("select failed");
if (gotfile == 0) {
/* We need to get the file, unless already queued */
int i;
for (i = 0; i < MAX_QUEUED_FILES; ++i) {
if (m->manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].fileid) == 0) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Payload fetch already queued, slot %d filehash=%s", m->fileHexHash);
return 0;
}
}
if (peerip) {
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1)
return WHY_perror("socket");
if (set_nonblock(sock) == -1) {
close(sock);
return -1;
}
dump("copying peerip",peerip,sizeof(*peerip));
struct sockaddr_in addr = *peerip;
addr.sin_family = AF_INET;
char buf[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, &addr.sin_addr, buf, sizeof buf) == NULL) {
buf[0] = '*';
buf[1] = '\0';
}
INFOF("RHIZOME HTTP REQUEST, CONNECT family=%u port=%u addr=%s", addr.sin_family, ntohs(addr.sin_port), buf);
if (connect(sock, (struct sockaddr*)&addr, sizeof addr) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHY_perror("connect");
WHY("Failed to open socket to peer's rhizome web server");
close(sock);
return -1;
}
}
// Find an unused
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest = m;
bcopy(&addr,&q->peer,sizeof(q->peer));
q->alarm.poll.fd=sock;
strncpy(q->fileid, m->fileHexHash, RHIZOME_FILEHASH_STRLEN + 1);
q->request_len = snprintf(q->request, sizeof q->request, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", q->fileid);
q->request_ofs=0;
q->state=RHIZOME_FETCH_CONNECTING;
q->file_len=-1;
q->file_ofs=0;
/* XXX Don't forget to implement resume */
/* XXX We should stream file straight into the database */
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL) {
close(sock);
return WHY("Manifest missing ID");
}
if (create_rhizome_import_dir() == -1)
return -1;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename, "file.%s", id)) {
close(sock);
return -1;
}
q->manifest->dataFileName = strdup(filename);
if ((q->file = fopen(q->manifest->dataFileName, "w")) == NULL) {
WHY_perror("fopen");
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Could not open '%s' to write received file", q->manifest->dataFileName);
close(sock);
return -1;
}
INFOF("RHIZOME HTTP REQUEST, GET \"/rhizome/file/%s\"", q->fileid);
/* Watch for activity on the socket */
q->alarm.function=rhizome_fetch_poll;
fetch_stats.name="rhizome_fetch_poll";
q->alarm.stats=&fetch_stats;
q->alarm.poll.events=POLLIN|POLLOUT;
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
rhizome_file_fetch_queue_count++;
*manifest_kept = 1;
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Queued file for fetching into %s (%d in queue)",
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
return 0;
} else {
/* TODO: fetch via overlay */
return WHY("Rhizome fetching via overlay not implemented");
}
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("We already have the file for this manifest; importing from manifest alone.");
rhizome_bundle_import(m, m->ttl-1);
// Fetch the file, unless already queued.
for (i = 0; i < MAX_QUEUED_FILES; ++i) {
if (file_fetch_queue[i].manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].manifest->fileHexHash) == 0) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash);
return QSAMEPAYLOAD;
}
}
return 0;
// Queue the fetch.
//dump("peerip", peerip, sizeof *peerip);
q->peer = *peerip;
strbuf r = strbuf_local(q->request, sizeof q->request);
strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash);
if (strbuf_overrun(r))
return WHY("request overrun");
q->request_len = strbuf_len(r);
if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "payload.%s", bid))
return -1;
m->dataFileName = strdup(q->filename);
m->dataFileUnlinkOnFree = 0;
q->manifest = m;
if (rhizome_queue_import(q) == -1) {
q->filename[0] = '\0';
return -1;
}
*manifest_kept = 1;
if (debug & DEBUG_RHIZOME_RX)
DEBUGF(" started fetch into %s, slot=%d filehash=%s", q->manifest->dataFileName, q - file_fetch_queue, m->fileHexHash);
return QUEUED;
}
int rhizome_fetch_close(rhizome_file_fetch_record *q){
/* Free ephemeral data */
if (q->file)
fclose(q->file);
q->file=NULL;
if (q->manifest)
rhizome_manifest_free(q->manifest);
q->manifest=NULL;
int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
{
assert(peerip);
rhizome_file_fetch_record *q = rhizome_find_import_slot();
if (q == NULL)
return QBUSY;
q->peer = *peerip;
q->manifest = NULL;
strbuf r = strbuf_local(q->request, sizeof q->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length));
if (strbuf_overrun(r))
return WHY("request overrun");
q->request_len = strbuf_len(r);
if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
return -1;
if (rhizome_queue_import(q) == -1) {
q->filename[0] = '\0';
return -1;
}
return QUEUED;
}
int rhizome_fetch_close(rhizome_file_fetch_record *q)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Release Rhizome fetch slot=%d", q - file_fetch_queue);
assert(q->state != RHIZOME_FETCH_FREE);
/* close socket and stop watching it */
unwatch(&q->alarm);
unschedule(&q->alarm);
close(q->alarm.poll.fd);
q->alarm.poll.fd=-1;
/* Reduce count of open connections */
if (rhizome_file_fetch_queue_count>0)
rhizome_file_fetch_queue_count--;
else
WHY("rhizome_file_fetch_queue_count is already zero, is a fetch record being double freed?");
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Released rhizome fetch slot (%d used)", rhizome_file_fetch_queue_count);
q->alarm.poll.fd = -1;
/* Free ephemeral data */
if (q->file)
fclose(q->file);
q->file = NULL;
if (q->manifest)
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
if (q->filename[0])
unlink(q->filename);
q->filename[0] = '\0';
// Release the import queue slot.
q->state = RHIZOME_FETCH_FREE;
// Fill the slot with the next suggestion.
rhizome_enqueue_suggestions(NULL);
return 0;
}
void rhizome_fetch_write(rhizome_file_fetch_record *q){
void rhizome_fetch_write(rhizome_file_fetch_record *q)
{
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("write_nonblock(%d, %s)", q->alarm.poll.fd, alloca_toprint(-1, &q->request[q->request_ofs], q->request_len-q->request_ofs));
int bytes = write_nonblock(q->alarm.poll.fd, &q->request[q->request_ofs], q->request_len-q->request_ofs);
@ -785,54 +859,48 @@ void rhizome_fetch_write(rhizome_file_fetch_record *q){
}
}
void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes){
void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes)
{
if (bytes>(q->file_len-q->file_ofs))
bytes=q->file_len-q->file_ofs;
if (fwrite(buffer,bytes,1,q->file)!=1)
{
if (fwrite(buffer,bytes,1,q->file) != 1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, q->file_ofs);
rhizome_fetch_close(q);
return;
}
q->file_ofs+=bytes;
if (q->file_ofs>=q->file_len)
{
if (q->file_ofs>=q->file_len) {
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Received all of file via rhizome -- now to import it");
fclose(q->file);
q->file = NULL;
if (q->manifest) {
// Were fetching payload, now we have it.
rhizome_import_received_bundle(q->manifest);
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
DEBUGF("Received a manifest in response to supplying a manifest prefix.");
DEBUGF("XXX Not implemented scheduling the fetching of the file itself.");
/* Read the manifest and add it to suggestion queue, then immediately
call schedule queued items. */
rhizome_manifest *m=rhizome_new_manifest();
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
if (rhizome_read_manifest_file(m,q->filename,0)==-1) {
if (rhizome_read_manifest_file(m, q->filename, 0) == -1) {
DEBUGF("Couldn't read manifest from %s",q->filename);
rhizome_manifest_free(m);
} else {
DEBUGF("All looks good for importing manifest %p",m);
DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic));
dump("q->peer",&q->peer,sizeof(q->peer));
rhizome_suggest_queue_manifest_import(m,&q->peer);
rhizome_enqueue_suggestions(NULL);
rhizome_suggest_queue_manifest_import(m, &q->peer);
//rhizome_enqueue_suggestions(NULL); XXX Now called in rhizome_fetch_close()
}
}
}
rhizome_fetch_close(q);
return;
}
// reset timeout due to activity
// reset inactivity timeout
unschedule(&q->alarm);
q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
@ -1001,90 +1069,3 @@ int unpack_http_response(char *response, struct http_response_parts *parts)
parts->content_start = p;
return 0;
}
int rhizome_fetch_request_manifest_by_prefix(struct sockaddr_in *peerip,
unsigned char *prefix,
int prefix_length,
int importP)
{
assert(peerip);
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1)
return WHY_perror("socket");
if (set_nonblock(sock) == -1) {
close(sock);
return -1;
}
struct sockaddr_in addr = *peerip;
addr.sin_family = AF_INET;
INFOF("RHIZOME HTTP REQUEST, CONNECT family=%u port=%u addr=%u.%u.%u.%u",
addr.sin_family, ntohs(addr.sin_port),
((unsigned char*)&addr.sin_addr.s_addr)[0],
((unsigned char*)&addr.sin_addr.s_addr)[1],
((unsigned char*)&addr.sin_addr.s_addr)[2],
((unsigned char*)&addr.sin_addr.s_addr)[3]
);
if (connect(sock, (struct sockaddr*)&addr, sizeof addr) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHY_perror("connect");
WHY("Failed to open socket to peer's rhizome web server");
close(sock);
return -1;
}
}
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest = NULL;
q->alarm.poll.fd=sock;
bzero(q->fileid, sizeof(q->fileid));
q->peer=*peerip;
q->request_len = snprintf(q->request, sizeof q->request, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix,prefix_length));
q->request_ofs=0;
q->state=RHIZOME_FETCH_CONNECTING;
q->file_len=-1;
q->file_ofs=0;
if (create_rhizome_import_dir() == -1)
return -1;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename, "file.%s",
alloca_tohex(prefix,prefix_length))) {
close(sock);
return -1;
}
if (strlen(q->filename)<sizeof(q->filename))
strcpy(q->filename,filename);
else q->filename[0]=0;
if ((q->file = fopen(filename, "w")) == NULL) {
WHY_perror("fopen");
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Could not open '%s' to write received file", filename);
close(sock);
return -1;
}
INFOF("RHIZOME HTTP REQUEST, GET \"/rhizome/manifestbyprefix/%s\"",
alloca_tohex(prefix,prefix_length));
/* Watch for activity on the socket */
q->alarm.function=rhizome_fetch_poll;
fetch_stats.name="rhizome_fetch_poll";
q->alarm.stats=&fetch_stats;
q->alarm.poll.events=POLLIN|POLLOUT;
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
rhizome_file_fetch_queue_count++;
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Queued file for fetching into %s (%d in queue)",
filename, rhizome_file_fetch_queue_count);
return 0;
}