diff --git a/rhizome.h b/rhizome.h
index 96a88c27..48360251 100644
--- a/rhizome.h
+++ b/rhizome.h
@@ -538,7 +538,19 @@ extern unsigned char favicon_bytes[];
 extern int favicon_len;
 int rhizome_import_from_files(const char *manifestpath,const char *filepath);
-int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
+
+enum rhizome_start_fetch_result {
+  STARTED = 0,
+  SAMEBUNDLE,
+  SAMEPAYLOAD,
+  SUPERSEDED,
+  OLDERBUNDLE,
+  NEWERBUNDLE,
+  IMPORTED,
+  SLOTBUSY
+};
+
+enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
 
 int rhizome_any_fetch_active();
 
 struct http_response_parts {
diff --git a/rhizome_fetch.c b/rhizome_fetch.c
index 42582863..f58f7503 100644
--- a/rhizome_fetch.c
+++ b/rhizome_fetch.c
@@ -56,6 +56,11 @@ struct rhizome_fetch_slot {
 
 /* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
  * is less than a given threshold.
+ *
+ * TODO: If the queues ever get much larger, use a pointer-linked queue instead of one physically
+ * ordered in memory, to avoid the need for memory copies when deleting or inserting queue entries.
+ *
+ * @author Andrew Bettison
  */
 struct rhizome_fetch_queue {
   struct rhizome_fetch_slot active; // must be first element in struct
@@ -64,6 +69,8 @@ struct rhizome_fetch_queue {
   long long size_threshold; // will only hold fetches of fewer than this many bytes
 };
 
+/* Static allocation of the candidate queues.
+ */
 struct rhizome_fetch_candidate queue0[5];
 struct rhizome_fetch_candidate queue1[4];
 struct rhizome_fetch_candidate queue2[3];
@@ -72,8 +79,9 @@ struct rhizome_fetch_candidate queue4[1];
 
 #define NELS(a) (sizeof (a) / sizeof *(a))
 
+/* Static allocation of the queue structures.  Must be in order of ascending size_threshold.
+ */
 struct rhizome_fetch_queue rhizome_fetch_queues[] = {
-  // Must be in order of ascending size_threshold.
   { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } },
   { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } },
   { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } },
@@ -118,15 +126,25 @@ static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size)
   return NULL;
 }
 
+/* Insert a candidate into a given queue at a given position.  All candidates succeeding the given
+ * position are copied backward in the queue to open up an empty element at the given position.  If
+ * the queue is full, then the tail element is discarded, freeing the manifest it points to.
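+ *
+ * Illustrative use (a sketch only, not code from this patch; assumes the caller has chosen a
+ * queue q and position i, and owns manifest m):
+ *
+ *    struct rhizome_fetch_candidate *c = rhizome_fetch_insert(q, i);
+ *    c->manifest = m;     // the queue now owns m
+ *    c->peer = *peerip;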
+ *
+ * @author Andrew Bettison
+ */
 static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch_queue *q, int i)
 {
+  struct rhizome_fetch_candidate * const c = &q->candidate_queue[i];
+  struct rhizome_fetch_candidate * e = &q->candidate_queue[q->candidate_queue_size - 1];
   assert(i >= 0 && i < q->candidate_queue_size);
-  struct rhizome_fetch_candidate *c = &q->candidate_queue[i];
-  struct rhizome_fetch_candidate *e = &q->candidate_queue[q->candidate_queue_size - 1];
+  assert(i == 0 || c[-1].manifest);
   if (debug & DEBUG_RHIZOME_RX)
     DEBUGF("insert queue[%d] candidate[%d]", q - rhizome_fetch_queues, i);
-  if (e->manifest)
+  if (e->manifest) // queue is full
     rhizome_manifest_free(e->manifest);
+  else
+    while (e > c && !e[-1].manifest)
+      --e;
   for (; e > c; --e)
     e[0] = e[-1];
   assert(e == c);
@@ -134,6 +152,12 @@ static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch
   return c;
 }
 
+/* Remove the given candidate from a given queue.  If the element points to a manifest structure,
+ * then the manifest is freed.  All succeeding candidates are copied forward in the queue to close
+ * up the gap, leaving an empty element at the tail of the queue.
+ *
+ * @author Andrew Bettison
+ */
 static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i)
 {
   assert(i >= 0 && i < q->candidate_queue_size);
@@ -163,52 +187,6 @@ int rhizome_any_fetch_active()
   return 0;
 }
 
-/*
-  Queue a manifest for importing.
-
-  There are three main cases that can occur here:
-
-  1. The manifest has a nil payload (filesize=0);
-  2. The associated payload is already in our database; or
-  3. The associated payload is not already in our database, and so we need
-     to fetch it before we can import it.
-
-  Cases (1) and (2) are more or less identical, and all we need to do is to
-  import the manifest into the database.
-
-  Case (3) requires that we fetch the associated file.
-
-  This is where life gets interesting.
-
-  First, we need to make sure that we can free up enough space in the database
-  for the file.
-
-  Second, we need to work out how we are going to get the file.
-  If we are on an IPv4 wifi network, then HTTP is probably the way to go.
-  If we are not on an IPv4 wifi network, then HTTP is not an option, and we need
-  to use a Rhizome/Overlay protocol to fetch it.  It might even be HTTP over MDP
-  (Serval Mesh Datagram Protocol) or MTCP (Serval Mesh Transmission Control Protocol
-  -- yet to be specified).
-
-  For efficiency, the MDP transfer protocol should allow multiple listeners to
-  receive the data. In contrast, it would be nice to have the data auth-crypted, if
-  only to deal with packet errors (but also naughty people who might want to mess
-  with the transfer.
-
-  For HTTP over IPv4, the IPv4 address and port number of the sender is sent as part of the
-  advertisement.
-*/
-
-static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip);
-#define STARTED (0)
-#define SLOTBUSY (1)
-#define SAMEBUNDLE (2)
-#define SAMEPAYLOAD (3)
-#define SUPERSEDED (4)
-#define OLDERBUNDLE (5)
-#define NEWERBUNDLE (6)
-#define IMPORTED (7)
-
 /* As defined below uses 64KB */
 #define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */
 #define RHIZOME_VERSION_CACHE_SHIFT 1
@@ -471,6 +449,8 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
  * entry and the manifest is freed when the fetch has completed or is abandoned for any reason.
  *
  * Verifies manifests as late as possible to avoid wasting time.
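+ *
+ * For example (an illustrative sketch only, not code from this patch): a 50000-byte payload
+ * is too big for queue 0 (threshold 10000) but becomes a candidate in queue 1 (threshold
+ * 100000); the first suitable queue can be found with a scan like:
+ *
+ *    int i = 0;
+ *    while (i < NQUEUES && m->fileLength >= rhizome_fetch_queues[i].size_threshold)
+ *      ++i;  // first queue whose threshold exceeds the payload size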
+ *
+ * @author Andrew Bettison
  */
 int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
 {
@@ -479,7 +459,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
   int priority=100; /* normal priority */
 
   if (debug & DEBUG_RHIZOME_RX)
-    DEBUGF("Considering manifest import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
+    DEBUGF("Considering import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
 
   if (rhizome_manifest_version_cache_lookup(m)) {
     if (debug & DEBUG_RHIZOME_RX)
@@ -590,28 +570,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
   RETURN(0);
 }
 
-static void rhizome_start_next_queued_fetch(struct rhizome_fetch_queue *q)
-{
-  struct rhizome_fetch_candidate *c = &q->candidate_queue[0];
-  int result;
-  while (c->manifest && (result = rhizome_fetch(c->manifest, &c->peer)) != SLOTBUSY) {
-    if (result == STARTED)
-      c->manifest = NULL;
-    rhizome_fetch_unqueue(q, 0);
-  }
-}
-
-void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
-{
-  int i;
-  for (i = 0; i < NQUEUES; ++i)
-    rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i]);
-  alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms;
-  alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3;
-  schedule(alarm);
-}
-
-static int rhizome_start_fetch(struct rhizome_fetch_slot *slot)
+static int schedule_fetch(struct rhizome_fetch_slot *slot)
 {
   int sock = -1;
   FILE *file = NULL;
@@ -679,22 +638,51 @@ bail:
   return -1;
 }
 
-/* Returns STARTED (0) if the fetch was started (the caller should not free the manifest in this
- * case, because the fetch slot now has a copy of the pointer, and the manifest will be freed once
- * the fetch finishes or is terminated).
+/* Start fetching a bundle's payload ready for importing.
+ *
+ * There are three main cases that can occur here:
+ *   1) The manifest has a nil payload (filesize=0);
+ *   2) The payload is already in the database; or
+ *   3) The payload is not in the database.
+ *
+ * Cases (1) and (2) are more or less identical: the bundle can be imported into the database
+ * immediately.  Case (3) requires the payload to be fetched from a remote node.
+ *
+ * First, obtain enough space in the database for the file.
+ *
+ * Second, work out how we are going to get the file.
+ *  - On an IPv4 WiFi network, HTTP can be used.  The IP address and port number are sent in the
+ *    bundle advertisement packet.
+ *  - On a non-IPv4 WiFi network, HTTP is not an option, so MDP must be used.
+ *
+ * For efficiency, the MDP transfer protocol could allow multiple listeners to receive the payload
+ * by eavesdropping on the transfer.  In contrast, sending the payload auth-crypted would make it
+ * possible to detect packet errors and hostile attempts to inject false data into the transfer.
+ *
+ * Returns STARTED (0) if the fetch was started.
+ * Returns IMPORTED if a fetch was not started because the payload is nil or already in the
+ * Rhizome store, so the import was performed instead.
  * Returns SAMEPAYLOAD if a fetch of the same payload (file ID) is already active.
  * Returns SUPERSEDED if the fetch was not started because a newer version of the same bundle is
  * already present.
- * Returns SLOTBUSY if the fetch was not queued because no slots are available.
  * Returns SAMEBUNDLE if a fetch of the same bundle is already active.
  * Returns OLDERBUNDLE if a fetch of an older version of the same bundle is already active.
  * Returns NEWERBUNDLE if a fetch of a newer version of the same bundle is already active.
- * Returns IMPORTED if a fetch was not started because the payload is nil or already in the
- * Rhizome store, so the import was performed instead.
+ * Returns SLOTBUSY if the given slot is currently being used for another fetch.
  * Returns -1 on error.
+ *
+ * In the STARTED case, the caller should not free the manifest because the fetch slot now has a
+ * copy of the pointer, and the manifest will be freed once the fetch finishes or is terminated.  In
+ * all other cases, the caller is responsible for freeing the manifest.
+ *
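+ * For example (an illustrative sketch only, not code from this patch), a caller might dispatch
+ * on the result like this:
+ *
+ *    switch (rhizome_fetch(slot, m, &peer)) {
+ *    case STARTED:
+ *      break;                    // the slot now owns m; do not free it
+ *    case SLOTBUSY:
+ *      break;                    // keep m and retry later
+ *    default:
+ *      rhizome_manifest_free(m); // fetch not started; m stays ours to free
+ *    }
+ *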
+ * @author Andrew Bettison
  */
-static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
+static enum rhizome_start_fetch_result
+rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip)
 {
+  if (slot->state != RHIZOME_FETCH_FREE)
+    return SLOTBUSY;
+
   const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
   /* Do the quick rejection tests first, before the more expensive ones,
    */
@@ -712,7 +700,13 @@
   if (1||(debug & DEBUG_RHIZOME_RX))
-    DEBUGF("Fetching bundle bid=%s version=%lld size=%lld peerip=%s", bid, m->version, m->fileLength, alloca_sockaddr(peerip));
+    DEBUGF("Fetching bundle slot=%d bid=%s version=%lld size=%lld peerip=%s",
+        slot - &rhizome_fetch_queues[0].active,
+        bid,
+        m->version,
+        m->fileLength,
+        alloca_sockaddr(peerip)
+      );
 
   // If the payload is empty, no need to fetch, so import now.
   if (m->fileLength == 0) {
@@ -723,14 +717,6 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
     return IMPORTED;
   }
 
-  // Ensure there is a slot available before doing more expensive checks.
-  struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(m->fileLength);
-  if (slot == NULL) {
-    if (debug & DEBUG_RHIZOME_RX)
-      DEBUG(" fetch not started - all slots full");
-    return SLOTBUSY;
-  }
-
   // If we already have this version or newer, do not fetch.
   if (rhizome_manifest_version_cache_lookup(m)) {
     if (debug & DEBUG_RHIZOME_RX)
@@ -783,9 +769,9 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
   // Fetch the file, unless already queued.
   for (i = 0; i < NQUEUES; ++i) {
-    struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active;
-    const rhizome_manifest *am = as->manifest;
-    if (as->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, am->fileHexHash) == 0) {
+    struct rhizome_fetch_slot *s = &rhizome_fetch_queues[i].active;
+    const rhizome_manifest *sm = s->manifest;
+    if (s->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, sm->fileHexHash) == 0) {
       if (debug & DEBUG_RHIZOME_RX)
         DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash);
       return SAMEPAYLOAD;
@@ -805,7 +791,7 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
   m->dataFileName = strdup(slot->filename);
   m->dataFileUnlinkOnFree = 0;
   slot->manifest = m;
-  if (rhizome_start_fetch(slot) == -1) {
+  if (schedule_fetch(slot) == -1) {
     slot->filename[0] = '\0';
     return -1;
   }
@@ -814,7 +800,12 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
   return STARTED;
 }
 
-int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
+/* Returns STARTED (0) if the fetch was started.
+ * Returns SLOTBUSY if there is no available fetch slot for performing the fetch.
+ * Returns -1 on error.
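+ *
+ * Illustrative use (a sketch only, not code from this patch; bid_prefix is a hypothetical
+ * buffer holding the first eight bytes of the wanted bundle ID):
+ *
+ *    if (rhizome_fetch_request_manifest_by_prefix(&peerip, bid_prefix, 8) == STARTED)
+ *      ;  // the response will be processed by rhizome_fetch_poll()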
+ */
+enum rhizome_start_fetch_result
+rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
 {
   assert(peerip);
   struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES);
@@ -829,16 +820,71 @@ int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, c
   slot->request_len = strbuf_len(r);
   if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
     return -1;
-  if (rhizome_start_fetch(slot) == -1) {
+  if (schedule_fetch(slot) == -1) {
     slot->filename[0] = '\0';
     return -1;
   }
   return STARTED;
 }
 
+/* Activate the next fetch for the given slot.  This takes the next job from the head of the
+ * slot's own queue.  If there is none, then it takes jobs from queues with smaller size
+ * thresholds.
+ *
+ * @author Andrew Bettison
+ */
+static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
+{
+  struct rhizome_fetch_queue *q;
+  for (q = (struct rhizome_fetch_queue *) slot; q >= rhizome_fetch_queues; --q) {
+    int i = 0;
+    struct rhizome_fetch_candidate *c;
+    while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) {
+      int result = rhizome_fetch(slot, c->manifest, &c->peer);
+      switch (result) {
+      case SLOTBUSY:
+	return;
+      case STARTED:
+	c->manifest = NULL;
+	rhizome_fetch_unqueue(q, i);
+	return;
+      case IMPORTED:
+      case SAMEBUNDLE:
+      case SAMEPAYLOAD:
+      case SUPERSEDED:
+      case NEWERBUNDLE:
+	// Discard the candidate fetch and loop to try the next in queue.
+	rhizome_fetch_unqueue(q, i);
+	break;
+      case OLDERBUNDLE:
+	// Do not un-queue, so that when the fetch of the older bundle finishes, we will start
+	// fetching a newer one.
+	++i;
+	break;
+      default:
+	// rhizome_fetch() returned -1 (error); stop now and let the next alarm retry this
+	// candidate, rather than spinning on it.
+	return;
+      }
+    }
+  }
+}
+
+/* Called periodically to start any queued fetches that may not have been started as soon as the
+ * slot became available, for some unknown reason.
+ *
+ * This method is probably unnecessary.
+ *
+ * @author Andrew Bettison
+ */
+void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
+{
+  int i;
+  for (i = 0; i < NQUEUES; ++i)
+    rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i].active);
+  alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms;
+  alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms * 3;
+  schedule(alarm);
+}
+
 static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
 {
-  if (debug & DEBUG_RHIZOME_RX) 
+  if (debug & DEBUG_RHIZOME_RX)
     DEBUGF("Close Rhizome fetch slot=%d", slot - &rhizome_fetch_queues[0].active);
 
   assert(slot->state != RHIZOME_FETCH_FREE);
@@ -864,9 +910,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
 
   // Activate the next queued fetch that is eligible for this slot.  Try starting candidates from
   // all queues with the same or smaller size thresholds until the slot is taken.
-  struct rhizome_fetch_queue *q;
-  for (q = (struct rhizome_fetch_queue *) slot; slot->state == RHIZOME_FETCH_FREE && q >= rhizome_fetch_queues; --q)
-    rhizome_start_next_queued_fetch(q);
+  rhizome_start_next_queued_fetch(slot);
 
   return 0;
 }
@@ -1035,14 +1079,12 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
       }
     }
   }
-
  if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){
    // timeout or socket error, close the socket
    if (debug & DEBUG_RHIZOME_RX)
      DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR);
    rhizome_fetch_close(slot);
  }
-
 }
 
 /*