From a9d3a1387ca83cf3b404b62058fed7ef20d70cb3 Mon Sep 17 00:00:00 2001 From: Andrew Bettison Date: Tue, 23 Oct 2012 18:10:20 +1030 Subject: [PATCH] Issue #30, rewrite Rhizome fetch queues and slots Causes SEGV in rhizomestress test. --- overlay.c | 2 +- rhizome.h | 12 +- rhizome_direct_http.c | 2 +- rhizome_fetch.c | 790 ++++++++++++++++++++++-------------------- serval.h | 2 +- 5 files changed, 421 insertions(+), 387 deletions(-) diff --git a/overlay.c b/overlay.c index bc6d1330..a5f7bdf1 100644 --- a/overlay.c +++ b/overlay.c @@ -168,7 +168,7 @@ schedule(&_sched_##X); } /* Pick next rhizome files to grab every few seconds from the priority list continuously being built from observed bundle announcements */ - SCHEDULE(rhizome_enqueue_suggestions, rhizome_fetch_interval_ms, rhizome_fetch_interval_ms*3); + SCHEDULE(rhizome_start_next_queued_fetches, rhizome_fetch_interval_ms, rhizome_fetch_interval_ms*3); /* Periodically check for new interfaces */ SCHEDULE(overlay_interface_discover, 1, 100); diff --git a/rhizome.h b/rhizome.h index e8d823f5..96a88c27 100644 --- a/rhizome.h +++ b/rhizome.h @@ -284,7 +284,6 @@ int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar); long long rhizome_bar_version(unsigned char *bar); unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar); -int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept); int rhizome_list_manifests(const char *service, const char *sender_sid, const char *recipient_sid, int limit, int offset); int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest **mp); int rhizome_retrieve_file(const char *fileid, const char *filepath, @@ -325,10 +324,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk, int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk, const unsigned char *pk); int rhizome_find_bundle_author(rhizome_manifest *m); -int rhizome_queue_ignore_manifest(rhizome_manifest *m, - struct sockaddr_in *peerip,int timeout); -int rhizome_ignore_manifest_check(rhizome_manifest *m, - struct sockaddr_in *peerip); +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout); +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. @@ -336,8 +333,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, #define MAX_RHIZOME_MANIFESTS 24 #define MAX_CANDIDATES 16 -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, - struct sockaddr_in *peerip); +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip); typedef struct rhizome_http_request { struct sched_ent alarm; @@ -543,7 +539,7 @@ extern int favicon_len; int rhizome_import_from_files(const char *manifestpath,const char *filepath); int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length); -int rhizome_count_queued_imports(); +int rhizome_any_fetch_active(); struct http_response_parts { int code; diff --git a/rhizome_direct_http.c b/rhizome_direct_http.c index 9a5380e6..584b08db 100644 --- a/rhizome_direct_http.c +++ b/rhizome_direct_http.c @@ -881,7 +881,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) /* Fetching the manifest, and then using it to see if we want to fetch the file for import is all handled asynchronously, so just wait for it to finish. */ - while (rhizome_count_queued_imports()) + while (rhizome_any_fetch_active()) fd_poll(); } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index a2cef40a..c3a05051 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -24,72 +24,153 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include "str.h" -extern int sigPipeFlag; -extern int sigIoFlag; - - -typedef struct rhizome_file_fetch_record { - struct sched_ent alarm; +/* Represents a queued fetch of a bundle payload, for which the manifest is already known. + */ +struct rhizome_fetch_candidate { rhizome_manifest *manifest; - FILE *file; - char filename[1024]; - char request[1024]; - int request_len; - int request_ofs; - long long file_len; - long long file_ofs; + struct sockaddr_in peer; + int priority; +}; + +/* Represents an active fetch (in progress) of a bundle payload (.manifest != NULL) or of a bundle + * manifest (.manifest == NULL). + */ +struct rhizome_fetch_slot { + struct sched_ent alarm; // must be first element in struct + rhizome_manifest *manifest; + struct sockaddr_in peer; int state; #define RHIZOME_FETCH_FREE 0 #define RHIZOME_FETCH_CONNECTING 1 #define RHIZOME_FETCH_SENDINGHTTPREQUEST 2 #define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXFILE 4 - struct sockaddr_in peer; -} rhizome_file_fetch_record; + FILE *file; + char filename[1024]; + char request[1024]; + int request_len; + int request_ofs; + int64_t file_len; + int64_t file_ofs; +}; -/* List of queued transfers */ -#define MAX_QUEUED_FILES 4 -rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES]; +/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size + * is less than a given threshold. + */ +struct rhizome_fetch_queue { + struct rhizome_fetch_slot active; // must be first element in struct + int candidate_queue_size; + struct rhizome_fetch_candidate *candidate_queue; + long long size_threshold; // will only hold fetches of fewer than this many bytes +}; -#define QUEUED (0) -#define QSAMEPAYLOAD (1) -#define QDATED (2) -#define QBUSY (3) -#define QSAME (4) -#define QOLDER (5) -#define QNEWER (6) -#define QIMPORTED (7) +struct rhizome_fetch_candidate queue0[5]; +struct rhizome_fetch_candidate queue1[4]; +struct rhizome_fetch_candidate queue2[3]; +struct rhizome_fetch_candidate queue3[2]; +struct rhizome_fetch_candidate queue4[1]; -int rhizome_count_queued_imports() -{ - int count = 0; - int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) - if (file_fetch_queue[i].state != RHIZOME_FETCH_FREE) - ++count; - return count; -} +#define NELS(a) (sizeof (a) / sizeof *(a)) -static rhizome_file_fetch_record *rhizome_find_import_slot() -{ - // Find a free fetch queue slot. - int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) - if (file_fetch_queue[i].state == RHIZOME_FETCH_FREE) - return &file_fetch_queue[i]; - return NULL; -} +struct rhizome_fetch_queue rhizome_fetch_queues[] = { + // Must be in order of ascending size_threshold. + { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .size_threshold = 10000000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .size_threshold = -1, .active = { .state = RHIZOME_FETCH_FREE } } +}; + +#define NQUEUES NELS(rhizome_fetch_queues) struct profile_total fetch_stats; -/* +/* Find a queue suitable for a fetch of the given number of bytes. If there is no suitable queue, + * return NULL. + * + * @author Andrew Bettison + */ +static struct rhizome_fetch_queue *rhizome_find_queue(long long size) +{ + int i; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + if (q->size_threshold < 0 || size < q->size_threshold) + return q; + } + return NULL; +} + +/* Find a free fetch slot suitable for fetching the given number of bytes. This could be a slot in + * any queue that would accept the candidate, ie, with a larger size threshold. Returns NULL if + * there is no suitable free slot. + * + * @author Andrew Bettison + */ +static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size) +{ + int i; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + if ((q->size_threshold < 0 || size < q->size_threshold) && q->active.state == RHIZOME_FETCH_FREE) + return &q->active; + } + return NULL; +} + +static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch_queue *q, int i) +{ + assert(i >= 0 && i < q->candidate_queue_size); + struct rhizome_fetch_candidate *c = &q->candidate_queue[i]; + struct rhizome_fetch_candidate *e = &q->candidate_queue[q->candidate_queue_size - 1]; + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("insert queue[%d] candidate[%d]", q - rhizome_fetch_queues, i); + if (e->manifest) + rhizome_manifest_free(e->manifest); + for (; e > c; --e) + e[0] = e[-1]; + assert(e == c); + c->manifest = NULL; + return c; +} + +static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i) +{ + assert(i >= 0 && i < q->candidate_queue_size); + struct rhizome_fetch_candidate *c = &q->candidate_queue[i]; + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("unqueue queue[%d] candidate[%d] manifest=%p", q - rhizome_fetch_queues, i, c->manifest); + if (c->manifest) { + rhizome_manifest_free(c->manifest); + c->manifest = NULL; + } + struct rhizome_fetch_candidate *e = &q->candidate_queue[q->candidate_queue_size - 1]; + for (; c < e && c[1].manifest; ++c) + c[0] = c[1]; + e->manifest = NULL; +} + +/* Return true if there are any active fetches currently in progress. + * + * @author Andrew Bettison + */ +int rhizome_any_fetch_active() +{ + int i; + for (i = 0; i < NQUEUES; ++i) + if (rhizome_fetch_queues[i].active.state != RHIZOME_FETCH_FREE) + return 1; + return 0; +} + +/* Queue a manifest for importing. There are three main cases that can occur here: - 1. The manifest has no associated file (filesize=0); - 2. The associated file is already in our database; or - 3. The associated file is not already in our database, and so we need + 1. The manifest has a nil payload (filesize=0); + 2. The associated payload is already in our database; or + 3. The associated payload is not already in our database, and so we need to fetch it before we can import it. Cases (1) and (2) are more or less identical, and all we need to do is to @@ -98,11 +179,11 @@ struct profile_total fetch_stats; Case (3) requires that we fetch the associated file. This is where life gets interesting. - + First, we need to make sure that we can free up enough space in the database for the file. - Second, we need to work out how we are going to get the file. + Second, we need to work out how we are going to get the file. If we are on an IPv4 wifi network, then HTTP is probably the way to go. If we are not on an IPv4 wifi network, then HTTP is not an option, and we need to use a Rhizome/Overlay protocol to fetch it. It might even be HTTP over MDP @@ -118,17 +199,28 @@ struct profile_total fetch_stats; advertisement. */ +static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip); +#define STARTED (0) +#define SLOTBUSY (1) +#define SAMEBUNDLE (2) +#define SAMEPAYLOAD (3) +#define SUPERSEDED (4) +#define OLDERBUNDLE (5) +#define NEWERBUNDLE (6) +#define IMPORTED (7) + /* As defined below uses 64KB */ #define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */ #define RHIZOME_VERSION_CACHE_SHIFT 1 #define RHIZOME_VERSION_CACHE_SIZE 128 #define RHIZOME_VERSION_CACHE_ASSOCIATIVITY 16 -typedef struct rhizome_manifest_version_cache_slot { + +struct rhizome_manifest_version_cache_slot { unsigned char idprefix[24]; long long version; -} rhizome_manifest_version_cache_slot; -rhizome_manifest_version_cache_slot rhizome_manifest_version_cache -[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY]; +}; + +struct rhizome_manifest_version_cache_slot rhizome_manifest_version_cache[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY]; int rhizome_manifest_version_cache_store(rhizome_manifest *m) { @@ -148,7 +240,7 @@ int rhizome_manifest_version_cache_store(rhizome_manifest *m) bin=bin>>RHIZOME_VERSION_CACHE_SHIFT; slot=random()%RHIZOME_VERSION_CACHE_ASSOCIATIVITY; - rhizome_manifest_version_cache_slot *entry + struct rhizome_manifest_version_cache_slot *entry =&rhizome_manifest_version_cache[bin][slot]; unsigned long long manifest_version = rhizome_manifest_get_ll(m,"version"); @@ -195,7 +287,7 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) for(slot=0;slotversion=stored_version; for(i=0;i<24;i++) { @@ -315,8 +406,7 @@ typedef struct ignored_manifest_cache { a collision is exceedingly remote */ ignored_manifest_cache ignored; -int rhizome_ignore_manifest_check(rhizome_manifest *m, - struct sockaddr_in *peerip) +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip) { int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); int slot; @@ -335,8 +425,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, return 0; } -int rhizome_queue_ignore_manifest(rhizome_manifest *m, - struct sockaddr_in *peerip,int timeout) +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout) { /* The supplied manifest from a given IP has errors, so remember that it isn't worth considering */ @@ -362,53 +451,7 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, } -typedef struct rhizome_candidates { - rhizome_manifest *manifest; - struct sockaddr_in peer; - long long size; - /* XXX Need group memberships/priority level here */ - int priority; -} rhizome_candidates; - -rhizome_candidates candidates[MAX_CANDIDATES]; -int candidate_count=0; - -/* sort indicated candidate from starting position down - (or up) */ -int rhizome_position_candidate(int position) -{ - while(position=0) { - rhizome_candidates *c1=&candidates[position]; - rhizome_candidates *c2=&candidates[position+1]; - if (c1->priority>c2->priority - ||(c1->priority==c2->priority - &&c1->size>c2->size)) - { - rhizome_candidates c=*c1; - *c1=*c2; - *c2=c; - position++; - } - else { - /* doesn't need moving down, but does it need moving up? */ - if (!position) return 0; - rhizome_candidates *c0=&candidates[position-1]; - if (c1->prioritypriority - ||(c1->priority==c0->priority - &&c1->sizesize)) - { - rhizome_candidates c=*c1; - *c1=*c2; - *c2=c; - position--; - } - else return 0; - } - } - return 0; -} - -void rhizome_import_received_bundle(struct rhizome_manifest *m) +static int rhizome_import_received_bundle(struct rhizome_manifest *m) { m->finalised = 1; m->manifest_bytes = m->manifest_all_bytes; // store the signatures too @@ -416,19 +459,27 @@ void rhizome_import_received_bundle(struct rhizome_manifest *m) DEBUGF("manifest len=%d has %d signatories", m->manifest_bytes, m->sig_count); dump("manifest", m->manifestdata, m->manifest_all_bytes); } - rhizome_bundle_import(m, m->ttl - 1 /* TTL */); + return rhizome_bundle_import(m, m->ttl - 1 /* TTL */); } -/* Verifies manifests as late as possible to avoid wasting time. */ -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip) +/* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as + * the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch + * is performed over MDP. + * + * If the fetch cannot be queued for any reason (error, queue full, no suitable queue) then the + * manifest is freed and returns -1. Otherwise, the pointer to the manifest is stored in the queue + * entry and the manifest is freed when the fetch has completed or is abandoned for any reason. + * + * Verifies manifests as late as possible to avoid wasting time. + */ +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip) { IN(); - /* must free manifest when done with it */ - char *id = rhizome_manifest_get(m, "id", NULL, 0); + const char *bid = alloca_tohex_bid(m->cryptoSignPublic); int priority=100; /* normal priority */ if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Considering manifest import bid=%s version=%lld size=%lld priority=%d:", id, m->version, m->fileLength, priority); + DEBUGF("Considering manifest import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority); if (rhizome_manifest_version_cache_lookup(m)) { if (debug & DEBUG_RHIZOME_RX) @@ -439,7 +490,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i if (debug & DEBUG_RHIZOME_RX) { long long stored_version; - if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'",id) > 0) + if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", bid) > 0) DEBUGF(" is new (have version %lld)", stored_version); } @@ -455,124 +506,112 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i RETURN(0); } - /* work out where to put it in the list */ - int i; - for(i=0;i= m->version) { - /* this version is older than the one in the list, so don't list this one */ + // Find the proper queue for the payload. If there is none suitable, it is an error. + struct rhizome_fetch_queue *qi = rhizome_find_queue(m->fileLength); + if (!qi) { + WHYF("No suitable fetch queue for bundle size=%lld", m->fileLength); + rhizome_manifest_free(m); + RETURN(-1); + } + + // Search all the queues for the same manifest (it could be in any queue because its payload size + // may have changed between versions.) If a newer or the same version is already queued, then + // ignore this one. Otherwise, unqueue all older candidates. + int ci = -1; + int i, j; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + for (j = 0; j < q->candidate_queue_size; ) { + struct rhizome_fetch_candidate *c = &q->candidate_queue[j]; + if (c->manifest) { + if (memcmp(m->cryptoSignPublic, c->manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { + if (c->manifest->version >= m->version) { rhizome_manifest_free(m); RETURN(0); - } else { - /* replace listed version with this newer version */ - if (rhizome_manifest_verify(m)) { - WHY("Error verifying manifest when considering queuing for import"); - /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m,peerip,60000); - rhizome_manifest_free(m); - RETURN(-1); - } - - rhizome_manifest_free(candidates[i].manifest); - candidates[i].manifest=m; - /* update position in list */ - rhizome_position_candidate(i); - RETURN(0); } + if (!m->selfSigned && rhizome_manifest_verify(m)) { + WHY("Error verifying manifest when considering queuing for import"); + /* Don't waste time looking at this manifest again for a while */ + rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_manifest_free(m); + RETURN(-1); + } + rhizome_fetch_unqueue(q, j); + } else { + if (ci == -1 && q == qi && c->priority < priority) + ci = j; + ++j; } - - /* if we have a higher priority file than the one at this - point in the list, stop, and we will shuffle the rest of - the list down. */ - if (candidates[i].priority>priority - ||(candidates[i].priority==priority - &&candidates[i].size>m->fileLength)) + } else { + if (ci == -1 && q == qi) + ci = j; break; + } } - if (i>=MAX_CANDIDATES) { - /* our list is already full of higher-priority items */ + } + // No duplicate was found, so if no free queue place was found either then bail out. + if (ci == -1) { rhizome_manifest_free(m); - RETURN(-1); + RETURN(1); } - if (rhizome_manifest_verify(m)) { + if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m,peerip,60000); + rhizome_queue_ignore_manifest(m, peerip, 60000); rhizome_manifest_free(m); RETURN(-1); } - if (candidate_count==MAX_CANDIDATES) { - /* release manifest structure for whoever we are bumping from the list */ - rhizome_manifest_free(candidates[MAX_CANDIDATES-1].manifest); - candidates[MAX_CANDIDATES-1].manifest=NULL; - } else candidate_count++; - /* shuffle down */ - int bytes=(candidate_count-(i+1))*sizeof(rhizome_candidates); - if (0) DEBUGF("Moving slot %d to slot %d (%d bytes = %d slots)", - i,i+1,bytes,bytes/sizeof(rhizome_candidates)); - bcopy(&candidates[i], - &candidates[i+1], - bytes); - /* put new candidate in */ - candidates[i].manifest=m; - candidates[i].size=m->fileLength; - candidates[i].priority=priority; - candidates[i].peer=*peerip; + struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, j); + c->manifest = m; + c->peer = *peerip; + c->priority = priority; - int j; - if (1) { - DEBUG("Rhizome priorities fetch list now:"); - for(j=0;jcandidate_queue_size; ++j) { + struct rhizome_fetch_candidate *c = &q->candidate_queue[j]; + if (!c->manifest) + break; + DEBUGF("%d:%d manifest=%p bid=%s priority=%d size=%lld", i, j, + c->manifest, + alloca_tohex_bid(c->manifest->cryptoSignPublic), + c->priority, + (long long) c->manifest->fileLength + ); + } + } } RETURN(0); } -void rhizome_enqueue_suggestions(struct sched_ent *alarm) +static void rhizome_start_next_queued_fetch(struct rhizome_fetch_queue *q) { - int i; - for (i = 0; i < candidate_count; ++i) { - int manifest_kept = 0; - int result = rhizome_queue_manifest_import(candidates[i].manifest, &candidates[i].peer, &manifest_kept); - if (result == QBUSY) - break; - if (!manifest_kept) { - rhizome_manifest_free(candidates[i].manifest); - candidates[i].manifest = NULL; - } + struct rhizome_fetch_candidate *c = &q->candidate_queue[0]; + int result; + while (c->manifest && (result = rhizome_fetch(c->manifest, &c->peer)) != SLOTBUSY) { + if (result == STARTED) + c->manifest = NULL; + rhizome_fetch_unqueue(q, 0); } - if (i) { - /* now shuffle up */ - int bytes=(candidate_count-i)*sizeof(rhizome_candidates); - if (0) DEBUGF("Moving slot %d to slot 0 (%d bytes = %d slots)", i,bytes,bytes/sizeof(rhizome_candidates)); - bcopy(&candidates[i],&candidates[0],bytes); - candidate_count-=i; - } - if (alarm) { - alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms; - alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3; - schedule(alarm); - } - return; } -static int rhizome_queue_import(rhizome_file_fetch_record *q) +void rhizome_start_next_queued_fetches(struct sched_ent *alarm) +{ + int i; + for (i = 0; i < NQUEUES; ++i) + rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i]); + alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms; + alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3; + schedule(alarm); +} + +static int rhizome_start_fetch(struct rhizome_fetch_slot *slot) { int sock = -1; FILE *file = NULL; @@ -580,11 +619,11 @@ static int rhizome_queue_import(rhizome_file_fetch_record *q) /* TODO We should stream file straight into the database */ if (create_rhizome_import_dir() == -1) goto bail; - if ((file = fopen(q->filename, "w")) == NULL) { - WHYF_perror("fopen(`%s`, \"w\")", q->filename); + if ((file = fopen(slot->filename, "w")) == NULL) { + WHYF_perror("fopen(`%s`, \"w\")", slot->filename); goto bail; } - if (q->peer.sin_family == AF_INET) { + if (slot->peer.sin_family == AF_INET) { /* Transfer via HTTP over IPv4 */ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { WHY_perror("socket"); @@ -593,38 +632,38 @@ static int rhizome_queue_import(rhizome_file_fetch_record *q) if (set_nonblock(sock) == -1) goto bail; char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &q->peer.sin_addr, buf, sizeof buf) == NULL) { + if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { buf[0] = '*'; buf[1] = '\0'; } - if (connect(sock, (struct sockaddr*)&q->peer, sizeof q->peer) == -1) { + if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) { if (errno == EINPROGRESS) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("connect() returned EINPROGRESS"); } else { - WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(q->peer.sin_port)); + WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port)); goto bail; } } INFOF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s", - q->peer.sin_family, buf, ntohs(q->peer.sin_port), alloca_str_toprint(q->request) + slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request) ); - q->alarm.poll.fd = sock; - q->request_ofs=0; - q->state=RHIZOME_FETCH_CONNECTING; - q->file = file; - q->file_len=-1; - q->file_ofs=0; + slot->alarm.poll.fd = sock; + slot->request_ofs=0; + slot->state=RHIZOME_FETCH_CONNECTING; + slot->file = file; + slot->file_len=-1; + slot->file_ofs=0; /* Watch for activity on the socket */ - q->alarm.function=rhizome_fetch_poll; + slot->alarm.function=rhizome_fetch_poll; fetch_stats.name="rhizome_fetch_poll"; - q->alarm.stats=&fetch_stats; - q->alarm.poll.events=POLLIN|POLLOUT; - watch(&q->alarm); + slot->alarm.stats=&fetch_stats; + slot->alarm.poll.events=POLLIN|POLLOUT; + watch(&slot->alarm); /* And schedule a timeout alarm */ - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); return 0; } else { /* TODO: Fetch via overlay */ @@ -635,29 +674,28 @@ bail: close(sock); if (file != NULL) { fclose(file); - unlink(q->filename); + unlink(slot->filename); } return -1; } -/* Returns QUEUED (0) if the fetch was queued. - * Returns QSAMEPAYLOAD if a fetch of the same payload (file ID) is already queued. - * Returns QDATED if the fetch was not queued because a newer version of the same bundle is already - * present. - * Returns QBUSY if the fetch was not queued because the queue is full. - * Returns QSAME if the same fetch is already queued. - * Returns QOLDER if a fetch of an older version of the same bundle is already queued. - * Returns QNEWER if a fetch of a newer version of the same bundle is already queued. - * Returns QIMPORTED if a fetch was not queued because the payload is nil or already held, so the - * import was performed instead. +/* Returns STARTED (0) if the fetch was started (the caller should not free the manifest in this + * case, because the fetch slot now has a copy of the pointer, and the manifest will be freed once + * the fetch finishes or is terminated). + * Returns SAMEPAYLOAD if a fetch of the same payload (file ID) is already active. + * Returns SUPERSEDED if the fetch was not started because a newer version of the same bundle is + * already present. + * Returns SLOTBUSY if the fetch was not queued because no slots are available. + * Returns SAMEBUNDLE if a fetch of the same bundle is already active. + * Returns OLDERBUNDLE if a fetch of an older version of the same bundle is already active. + * Returns NEWERBUNDLE if a fetch of a newer version of the same bundle is already active. + * Returns IMPORTED if a fetch was not started because the payload is nil or already in the + * Rhizome store, so the import was performed instead. * Returns -1 on error. */ -int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept) +static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip) { - *manifest_kept = 0; - const char *bid = alloca_tohex_bid(m->cryptoSignPublic); - long long filesize = rhizome_manifest_get_ll(m, "filesize"); /* Do the quick rejection tests first, before the more expensive ones, like querying the database for manifests. @@ -668,36 +706,36 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in collission attacks, but using 128bits or the full 256 bits would be safer. Let's make the cache use 256 bit (32byte) entries for power of two efficiency, and so use the last 64bits for version id, thus using 192 bits - for collission avoidance --- probably sufficient for many years yet (from + for collision avoidance --- probably sufficient for many years yet (from time of writing in 2012). We get a little more than 192 bits by using the cache slot number to implicitly store the first bits. */ if (1||(debug & DEBUG_RHIZOME_RX)) - DEBUGF("Fetching bundle bid=%s version=%lld size=%lld:", bid, m->version, filesize); + DEBUGF("Fetching bundle bid=%s version=%lld size=%lld peerip=%s", bid, m->version, m->fileLength, alloca_sockaddr(peerip)); // If the payload is empty, no need to fetch, so import now. - if (filesize == 0) { + if (m->fileLength == 0) { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" manifest fetch not started -- nil payload, so importing instead"); - if (rhizome_bundle_import(m, m->ttl-1) == -1) + if (rhizome_import_received_bundle(m) == -1) return WHY("bundle import failed"); - return QIMPORTED; + return IMPORTED; } // Ensure there is a slot available before doing more expensive checks. - rhizome_file_fetch_record *q = rhizome_find_import_slot(); - if (q == NULL) { + struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(m->fileLength); + if (slot == NULL) { if (debug & DEBUG_RHIZOME_RX) DEBUG(" fetch not started - all slots full"); - return QBUSY; + return SLOTBUSY; } // If we already have this version or newer, do not fetch. if (rhizome_manifest_version_cache_lookup(m)) { if (debug & DEBUG_RHIZOME_RX) DEBUG(" fetch not started -- already have that version or newer"); - return QDATED; + return SUPERSEDED; } if (debug & DEBUG_RHIZOME_RX) DEBUGF(" is new"); @@ -708,31 +746,28 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in * being published faster than we can fetch them. */ int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) { - rhizome_manifest *qm = file_fetch_queue[i].manifest; - if (qm && memcmp(m->cryptoSignPublic, qm->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { - if (qm->version < m->version) { + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active; + const rhizome_manifest *am = as->manifest; + if (as->state != RHIZOME_FETCH_FREE && memcmp(m->cryptoSignPublic, am->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { + if (am->version < m->version) { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" fetch already in progress -- older version"); - return QOLDER; - } else if (qm->version > m->version) { + return OLDERBUNDLE; + } else if (am->version > m->version) { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" fetch already in progress -- newer version"); - return QNEWER; + return NEWERBUNDLE; } else { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" fetch already in progress -- same version"); - return QSAME; + return SAMEBUNDLE; } } } - if (!rhizome_manifest_get(m, "filehash", m->fileHexHash, sizeof m->fileHexHash)) + if (!m->fileHashedP) return WHY("Manifest missing filehash"); - if (!rhizome_str_is_file_hash(m->fileHexHash)) - return WHYF("Invalid file hash: %s", m->fileHexHash); - str_toupper_inplace(m->fileHexHash); - m->fileHashedP = 1; // If the payload is already available, no need to fetch, so import now. long long gotfile = 0; @@ -743,142 +778,146 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in DEBUGF(" fetch not started - payload already present, so importing instead"); if (rhizome_bundle_import(m, m->ttl-1) == -1) return WHY("bundle import failed"); - return QIMPORTED; + return IMPORTED; } // Fetch the file, unless already queued. - for (i = 0; i < MAX_QUEUED_FILES; ++i) { - if (file_fetch_queue[i].manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].manifest->fileHexHash) == 0) { + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active; + const rhizome_manifest *am = as->manifest; + if (as->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, am->fileHexHash) == 0) { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash); - return QSAMEPAYLOAD; + return SAMEPAYLOAD; } } - // Queue the fetch. + // Start the fetch. //dump("peerip", peerip, sizeof *peerip); - q->peer = *peerip; - strbuf r = strbuf_local(q->request, sizeof q->request); + slot->peer = *peerip; + strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); if (strbuf_overrun(r)) return WHY("request overrun"); - q->request_len = strbuf_len(r); - if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "payload.%s", bid)) + slot->request_len = strbuf_len(r); + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) return -1; - m->dataFileName = strdup(q->filename); + m->dataFileName = strdup(slot->filename); m->dataFileUnlinkOnFree = 0; - q->manifest = m; - if (rhizome_queue_import(q) == -1) { - q->filename[0] = '\0'; + slot->manifest = m; + if (rhizome_start_fetch(slot) == -1) { + slot->filename[0] = '\0'; return -1; } - *manifest_kept = 1; if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" started fetch into %s, slot=%d filehash=%s", q->manifest->dataFileName, q - file_fetch_queue, m->fileHexHash); - return QUEUED; + DEBUGF(" started fetch into %s, slot=%d filehash=%s", slot->manifest->dataFileName, slot - &rhizome_fetch_queues[0].active, m->fileHexHash); + return STARTED; } int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length) { assert(peerip); - rhizome_file_fetch_record *q = rhizome_find_import_slot(); - if (q == NULL) - return QBUSY; - q->peer = *peerip; - q->manifest = NULL; - strbuf r = strbuf_local(q->request, sizeof q->request); + struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES); + if (slot == NULL) + return SLOTBUSY; + slot->peer = *peerip; + slot->manifest = NULL; + strbuf r = strbuf_local(slot->request, sizeof slot->request); strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); if (strbuf_overrun(r)) return WHY("request overrun"); - q->request_len = strbuf_len(r); - if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) + slot->request_len = strbuf_len(r); + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) return -1; - if (rhizome_queue_import(q) == -1) { - q->filename[0] = '\0'; + if (rhizome_start_fetch(slot) == -1) { + slot->filename[0] = '\0'; return -1; } - return QUEUED; + return STARTED; } -int rhizome_fetch_close(rhizome_file_fetch_record *q) +static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Release Rhizome fetch slot=%d", q - file_fetch_queue); - assert(q->state != RHIZOME_FETCH_FREE); + DEBUGF("Close Rhizome fetch slot=%d", slot - &rhizome_fetch_queues[0].active); + assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ - unwatch(&q->alarm); - unschedule(&q->alarm); - close(q->alarm.poll.fd); - q->alarm.poll.fd = -1; + unwatch(&slot->alarm); + unschedule(&slot->alarm); + close(slot->alarm.poll.fd); + slot->alarm.poll.fd = -1; /* Free ephemeral data */ - if (q->file) - fclose(q->file); - q->file = NULL; - if (q->manifest) - rhizome_manifest_free(q->manifest); - q->manifest = NULL; - if (q->filename[0]) - unlink(q->filename); - q->filename[0] = '\0'; + if (slot->file) + fclose(slot->file); + slot->file = NULL; + if (slot->manifest) + rhizome_manifest_free(slot->manifest); + slot->manifest = NULL; + if (slot->filename[0]) + unlink(slot->filename); + slot->filename[0] = '\0'; - // Release the import queue slot. - q->state = RHIZOME_FETCH_FREE; + // Release the fetch slot. + slot->state = RHIZOME_FETCH_FREE; - // Fill the slot with the next suggestion. - rhizome_enqueue_suggestions(NULL); + // Activate the next queued fetch that is eligible for this slot. Try starting candidates from + // all queues with the same or smaller size thresholds until the slot is taken. + struct rhizome_fetch_queue *q; + for (q = (struct rhizome_fetch_queue *) slot; slot->state == RHIZOME_FETCH_FREE && q >= rhizome_fetch_queues; --q) + rhizome_start_next_queued_fetch(q); return 0; } -void rhizome_fetch_write(rhizome_file_fetch_record *q) +void rhizome_fetch_write(struct rhizome_fetch_slot *slot) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("write_nonblock(%d, %s)", q->alarm.poll.fd, alloca_toprint(-1, &q->request[q->request_ofs], q->request_len-q->request_ofs)); - int bytes = write_nonblock(q->alarm.poll.fd, &q->request[q->request_ofs], q->request_len-q->request_ofs); + DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs)); + int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs); if (bytes == -1) { WHY("Got error while sending HTTP request. Closing."); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); } else { // reset timeout - unschedule(&q->alarm); - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - q->request_ofs+=bytes; - if (q->request_ofs>=q->request_len) { + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_ofs+=bytes; + if (slot->request_ofs>=slot->request_len) { /* Sent all of request. Switch to listening for HTTP response headers. */ - q->request_len=0; q->request_ofs=0; - q->state=RHIZOME_FETCH_RXHTTPHEADERS; - q->alarm.poll.events=POLLIN; - watch(&q->alarm); - }else if(q->state==RHIZOME_FETCH_CONNECTING) - q->state = RHIZOME_FETCH_SENDINGHTTPREQUEST; + slot->request_len=0; slot->request_ofs=0; + slot->state=RHIZOME_FETCH_RXHTTPHEADERS; + slot->alarm.poll.events=POLLIN; + watch(&slot->alarm); + }else if(slot->state==RHIZOME_FETCH_CONNECTING) + slot->state = RHIZOME_FETCH_SENDINGHTTPREQUEST; } } -void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes) +void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { - if (bytes>(q->file_len-q->file_ofs)) - bytes=q->file_len-q->file_ofs; - if (fwrite(buffer,bytes,1,q->file) != 1) { + if (bytes>(slot->file_len-slot->file_ofs)) + bytes=slot->file_len-slot->file_ofs; + if (fwrite(buffer,bytes,1,slot->file) != 1) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, q->file_ofs); - rhizome_fetch_close(q); + DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); + rhizome_fetch_close(slot); return; } - q->file_ofs+=bytes; - if (q->file_ofs>=q->file_len) { + slot->file_ofs+=bytes; + if (slot->file_ofs>=slot->file_len) { /* got all of file */ if (debug & DEBUG_RHIZOME_RX) DEBUGF("Received all of file via rhizome -- now to import it"); - fclose(q->file); - q->file = NULL; - if (q->manifest) { + fclose(slot->file); + slot->file = NULL; + if (slot->manifest) { // Were fetching payload, now we have it. - rhizome_import_received_bundle(q->manifest); + rhizome_import_received_bundle(slot->manifest); } else { /* This was to fetch the manifest, so now fetch the file if needed */ DEBUGF("Received a manifest in response to supplying a manifest prefix."); @@ -886,56 +925,55 @@ void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes call schedule queued items. */ rhizome_manifest *m = rhizome_new_manifest(); if (m) { - if (rhizome_read_manifest_file(m, q->filename, 0) == -1) { - DEBUGF("Couldn't read manifest from %s",q->filename); + if (rhizome_read_manifest_file(m, slot->filename, 0) == -1) { + DEBUGF("Couldn't read manifest from %s",slot->filename); rhizome_manifest_free(m); } else { DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); - dump("q->peer",&q->peer,sizeof(q->peer)); - rhizome_suggest_queue_manifest_import(m, &q->peer); - //rhizome_enqueue_suggestions(NULL); XXX Now called in rhizome_fetch_close() + dump("slot->peer",&slot->peer,sizeof(slot->peer)); + rhizome_suggest_queue_manifest_import(m, &slot->peer); } } } - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } // reset inactivity timeout - unschedule(&q->alarm); - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm+RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); } void rhizome_fetch_poll(struct sched_ent *alarm) { - rhizome_file_fetch_record *q=(rhizome_file_fetch_record *)alarm; + struct rhizome_fetch_slot *slot = (struct rhizome_fetch_slot *) alarm; if (alarm->poll.revents & (POLLIN | POLLOUT)) { - switch(q->state) { + switch (slot->state) { case RHIZOME_FETCH_CONNECTING: case RHIZOME_FETCH_SENDINGHTTPREQUEST: - rhizome_fetch_write(q); + rhizome_fetch_write(slot); return; case RHIZOME_FETCH_RXFILE: { /* Keep reading until we have the promised amount of data */ char buffer[8192]; sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer); + int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); /* If we got some data, see if we have found the end of the HTTP request */ if (bytes > 0) { - rhizome_write_content(q, buffer, bytes); + rhizome_write_content(slot, buffer, bytes); return; } else { if (debug & DEBUG_RHIZOME_RX) DEBUG("Empty read, closing connection"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (sigPipeFlag) { if (debug & DEBUG_RHIZOME_RX) DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } } @@ -943,47 +981,47 @@ void rhizome_fetch_poll(struct sched_ent *alarm) case RHIZOME_FETCH_RXHTTPHEADERS: { /* Keep reading until we have two CR/LFs in a row */ sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1); + int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); /* If we got some data, see if we have found the end of the HTTP reply */ if (bytes > 0) { // reset timeout - unschedule(&q->alarm); - q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - q->request_len += bytes; - if (http_header_complete(q->request, q->request_len, bytes)) { + unschedule(&slot->alarm); + slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_len += bytes; + if (http_header_complete(slot->request, slot->request_len, bytes)) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len)); + DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); /* We have all the reply headers, so parse them, taking care of any following bytes of content. */ struct http_response_parts parts; - if (unpack_http_response(q->request, &parts) == -1) { + if (unpack_http_response(slot->request, &parts) == -1) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed HTTP request: failed to unpack http response"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (parts.code != 200) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (parts.content_length == -1) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Invalid HTTP reply: missing Content-Length header"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } - q->file_len = parts.content_length; + slot->file_len = parts.content_length; /* We have all we need. The file is already open, so just write out any initial bytes of the body we read. */ - q->state = RHIZOME_FETCH_RXFILE; - int content_bytes = q->request + q->request_len - parts.content_start; + slot->state = RHIZOME_FETCH_RXFILE; + int content_bytes = slot->request + slot->request_len - parts.content_start; if (content_bytes > 0){ - rhizome_write_content(q, parts.content_start, content_bytes); + rhizome_write_content(slot, parts.content_start, content_bytes); return; } } @@ -992,7 +1030,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) default: if (debug & DEBUG_RHIZOME_RX) DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } } @@ -1002,7 +1040,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) // timeout or socket error, close the socket if (debug & DEBUG_RHIZOME_RX) DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); } } diff --git a/serval.h b/serval.h index b3d2c493..9082e2d3 100644 --- a/serval.h +++ b/serval.h @@ -770,7 +770,7 @@ int fd_poll(); void overlay_interface_discover(struct sched_ent *alarm); void overlay_dummy_poll(struct sched_ent *alarm); void overlay_route_tick(struct sched_ent *alarm); -void rhizome_enqueue_suggestions(struct sched_ent *alarm); +void rhizome_start_next_queued_fetches(struct sched_ent *alarm); void server_shutdown_check(struct sched_ent *alarm); void overlay_mdp_poll(struct sched_ent *alarm); void fd_periodicstats(struct sched_ent *alarm);