mirror of
https://github.com/servalproject/serval-dna.git
synced 2024-12-20 05:37:57 +00:00
Issue #30, fix Rhizome fetch slot allocation logic
Also improve the block comment documentation of many functions.
This commit is contained in:
parent
9ee9b63aad
commit
935a545ac7
14
rhizome.h
14
rhizome.h
@ -538,7 +538,19 @@ extern unsigned char favicon_bytes[];
|
||||
extern int favicon_len;
|
||||
|
||||
int rhizome_import_from_files(const char *manifestpath,const char *filepath);
|
||||
int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
|
||||
|
||||
enum rhizome_start_fetch_result {
|
||||
STARTED = 0,
|
||||
SAMEBUNDLE,
|
||||
SAMEPAYLOAD,
|
||||
SUPERSEDED,
|
||||
OLDERBUNDLE,
|
||||
NEWERBUNDLE,
|
||||
IMPORTED,
|
||||
SLOTBUSY
|
||||
};
|
||||
|
||||
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length);
|
||||
int rhizome_any_fetch_active();
|
||||
|
||||
struct http_response_parts {
|
||||
|
244
rhizome_fetch.c
244
rhizome_fetch.c
@ -56,6 +56,11 @@ struct rhizome_fetch_slot {
|
||||
|
||||
/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
|
||||
* is less than a given threshold.
|
||||
*
|
||||
* TODO: If the queues ever get much larger, use pointer-linked queue instead of physically ordered
|
||||
* in memory, to avoid the need for memory copies when deleting or inserting queue entries.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
struct rhizome_fetch_queue {
|
||||
struct rhizome_fetch_slot active; // must be first element in struct
|
||||
@ -64,6 +69,8 @@ struct rhizome_fetch_queue {
|
||||
long long size_threshold; // will only hold fetches of fewer than this many bytes
|
||||
};
|
||||
|
||||
/* Static allocation of the candidate queues.
|
||||
*/
|
||||
struct rhizome_fetch_candidate queue0[5];
|
||||
struct rhizome_fetch_candidate queue1[4];
|
||||
struct rhizome_fetch_candidate queue2[3];
|
||||
@ -72,8 +79,9 @@ struct rhizome_fetch_candidate queue4[1];
|
||||
|
||||
#define NELS(a) (sizeof (a) / sizeof *(a))
|
||||
|
||||
/* Static allocation of the queue structures. Must be in order of ascending size_threshold.
|
||||
*/
|
||||
struct rhizome_fetch_queue rhizome_fetch_queues[] = {
|
||||
// Must be in order of ascending size_threshold.
|
||||
{ .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } },
|
||||
{ .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } },
|
||||
{ .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } },
|
||||
@ -118,15 +126,25 @@ static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Insert a candidate into a given queue at a given position. All candidates succeeding the given
|
||||
* position are copied backward in the queue to open up an empty element at the given position. If
|
||||
* the queue was full, then the tail element is discarded, freeing the manifest it points to.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch_queue *q, int i)
|
||||
{
|
||||
struct rhizome_fetch_candidate * const c = &q->candidate_queue[i];
|
||||
struct rhizome_fetch_candidate * e = &q->candidate_queue[q->candidate_queue_size - 1];
|
||||
assert(i >= 0 && i < q->candidate_queue_size);
|
||||
struct rhizome_fetch_candidate *c = &q->candidate_queue[i];
|
||||
struct rhizome_fetch_candidate *e = &q->candidate_queue[q->candidate_queue_size - 1];
|
||||
assert(i == 0 || c[-1].manifest);
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUGF("insert queue[%d] candidate[%d]", q - rhizome_fetch_queues, i);
|
||||
if (e->manifest)
|
||||
if (e->manifest) // queue is full
|
||||
rhizome_manifest_free(e->manifest);
|
||||
else
|
||||
while (e > c && !e[-1].manifest)
|
||||
--e;
|
||||
for (; e > c; --e)
|
||||
e[0] = e[-1];
|
||||
assert(e == c);
|
||||
@ -134,6 +152,12 @@ static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch
|
||||
return c;
|
||||
}
|
||||
|
||||
/* Remove the given candidate from a given queue. If the element points to a manifest structure,
|
||||
* then frees the manifest. All succeeding candidates are copied forward in the queue to close up
|
||||
* the gap, leaving an empty element at the tail of the queue.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i)
|
||||
{
|
||||
assert(i >= 0 && i < q->candidate_queue_size);
|
||||
@ -163,52 +187,6 @@ int rhizome_any_fetch_active()
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
Queue a manifest for importing.
|
||||
|
||||
There are three main cases that can occur here:
|
||||
|
||||
1. The manifest has a nil payload (filesize=0);
|
||||
2. The associated payload is already in our database; or
|
||||
3. The associated payload is not already in our database, and so we need
|
||||
to fetch it before we can import it.
|
||||
|
||||
Cases (1) and (2) are more or less identical, and all we need to do is to
|
||||
import the manifest into the database.
|
||||
|
||||
Case (3) requires that we fetch the associated file.
|
||||
|
||||
This is where life gets interesting.
|
||||
|
||||
First, we need to make sure that we can free up enough space in the database
|
||||
for the file.
|
||||
|
||||
Second, we need to work out how we are going to get the file.
|
||||
If we are on an IPv4 wifi network, then HTTP is probably the way to go.
|
||||
If we are not on an IPv4 wifi network, then HTTP is not an option, and we need
|
||||
to use a Rhizome/Overlay protocol to fetch it. It might even be HTTP over MDP
|
||||
(Serval Mesh Datagram Protocol) or MTCP (Serval Mesh Transmission Control Protocol
|
||||
-- yet to be specified).
|
||||
|
||||
For efficiency, the MDP transfer protocol should allow multiple listeners to
|
||||
receive the data. In contrast, it would be nice to have the data auth-crypted, if
|
||||
only to deal with packet errors (but also naughty people who might want to mess
|
||||
with the transfer.
|
||||
|
||||
For HTTP over IPv4, the IPv4 address and port number of the sender is sent as part of the
|
||||
advertisement.
|
||||
*/
|
||||
|
||||
static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip);
|
||||
#define STARTED (0)
|
||||
#define SLOTBUSY (1)
|
||||
#define SAMEBUNDLE (2)
|
||||
#define SAMEPAYLOAD (3)
|
||||
#define SUPERSEDED (4)
|
||||
#define OLDERBUNDLE (5)
|
||||
#define NEWERBUNDLE (6)
|
||||
#define IMPORTED (7)
|
||||
|
||||
/* As defined below uses 64KB */
|
||||
#define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */
|
||||
#define RHIZOME_VERSION_CACHE_SHIFT 1
|
||||
@ -471,6 +449,8 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
|
||||
* entry and the manifest is freed when the fetch has completed or is abandoned for any reason.
|
||||
*
|
||||
* Verifies manifests as late as possible to avoid wasting time.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
{
|
||||
@ -479,7 +459,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
|
||||
int priority=100; /* normal priority */
|
||||
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUGF("Considering manifest import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
|
||||
DEBUGF("Considering import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
|
||||
|
||||
if (rhizome_manifest_version_cache_lookup(m)) {
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
@ -590,28 +570,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
|
||||
RETURN(0);
|
||||
}
|
||||
|
||||
static void rhizome_start_next_queued_fetch(struct rhizome_fetch_queue *q)
|
||||
{
|
||||
struct rhizome_fetch_candidate *c = &q->candidate_queue[0];
|
||||
int result;
|
||||
while (c->manifest && (result = rhizome_fetch(c->manifest, &c->peer)) != SLOTBUSY) {
|
||||
if (result == STARTED)
|
||||
c->manifest = NULL;
|
||||
rhizome_fetch_unqueue(q, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < NQUEUES; ++i)
|
||||
rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i]);
|
||||
alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms;
|
||||
alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3;
|
||||
schedule(alarm);
|
||||
}
|
||||
|
||||
static int rhizome_start_fetch(struct rhizome_fetch_slot *slot)
|
||||
static int schedule_fetch(struct rhizome_fetch_slot *slot)
|
||||
{
|
||||
int sock = -1;
|
||||
FILE *file = NULL;
|
||||
@ -679,22 +638,51 @@ bail:
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Returns STARTED (0) if the fetch was started (the caller should not free the manifest in this
|
||||
* case, because the fetch slot now has a copy of the pointer, and the manifest will be freed once
|
||||
* the fetch finishes or is terminated).
|
||||
/* Start fetching a bundle's payload ready for importing.
|
||||
*
|
||||
* Three main cases that can occur here:
|
||||
* 1) The manifest has a nil payload (filesize=0);
|
||||
* 2) The payload is already in the database; or
|
||||
* 3) The payload is not in the database.
|
||||
*
|
||||
* Cases (1) and (2) are more or less identical: the bundle can be imported into the database
|
||||
* immediately. Case (3) requires the payload to be fetched from a remote node.
|
||||
*
|
||||
* First, obtain enough space in the database for the file.
|
||||
*
|
||||
* Second, work out how we are going to get the file.
|
||||
* - On an IPv4 WiFi network, HTTP can be used. The IP address and port number are sent in the
|
||||
* bundle advertisement packet.
|
||||
* - On a non-IPv4 WiFi network, HTTP is not an option, so MDP must be used.
|
||||
*
|
||||
* For efficiency, the MDP transfer protocol could allow multiple listeners to receive the payload
|
||||
* by eavesdropping on the transfer. In contrast, sending the payload auth-crypted would detect
|
||||
* packet errors and hostile parties trying to inject false data into the transfer.
|
||||
*
|
||||
* Returns STARTED (0) if the fetch was started.
|
||||
* Returns IMPORTED if a fetch was not started because the payload is nil or already in the
|
||||
* Rhizome store, so the import was performed instead.
|
||||
* Returns SAMEPAYLOAD if a fetch of the same payload (file ID) is already active.
|
||||
* Returns SUPERSEDED if the fetch was not started because a newer version of the same bundle is
|
||||
* already present.
|
||||
* Returns SLOTBUSY if the fetch was not queued because no slots are available.
|
||||
* Returns SAMEBUNDLE if a fetch of the same bundle is already active.
|
||||
* Returns OLDERBUNDLE if a fetch of an older version of the same bundle is already active.
|
||||
* Returns NEWERBUNDLE if a fetch of a newer version of the same bundle is already active.
|
||||
* Returns IMPORTED if a fetch was not started because the payload is nil or already in the
|
||||
* Rhizome store, so the import was performed instead.
|
||||
* Returns SLOTBUSY if the given slot is currently being used for another fetch.
|
||||
* Returns -1 on error.
|
||||
*
|
||||
* In the STARTED case, the caller should not free the manifest because the fetch slot now has a
|
||||
* copy of the pointer, and the manifest will be freed once the fetch finishes or is terminated. In
|
||||
* all other cases, the caller is responsible for freeing the manifest.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
static enum rhizome_start_fetch_result
|
||||
rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
{
|
||||
if (slot->state != RHIZOME_FETCH_FREE)
|
||||
return SLOTBUSY;
|
||||
|
||||
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
|
||||
|
||||
/* Do the quick rejection tests first, before the more expensive ones,
|
||||
@ -712,7 +700,13 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
*/
|
||||
|
||||
if (1||(debug & DEBUG_RHIZOME_RX))
|
||||
DEBUGF("Fetching bundle bid=%s version=%lld size=%lld peerip=%s", bid, m->version, m->fileLength, alloca_sockaddr(peerip));
|
||||
DEBUGF("Fetching bundle slot=%d bid=%s version=%lld size=%lld peerip=%s",
|
||||
slot - &rhizome_fetch_queues[0].active,
|
||||
bid,
|
||||
m->version,
|
||||
m->fileLength,
|
||||
alloca_sockaddr(peerip)
|
||||
);
|
||||
|
||||
// If the payload is empty, no need to fetch, so import now.
|
||||
if (m->fileLength == 0) {
|
||||
@ -723,14 +717,6 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
return IMPORTED;
|
||||
}
|
||||
|
||||
// Ensure there is a slot available before doing more expensive checks.
|
||||
struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(m->fileLength);
|
||||
if (slot == NULL) {
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUG(" fetch not started - all slots full");
|
||||
return SLOTBUSY;
|
||||
}
|
||||
|
||||
// If we already have this version or newer, do not fetch.
|
||||
if (rhizome_manifest_version_cache_lookup(m)) {
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
@ -783,9 +769,9 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
|
||||
// Fetch the file, unless already queued.
|
||||
for (i = 0; i < NQUEUES; ++i) {
|
||||
struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active;
|
||||
const rhizome_manifest *am = as->manifest;
|
||||
if (as->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, am->fileHexHash) == 0) {
|
||||
struct rhizome_fetch_slot *s = &rhizome_fetch_queues[i].active;
|
||||
const rhizome_manifest *sm = s->manifest;
|
||||
if (s->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, sm->fileHexHash) == 0) {
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash);
|
||||
return SAMEPAYLOAD;
|
||||
@ -805,7 +791,7 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
m->dataFileName = strdup(slot->filename);
|
||||
m->dataFileUnlinkOnFree = 0;
|
||||
slot->manifest = m;
|
||||
if (rhizome_start_fetch(slot) == -1) {
|
||||
if (schedule_fetch(slot) == -1) {
|
||||
slot->filename[0] = '\0';
|
||||
return -1;
|
||||
}
|
||||
@ -814,7 +800,12 @@ static int rhizome_fetch(rhizome_manifest *m, const struct sockaddr_in *peerip)
|
||||
return STARTED;
|
||||
}
|
||||
|
||||
int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
|
||||
/* Returns STARTED (0) if the fetch was started.
|
||||
* Returns SLOTBUSY if there is no available fetch slot for performing the fetch.
|
||||
* Returns -1 on error.
|
||||
*/
|
||||
enum rhizome_start_fetch_result
|
||||
rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length)
|
||||
{
|
||||
assert(peerip);
|
||||
struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES);
|
||||
@ -829,16 +820,71 @@ int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, c
|
||||
slot->request_len = strbuf_len(r);
|
||||
if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length)))
|
||||
return -1;
|
||||
if (rhizome_start_fetch(slot) == -1) {
|
||||
if (schedule_fetch(slot) == -1) {
|
||||
slot->filename[0] = '\0';
|
||||
return -1;
|
||||
}
|
||||
return STARTED;
|
||||
}
|
||||
|
||||
/* Activate the next fetch for the given slot. This takes the next job from the head of the slot's
|
||||
* own queue. If there is none, then takes jobs from other queues.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
|
||||
{
|
||||
struct rhizome_fetch_queue *q;
|
||||
for (q = (struct rhizome_fetch_queue *) slot; q >= rhizome_fetch_queues; --q) {
|
||||
int i = 0;
|
||||
struct rhizome_fetch_candidate *c;
|
||||
while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) {
|
||||
int result = rhizome_fetch(slot, c->manifest, &c->peer);
|
||||
switch (result) {
|
||||
case SLOTBUSY:
|
||||
return;
|
||||
case STARTED:
|
||||
c->manifest = NULL;
|
||||
rhizome_fetch_unqueue(q, i);
|
||||
return;
|
||||
case IMPORTED:
|
||||
case SAMEBUNDLE:
|
||||
case SAMEPAYLOAD:
|
||||
case SUPERSEDED:
|
||||
case NEWERBUNDLE:
|
||||
// Discard the candidate fetch and loop to try the next in queue.
|
||||
rhizome_fetch_unqueue(q, i);
|
||||
break;
|
||||
case OLDERBUNDLE:
|
||||
// Do not un-queue, so that when the fetch of the older bundle finishes, we will start
|
||||
// fetching a newer one.
|
||||
++i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Called periodically to start any queued fetches that may not have been started as soon as the
|
||||
* slot became available, for some unknown reason.
|
||||
*
|
||||
* This method is probably unnecessary.
|
||||
*
|
||||
* @author Andrew Bettison <andrew@servalproject.com>
|
||||
*/
|
||||
void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < NQUEUES; ++i)
|
||||
rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i].active);
|
||||
alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms;
|
||||
alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms * 3;
|
||||
schedule(alarm);
|
||||
}
|
||||
|
||||
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
|
||||
{
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUGF("Close Rhizome fetch slot=%d", slot - &rhizome_fetch_queues[0].active);
|
||||
assert(slot->state != RHIZOME_FETCH_FREE);
|
||||
|
||||
@ -864,9 +910,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
|
||||
|
||||
// Activate the next queued fetch that is eligible for this slot. Try starting candidates from
|
||||
// all queues with the same or smaller size thresholds until the slot is taken.
|
||||
struct rhizome_fetch_queue *q;
|
||||
for (q = (struct rhizome_fetch_queue *) slot; slot->state == RHIZOME_FETCH_FREE && q >= rhizome_fetch_queues; --q)
|
||||
rhizome_start_next_queued_fetch(q);
|
||||
rhizome_start_next_queued_fetch(slot);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -1035,14 +1079,12 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){
|
||||
// timeout or socket error, close the socket
|
||||
if (debug & DEBUG_RHIZOME_RX)
|
||||
DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR);
|
||||
rhizome_fetch_close(slot);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
|
Loading…
Reference in New Issue
Block a user