diff --git a/fdqueue.c b/fdqueue.c index da1c609f..5914e073 100644 --- a/fdqueue.c +++ b/fdqueue.c @@ -74,6 +74,11 @@ int deadline(struct sched_ent *alarm) return 0; } +int is_scheduled(const struct sched_ent *alarm) +{ + return alarm->_next || alarm->_prev || alarm == next_alarm || alarm == next_deadline; +} + // add an alarm to the list of scheduled function calls. // simply populate .alarm with the absolute time, and .function with the method to call. // on calling .poll.revents will be zero. @@ -84,8 +89,8 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm) struct sched_ent *node = next_alarm, *last = NULL; - if (alarm->_next || alarm->_prev || alarm==next_alarm || alarm==next_deadline) - FATAL("Attempted to schedule an alarm that is still scheduled."); + if (is_scheduled(alarm)) + FATAL("Scheduling an alarm that is already scheduled"); if (!alarm->function) return WHY("Can't schedule if you haven't set the function pointer"); diff --git a/log.h b/log.h index 83234e3e..d3a667a2 100644 --- a/log.h +++ b/log.h @@ -137,6 +137,7 @@ void set_log_implementation(void (*log_function)(int level, struct strbuf *buf)) #define alloca_toprint(dstlen,buf,len) toprint((char *)alloca((dstlen) == -1 ? toprint_len((const char *)(buf),(len), "``") + 1 : (dstlen)), (dstlen), (const char *)(buf), (len), "``") #define alloca_str_toprint(str) toprint_str((char *)alloca(toprint_str_len(str, "``") + 1), -1, (str), "``") +#define alloca_sockaddr(addr) strbuf_str(strbuf_append_sockaddr(strbuf_alloca(40), (const struct sockaddr *)(addr))) #define __HERE__ ((struct __sourceloc){ .file = __FILE__, .line = __LINE__, .function = __FUNCTION__ }) #define __NOWHERE__ ((struct __sourceloc){ .file = NULL, .line = 0, .function = NULL }) diff --git a/overlay.c b/overlay.c index bc6d1330..70db7069 100644 --- a/overlay.c +++ b/overlay.c @@ -84,10 +84,6 @@ int overlayServerMode() send periodic traffic. This means we need to */ INFO("Running in overlay mode."); - /* Make sure rhizome configured settings are known. */ - if (rhizome_fetch_interval_ms < 1) - rhizome_configure(); - /* Get keyring available for use. Required for MDP, and very soon as a complete replacement for the HLR for DNA lookups, even in non-overlay mode. */ @@ -165,11 +161,6 @@ schedule(&_sched_##X); } // preload directory service information directory_service_init(); - /* Pick next rhizome files to grab every few seconds - from the priority list continuously being built from observed - bundle announcements */ - SCHEDULE(rhizome_enqueue_suggestions, rhizome_fetch_interval_ms, rhizome_fetch_interval_ms*3); - /* Periodically check for new interfaces */ SCHEDULE(overlay_interface_discover, 1, 100); diff --git a/rhizome.c b/rhizome.c index 642d6ba6..732bfa83 100644 --- a/rhizome.c +++ b/rhizome.c @@ -21,24 +21,31 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include -static int rhizome_enabled_flag = -1; // unknown -int rhizome_fetch_interval_ms = -1; +static int _rhizome_enabled = -1; +static int _rhizome_fetch_delay_ms = -1; /* Configure rhizome. @author Andrew Bettison */ int rhizome_configure() { - rhizome_enabled_flag = confValueGetBoolean("rhizome.enable", 1); - rhizome_fetch_interval_ms = (int) confValueGetInt64Range("rhizome.fetch_interval_ms", 3000, 1, 3600000); + _rhizome_enabled = confValueGetBoolean("rhizome.enable", 1); + _rhizome_fetch_delay_ms = (int) confValueGetInt64Range("rhizome.fetch_delay_ms", 50, 1, 3600000); return 0; } int rhizome_enabled() { - if (rhizome_enabled_flag < 0) + if (_rhizome_enabled < 0) rhizome_configure(); - return rhizome_enabled_flag; + return _rhizome_enabled; +} + +int rhizome_fetch_delay_ms() +{ + if (_rhizome_fetch_delay_ms < 1) + rhizome_configure(); + return _rhizome_fetch_delay_ms; } /* Import a bundle from a pair of files, one containing the manifest and the optional other diff --git a/rhizome.h b/rhizome.h index e8d823f5..ffca1047 100644 --- a/rhizome.h +++ b/rhizome.h @@ -150,10 +150,11 @@ typedef struct rhizome_manifest { #define RHIZOME_SERVICE_MESHMS "MeshMS1" extern long long rhizome_space; -extern int rhizome_fetch_interval_ms; extern unsigned short rhizome_http_server_port; int rhizome_configure(); +int rhizome_enabled(); +int rhizome_fetch_delay_ms(); int rhizome_set_datastore_path(const char *path); @@ -284,7 +285,6 @@ int rhizome_find_duplicate(const rhizome_manifest *m, rhizome_manifest **found, int rhizome_manifest_to_bar(rhizome_manifest *m,unsigned char *bar); long long rhizome_bar_version(unsigned char *bar); unsigned long long rhizome_bar_bidprefix_ll(unsigned char *bar); -int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept); int rhizome_list_manifests(const char *service, const char *sender_sid, const char *recipient_sid, int limit, int offset); int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest **mp); int rhizome_retrieve_file(const char *fileid, const char *filepath, @@ -325,10 +325,8 @@ int rhizome_sign_hash_with_key(rhizome_manifest *m,const unsigned char *sk, int rhizome_verify_bundle_privatekey(rhizome_manifest *m, const unsigned char *sk, const unsigned char *pk); int rhizome_find_bundle_author(rhizome_manifest *m); -int rhizome_queue_ignore_manifest(rhizome_manifest *m, - struct sockaddr_in *peerip,int timeout); -int rhizome_ignore_manifest_check(rhizome_manifest *m, - struct sockaddr_in *peerip); +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout); +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. @@ -336,8 +334,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, #define MAX_RHIZOME_MANIFESTS 24 #define MAX_CANDIDATES 16 -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, - struct sockaddr_in *peerip); +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip); typedef struct rhizome_http_request { struct sched_ent alarm; @@ -542,8 +539,20 @@ extern unsigned char favicon_bytes[]; extern int favicon_len; int rhizome_import_from_files(const char *manifestpath,const char *filepath); -int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length); -int rhizome_count_queued_imports(); + +enum rhizome_start_fetch_result { + STARTED = 0, + SAMEBUNDLE, + SAMEPAYLOAD, + SUPERSEDED, + OLDERBUNDLE, + NEWERBUNDLE, + IMPORTED, + SLOTBUSY +}; + +enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length); +int rhizome_any_fetch_active(); struct http_response_parts { int code; diff --git a/rhizome_direct_http.c b/rhizome_direct_http.c index 9a5380e6..584b08db 100644 --- a/rhizome_direct_http.c +++ b/rhizome_direct_http.c @@ -881,7 +881,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) /* Fetching the manifest, and then using it to see if we want to fetch the file for import is all handled asynchronously, so just wait for it to finish. */ - while (rhizome_count_queued_imports()) + while (rhizome_any_fetch_active()) fd_poll(); } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index a2cef40a..b0c6be66 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -24,111 +24,182 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include "str.h" -extern int sigPipeFlag; -extern int sigIoFlag; - - -typedef struct rhizome_file_fetch_record { - struct sched_ent alarm; +/* Represents a queued fetch of a bundle payload, for which the manifest is already known. + */ +struct rhizome_fetch_candidate { rhizome_manifest *manifest; - FILE *file; - char filename[1024]; - char request[1024]; - int request_len; - int request_ofs; - long long file_len; - long long file_ofs; + struct sockaddr_in peer; + int priority; +}; + +/* Represents an active fetch (in progress) of a bundle payload (.manifest != NULL) or of a bundle + * manifest (.manifest == NULL). + */ +struct rhizome_fetch_slot { + struct sched_ent alarm; // must be first element in struct + rhizome_manifest *manifest; + struct sockaddr_in peer; int state; #define RHIZOME_FETCH_FREE 0 #define RHIZOME_FETCH_CONNECTING 1 #define RHIZOME_FETCH_SENDINGHTTPREQUEST 2 #define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXFILE 4 - struct sockaddr_in peer; -} rhizome_file_fetch_record; + FILE *file; + char filename[1024]; + char request[1024]; + int request_len; + int request_ofs; + int64_t file_len; + int64_t file_ofs; +}; -/* List of queued transfers */ -#define MAX_QUEUED_FILES 4 -rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES]; +/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size + * is less than a given threshold. + * + * TODO: If the queues ever get much larger, use pointer-linked queue instead of physically ordered + * in memory, to avoid the need for memory copies when deleting or inserting queue entries. + * + * @author Andrew Bettison + */ +struct rhizome_fetch_queue { + struct rhizome_fetch_slot active; // must be first element in struct + int candidate_queue_size; + struct rhizome_fetch_candidate *candidate_queue; + long long size_threshold; // will only hold fetches of fewer than this many bytes +}; -#define QUEUED (0) -#define QSAMEPAYLOAD (1) -#define QDATED (2) -#define QBUSY (3) -#define QSAME (4) -#define QOLDER (5) -#define QNEWER (6) -#define QIMPORTED (7) +/* Static allocation of the candidate queues. + */ +struct rhizome_fetch_candidate queue0[5]; +struct rhizome_fetch_candidate queue1[4]; +struct rhizome_fetch_candidate queue2[3]; +struct rhizome_fetch_candidate queue3[2]; +struct rhizome_fetch_candidate queue4[1]; -int rhizome_count_queued_imports() +#define NELS(a) (sizeof (a) / sizeof *(a)) + +/* Static allocation of the queue structures. Must be in order of ascending size_threshold. + */ +struct rhizome_fetch_queue rhizome_fetch_queues[] = { + { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .size_threshold = 10000000, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .size_threshold = -1, .active = { .state = RHIZOME_FETCH_FREE } } +}; + +#define NQUEUES NELS(rhizome_fetch_queues) + +static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED; +static struct profile_total fetch_stats; + +/* Find a queue suitable for a fetch of the given number of bytes. If there is no suitable queue, + * return NULL. + * + * @author Andrew Bettison + */ +static struct rhizome_fetch_queue *rhizome_find_queue(long long size) { - int count = 0; int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) - if (file_fetch_queue[i].state != RHIZOME_FETCH_FREE) - ++count; - return count; -} - -static rhizome_file_fetch_record *rhizome_find_import_slot() -{ - // Find a free fetch queue slot. - int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) - if (file_fetch_queue[i].state == RHIZOME_FETCH_FREE) - return &file_fetch_queue[i]; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + if (q->size_threshold < 0 || size < q->size_threshold) + return q; + } return NULL; } -struct profile_total fetch_stats; +/* Find a free fetch slot suitable for fetching the given number of bytes. This could be a slot in + * any queue that would accept the candidate, ie, with a larger size threshold. Returns NULL if + * there is no suitable free slot. + * + * @author Andrew Bettison + */ +static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size) +{ + int i; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + if ((q->size_threshold < 0 || size < q->size_threshold) && q->active.state == RHIZOME_FETCH_FREE) + return &q->active; + } + return NULL; +} -/* - Queue a manifest for importing. +/* Insert a candidate into a given queue at a given position. All candidates succeeding the given + * position are copied backward in the queue to open up an empty element at the given position. If + * the queue was full, then the tail element is discarded, freeing the manifest it points to. + * + * @author Andrew Bettison + */ +static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch_queue *q, int i) +{ + struct rhizome_fetch_candidate * const c = &q->candidate_queue[i]; + struct rhizome_fetch_candidate * e = &q->candidate_queue[q->candidate_queue_size - 1]; + assert(i >= 0 && i < q->candidate_queue_size); + assert(i == 0 || c[-1].manifest); + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("insert queue[%d] candidate[%d]", q - rhizome_fetch_queues, i); + if (e->manifest) // queue is full + rhizome_manifest_free(e->manifest); + else + while (e > c && !e[-1].manifest) + --e; + for (; e > c; --e) + e[0] = e[-1]; + assert(e == c); + c->manifest = NULL; + return c; +} - There are three main cases that can occur here: +/* Remove the given candidate from a given queue. If the element points to a manifest structure, + * then frees the manifest. All succeeding candidates are copied forward in the queue to close up + * the gap, leaving an empty element at the tail of the queue. + * + * @author Andrew Bettison + */ +static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i) +{ + assert(i >= 0 && i < q->candidate_queue_size); + struct rhizome_fetch_candidate *c = &q->candidate_queue[i]; + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("unqueue queue[%d] candidate[%d] manifest=%p", q - rhizome_fetch_queues, i, c->manifest); + if (c->manifest) { + rhizome_manifest_free(c->manifest); + c->manifest = NULL; + } + struct rhizome_fetch_candidate *e = &q->candidate_queue[q->candidate_queue_size - 1]; + for (; c < e && c[1].manifest; ++c) + c[0] = c[1]; + c->manifest = NULL; +} - 1. The manifest has no associated file (filesize=0); - 2. The associated file is already in our database; or - 3. The associated file is not already in our database, and so we need - to fetch it before we can import it. - - Cases (1) and (2) are more or less identical, and all we need to do is to - import the manifest into the database. - - Case (3) requires that we fetch the associated file. - - This is where life gets interesting. - - First, we need to make sure that we can free up enough space in the database - for the file. - - Second, we need to work out how we are going to get the file. - If we are on an IPv4 wifi network, then HTTP is probably the way to go. - If we are not on an IPv4 wifi network, then HTTP is not an option, and we need - to use a Rhizome/Overlay protocol to fetch it. It might even be HTTP over MDP - (Serval Mesh Datagram Protocol) or MTCP (Serval Mesh Transmission Control Protocol - -- yet to be specified). - - For efficiency, the MDP transfer protocol should allow multiple listeners to - receive the data. In contrast, it would be nice to have the data auth-crypted, if - only to deal with packet errors (but also naughty people who might want to mess - with the transfer. - - For HTTP over IPv4, the IPv4 address and port number of the sender is sent as part of the - advertisement. -*/ +/* Return true if there are any active fetches currently in progress. + * + * @author Andrew Bettison + */ +int rhizome_any_fetch_active() +{ + int i; + for (i = 0; i < NQUEUES; ++i) + if (rhizome_fetch_queues[i].active.state != RHIZOME_FETCH_FREE) + return 1; + return 0; +} /* As defined below uses 64KB */ #define RHIZOME_VERSION_CACHE_NYBLS 2 /* 256=2^8=2nybls */ #define RHIZOME_VERSION_CACHE_SHIFT 1 #define RHIZOME_VERSION_CACHE_SIZE 128 #define RHIZOME_VERSION_CACHE_ASSOCIATIVITY 16 -typedef struct rhizome_manifest_version_cache_slot { + +struct rhizome_manifest_version_cache_slot { unsigned char idprefix[24]; long long version; -} rhizome_manifest_version_cache_slot; -rhizome_manifest_version_cache_slot rhizome_manifest_version_cache -[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY]; +}; + +struct rhizome_manifest_version_cache_slot rhizome_manifest_version_cache[RHIZOME_VERSION_CACHE_SIZE][RHIZOME_VERSION_CACHE_ASSOCIATIVITY]; int rhizome_manifest_version_cache_store(rhizome_manifest *m) { @@ -148,7 +219,7 @@ int rhizome_manifest_version_cache_store(rhizome_manifest *m) bin=bin>>RHIZOME_VERSION_CACHE_SHIFT; slot=random()%RHIZOME_VERSION_CACHE_ASSOCIATIVITY; - rhizome_manifest_version_cache_slot *entry + struct rhizome_manifest_version_cache_slot *entry =&rhizome_manifest_version_cache[bin][slot]; unsigned long long manifest_version = rhizome_manifest_get_ll(m,"version"); @@ -195,7 +266,7 @@ int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) for(slot=0;slotversion=stored_version; for(i=0;i<24;i++) { @@ -315,8 +385,7 @@ typedef struct ignored_manifest_cache { a collision is exceedingly remote */ ignored_manifest_cache ignored; -int rhizome_ignore_manifest_check(rhizome_manifest *m, - struct sockaddr_in *peerip) +int rhizome_ignore_manifest_check(rhizome_manifest *m, const struct sockaddr_in *peerip) { int bin = m->cryptoSignPublic[0]>>(8-IGNORED_BIN_BITS); int slot; @@ -335,8 +404,7 @@ int rhizome_ignore_manifest_check(rhizome_manifest *m, return 0; } -int rhizome_queue_ignore_manifest(rhizome_manifest *m, - struct sockaddr_in *peerip,int timeout) +int rhizome_queue_ignore_manifest(rhizome_manifest *m, const struct sockaddr_in *peerip, int timeout) { /* The supplied manifest from a given IP has errors, so remember that it isn't worth considering */ @@ -362,53 +430,7 @@ int rhizome_queue_ignore_manifest(rhizome_manifest *m, } -typedef struct rhizome_candidates { - rhizome_manifest *manifest; - struct sockaddr_in peer; - long long size; - /* XXX Need group memberships/priority level here */ - int priority; -} rhizome_candidates; - -rhizome_candidates candidates[MAX_CANDIDATES]; -int candidate_count=0; - -/* sort indicated candidate from starting position down - (or up) */ -int rhizome_position_candidate(int position) -{ - while(position=0) { - rhizome_candidates *c1=&candidates[position]; - rhizome_candidates *c2=&candidates[position+1]; - if (c1->priority>c2->priority - ||(c1->priority==c2->priority - &&c1->size>c2->size)) - { - rhizome_candidates c=*c1; - *c1=*c2; - *c2=c; - position++; - } - else { - /* doesn't need moving down, but does it need moving up? */ - if (!position) return 0; - rhizome_candidates *c0=&candidates[position-1]; - if (c1->prioritypriority - ||(c1->priority==c0->priority - &&c1->sizesize)) - { - rhizome_candidates c=*c1; - *c1=*c2; - *c2=c; - position--; - } - else return 0; - } - } - return 0; -} - -void rhizome_import_received_bundle(struct rhizome_manifest *m) +static int rhizome_import_received_bundle(struct rhizome_manifest *m) { m->finalised = 1; m->manifest_bytes = m->manifest_all_bytes; // store the signatures too @@ -416,19 +438,340 @@ void rhizome_import_received_bundle(struct rhizome_manifest *m) DEBUGF("manifest len=%d has %d signatories", m->manifest_bytes, m->sig_count); dump("manifest", m->manifestdata, m->manifest_all_bytes); } - rhizome_bundle_import(m, m->ttl - 1 /* TTL */); + return rhizome_bundle_import(m, m->ttl - 1 /* TTL */); } -/* Verifies manifests as late as possible to avoid wasting time. */ -int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip) +static int schedule_fetch(struct rhizome_fetch_slot *slot) +{ + int sock = -1; + FILE *file = NULL; + /* TODO Don't forget to implement resume */ + /* TODO We should stream file straight into the database */ + if (create_rhizome_import_dir() == -1) + goto bail; + if ((file = fopen(slot->filename, "w")) == NULL) { + WHYF_perror("fopen(`%s`, \"w\")", slot->filename); + goto bail; + } + if (slot->peer.sin_family == AF_INET) { + /* Transfer via HTTP over IPv4 */ + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + WHY_perror("socket"); + goto bail; + } + if (set_nonblock(sock) == -1) + goto bail; + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &slot->peer.sin_addr, buf, sizeof buf) == NULL) { + buf[0] = '*'; + buf[1] = '\0'; + } + if (connect(sock, (struct sockaddr*)&slot->peer, sizeof slot->peer) == -1) { + if (errno == EINPROGRESS) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("connect() returned EINPROGRESS"); + } else { + WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(slot->peer.sin_port)); + goto bail; + } + } + INFOF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s", + slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request) + ); + slot->alarm.poll.fd = sock; + slot->request_ofs = 0; + slot->state = RHIZOME_FETCH_CONNECTING; + slot->file = file; + slot->file_len = -1; + slot->file_ofs = 0; + /* Watch for activity on the socket */ + slot->alarm.function = rhizome_fetch_poll; + fetch_stats.name = "rhizome_fetch_poll"; + slot->alarm.stats = &fetch_stats; + slot->alarm.poll.events = POLLIN|POLLOUT; + watch(&slot->alarm); + /* And schedule a timeout alarm */ + slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + return 0; + } else { + /* TODO: Fetch via overlay */ + WHY("Rhizome fetching via overlay not implemented"); + } +bail: + if (sock != -1) + close(sock); + if (file != NULL) { + fclose(file); + unlink(slot->filename); + } + return -1; +} + +/* Start fetching a bundle's payload ready for importing. + * + * Three main cases that can occur here: + * 1) The manifest has a nil payload (filesize=0); + * 2) The payload is already in the database; or + * 3) The payload is not in the database. + * + * Cases (1) and (2) are more or less identical: the bundle can be imported into the database + * immediately. Case (3) requires the payload to be fetched from a remote node. + * + * First, obtain enough space in the database for the file. + * + * Second, work out how we are going to get the file. + * - On an IPv4 WiFi network, HTTP can be used. The IP address and port number are sent in the + * bundle advertisement packet. + * - On a non-IPv4 WiFi network, HTTP is not an option, so MDP must be used. + * + * For efficiency, the MDP transfer protocol could allow multiple listeners to receive the payload + * by eavesdropping on the transfer. In contrast, sending the payload auth-crypted would detect + * packet errors and hostile parties trying to inject false data into the transfer. + * + * Returns STARTED (0) if the fetch was started. + * Returns IMPORTED if a fetch was not started because the payload is nil or already in the + * Rhizome store, so the import was performed instead. + * Returns SAMEPAYLOAD if a fetch of the same payload (file ID) is already active. + * Returns SUPERSEDED if the fetch was not started because a newer version of the same bundle is + * already present. + * Returns SAMEBUNDLE if a fetch of the same bundle is already active. + * Returns OLDERBUNDLE if a fetch of an older version of the same bundle is already active. + * Returns NEWERBUNDLE if a fetch of a newer version of the same bundle is already active. + * Returns SLOTBUSY if the given slot is currently being used for another fetch. + * Returns -1 on error. + * + * In the STARTED case, the caller should not free the manifest because the fetch slot now has a + * copy of the pointer, and the manifest will be freed once the fetch finishes or is terminated. In + * all other cases, the caller is responsible for freeing the manifest. + * + * @author Andrew Bettison + */ +static enum rhizome_start_fetch_result +rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip) +{ + if (slot->state != RHIZOME_FETCH_FREE) + return SLOTBUSY; + + const char *bid = alloca_tohex_bid(m->cryptoSignPublic); + + /* Do the quick rejection tests first, before the more expensive ones, + like querying the database for manifests. + + We probably need a cache of recently rejected manifestid:versionid + pairs so that we can avoid database lookups in most cases. Probably + the first 64bits of manifestid is sufficient to make it resistant to + collission attacks, but using 128bits or the full 256 bits would be safer. + Let's make the cache use 256 bit (32byte) entries for power of two + efficiency, and so use the last 64bits for version id, thus using 192 bits + for collision avoidance --- probably sufficient for many years yet (from + time of writing in 2012). We get a little more than 192 bits by using + the cache slot number to implicitly store the first bits. + */ + + if (1||(debug & DEBUG_RHIZOME_RX)) + DEBUGF("Fetching bundle slot=%d bid=%s version=%lld size=%lld peerip=%s", + slot - &rhizome_fetch_queues[0].active, + bid, + m->version, + m->fileLength, + alloca_sockaddr(peerip) + ); + + // If the payload is empty, no need to fetch, so import now. + if (m->fileLength == 0) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" manifest fetch not started -- nil payload, so importing instead"); + if (rhizome_import_received_bundle(m) == -1) + return WHY("bundle import failed"); + return IMPORTED; + } + + // If we already have this version or newer, do not fetch. + if (rhizome_manifest_version_cache_lookup(m)) { + if (debug & DEBUG_RHIZOME_RX) + DEBUG(" fetch not started -- already have that version or newer"); + return SUPERSEDED; + } + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" is new"); + + /* Don't fetch if already in progress. If a fetch of an older version is already in progress, + * then this logic will let it run to completion before the fetch of the newer version is queued. + * This avoids the problem of indefinite postponement of fetching if new versions are constantly + * being published faster than we can fetch them. + */ + int i; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active; + const rhizome_manifest *am = as->manifest; + if (as->state != RHIZOME_FETCH_FREE && memcmp(m->cryptoSignPublic, am->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { + if (am->version < m->version) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" fetch already in progress -- older version"); + return OLDERBUNDLE; + } else if (am->version > m->version) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" fetch already in progress -- newer version"); + return NEWERBUNDLE; + } else { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" fetch already in progress -- same version"); + return SAMEBUNDLE; + } + } + } + + if (!m->fileHashedP) + return WHY("Manifest missing filehash"); + + // If the payload is already available, no need to fetch, so import now. + long long gotfile = 0; + if (sqlite_exec_int64(&gotfile, "SELECT COUNT(*) FROM FILES WHERE ID='%s' and datavalid=1;", m->fileHexHash) != 1) + return WHY("select failed"); + if (gotfile) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" fetch not started - payload already present, so importing instead"); + if (rhizome_bundle_import(m, m->ttl-1) == -1) + return WHY("bundle import failed"); + return IMPORTED; + } + + // Fetch the file, unless already queued. + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_slot *s = &rhizome_fetch_queues[i].active; + const rhizome_manifest *sm = s->manifest; + if (s->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, sm->fileHexHash) == 0) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash); + return SAMEPAYLOAD; + } + } + + // Start the fetch. + //dump("peerip", peerip, sizeof *peerip); + slot->peer = *peerip; + strbuf r = strbuf_local(slot->request, sizeof slot->request); + strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); + if (strbuf_overrun(r)) + return WHY("request overrun"); + slot->request_len = strbuf_len(r); + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "payload.%s", bid)) + return -1; + m->dataFileName = strdup(slot->filename); + m->dataFileUnlinkOnFree = 0; + slot->manifest = m; + if (schedule_fetch(slot) == -1) { + slot->filename[0] = '\0'; + return -1; + } + if (debug & DEBUG_RHIZOME_RX) + DEBUGF(" started fetch into %s, slot=%d filehash=%s", slot->manifest->dataFileName, slot - &rhizome_fetch_queues[0].active, m->fileHexHash); + return STARTED; +} + +/* Returns STARTED (0) if the fetch was started. + * Returns SLOTBUSY if there is no available fetch slot for performing the fetch. + * Returns -1 on error. + */ +enum rhizome_start_fetch_result +rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length) +{ + assert(peerip); + struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES); + if (slot == NULL) + return SLOTBUSY; + slot->peer = *peerip; + slot->manifest = NULL; + strbuf r = strbuf_local(slot->request, sizeof slot->request); + strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); + if (strbuf_overrun(r)) + return WHY("request overrun"); + slot->request_len = strbuf_len(r); + if (!FORM_RHIZOME_IMPORT_PATH(slot->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) + return -1; + if (schedule_fetch(slot) == -1) { + slot->filename[0] = '\0'; + return -1; + } + return STARTED; +} + +/* Activate the next fetch for the given slot. This takes the next job from the head of the slot's + * own queue. If there is none, then takes jobs from other queues. + * + * @author Andrew Bettison + */ +static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot) +{ + struct rhizome_fetch_queue *q; + for (q = (struct rhizome_fetch_queue *) slot; q >= rhizome_fetch_queues; --q) { + int i = 0; + struct rhizome_fetch_candidate *c; + while (i < q->candidate_queue_size && (c = &q->candidate_queue[i])->manifest) { + int result = rhizome_fetch(slot, c->manifest, &c->peer); + switch (result) { + case SLOTBUSY: + return; + case STARTED: + c->manifest = NULL; + rhizome_fetch_unqueue(q, i); + return; + case IMPORTED: + case SAMEBUNDLE: + case SAMEPAYLOAD: + case SUPERSEDED: + case NEWERBUNDLE: + // Discard the candidate fetch and loop to try the next in queue. + rhizome_fetch_unqueue(q, i); + break; + case OLDERBUNDLE: + // Do not un-queue, so that when the fetch of the older bundle finishes, we will start + // fetching a newer one. + ++i; + break; + } + } + } +} + +/* Called soon after any fetch candidate is queued, to start any queued fetches. + * + * @author Andrew Bettison + */ +static void rhizome_start_next_queued_fetches(struct sched_ent *alarm) +{ + int i; + for (i = 0; i < NQUEUES; ++i) + rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i].active); +} + +/* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as + * the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch + * is performed over MDP. + * + * If the fetch cannot be queued for any reason (error, queue full, no suitable queue) then the + * manifest is freed and returns -1. Otherwise, the pointer to the manifest is stored in the queue + * entry and the manifest is freed when the fetch has completed or is abandoned for any reason. + * + * Verifies manifests as late as possible to avoid wasting time. + * + * This function does not activate any fetches, it just queues the fetch candidates and sets an + * alarm that will trip as soon as there is no pending I/O, or at worst, in 500ms. This allows a + * full packet's worth of Rhizome advertisements to be processed, queued and prioritised before + * deciding which fetches to perform first. + * + * @author Andrew Bettison + */ +int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip) { IN(); - /* must free manifest when done with it */ - char *id = rhizome_manifest_get(m, "id", NULL, 0); + const char *bid = alloca_tohex_bid(m->cryptoSignPublic); int priority=100; /* normal priority */ if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Considering manifest import bid=%s version=%lld size=%lld priority=%d:", id, m->version, m->fileLength, priority); + DEBUGF("Considering import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority); if (rhizome_manifest_version_cache_lookup(m)) { if (debug & DEBUG_RHIZOME_RX) @@ -439,7 +782,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i if (debug & DEBUG_RHIZOME_RX) { long long stored_version; - if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'",id) > 0) + if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", bid) > 0) DEBUGF(" is new (have version %lld)", stored_version); } @@ -455,430 +798,178 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, struct sockaddr_i RETURN(0); } - /* work out where to put it in the list */ - int i; - for(i=0;i= m->version) { - /* this version is older than the one in the list, so don't list this one */ + // Find the proper queue for the payload. If there is none suitable, it is an error. + struct rhizome_fetch_queue *qi = rhizome_find_queue(m->fileLength); + if (!qi) { + WHYF("No suitable fetch queue for bundle size=%lld", m->fileLength); + rhizome_manifest_free(m); + RETURN(-1); + } + + // Search all the queues for the same manifest (it could be in any queue because its payload size + // may have changed between versions.) If a newer or the same version is already queued, then + // ignore this one. Otherwise, unqueue all older candidates. + int ci = -1; + int i, j; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + for (j = 0; j < q->candidate_queue_size; ) { + struct rhizome_fetch_candidate *c = &q->candidate_queue[j]; + if (c->manifest) { + if (memcmp(m->cryptoSignPublic, c->manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { + if (c->manifest->version >= m->version) { rhizome_manifest_free(m); RETURN(0); - } else { - /* replace listed version with this newer version */ - if (rhizome_manifest_verify(m)) { - WHY("Error verifying manifest when considering queuing for import"); - /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m,peerip,60000); - rhizome_manifest_free(m); - RETURN(-1); - } - - rhizome_manifest_free(candidates[i].manifest); - candidates[i].manifest=m; - /* update position in list */ - rhizome_position_candidate(i); - RETURN(0); } + if (!m->selfSigned && rhizome_manifest_verify(m)) { + WHY("Error verifying manifest when considering queuing for import"); + /* Don't waste time looking at this manifest again for a while */ + rhizome_queue_ignore_manifest(m, peerip, 60000); + rhizome_manifest_free(m); + RETURN(-1); + } + rhizome_fetch_unqueue(q, j); + } else { + if (ci == -1 && q == qi && c->priority < priority) + ci = j; + ++j; } - - /* if we have a higher priority file than the one at this - point in the list, stop, and we will shuffle the rest of - the list down. */ - if (candidates[i].priority>priority - ||(candidates[i].priority==priority - &&candidates[i].size>m->fileLength)) + } else { + if (ci == -1 && q == qi) + ci = j; break; + } } - if (i>=MAX_CANDIDATES) { - /* our list is already full of higher-priority items */ + } + // No duplicate was found, so if no free queue place was found either then bail out. + if (ci == -1) { rhizome_manifest_free(m); - RETURN(-1); + RETURN(1); } - if (rhizome_manifest_verify(m)) { + if (!m->selfSigned && rhizome_manifest_verify(m)) { WHY("Error verifying manifest when considering queuing for import"); /* Don't waste time looking at this manifest again for a while */ - rhizome_queue_ignore_manifest(m,peerip,60000); + rhizome_queue_ignore_manifest(m, peerip, 60000); rhizome_manifest_free(m); RETURN(-1); } - if (candidate_count==MAX_CANDIDATES) { - /* release manifest structure for whoever we are bumping from the list */ - rhizome_manifest_free(candidates[MAX_CANDIDATES-1].manifest); - candidates[MAX_CANDIDATES-1].manifest=NULL; - } else candidate_count++; - /* shuffle down */ - int bytes=(candidate_count-(i+1))*sizeof(rhizome_candidates); - if (0) DEBUGF("Moving slot %d to slot %d (%d bytes = %d slots)", - i,i+1,bytes,bytes/sizeof(rhizome_candidates)); - bcopy(&candidates[i], - &candidates[i+1], - bytes); - /* put new candidate in */ - candidates[i].manifest=m; - candidates[i].size=m->fileLength; - candidates[i].priority=priority; - candidates[i].peer=*peerip; + struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, j); + c->manifest = m; + c->peer = *peerip; + c->priority = priority; - int j; - if (1) { - DEBUG("Rhizome priorities fetch list now:"); - for(j=0;jcandidate_queue_size; ++j) { + struct rhizome_fetch_candidate *c = &q->candidate_queue[j]; + if (!c->manifest) + break; + DEBUGF("%d:%d manifest=%p bid=%s priority=%d size=%lld", i, j, + c->manifest, + alloca_tohex_bid(c->manifest->cryptoSignPublic), + c->priority, + (long long) c->manifest->fileLength + ); + } + } + } + + if (!is_scheduled(&sched_activate)) { + sched_activate.function = rhizome_start_next_queued_fetches; + sched_activate.stats = NULL; + sched_activate.alarm = gettime_ms() + rhizome_fetch_delay_ms(); + sched_activate.deadline = sched_activate.alarm + 5000; + schedule(&sched_activate); } RETURN(0); } -void rhizome_enqueue_suggestions(struct sched_ent *alarm) +static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { - int i; - for (i = 0; i < candidate_count; ++i) { - int manifest_kept = 0; - int result = rhizome_queue_manifest_import(candidates[i].manifest, &candidates[i].peer, &manifest_kept); - if (result == QBUSY) - break; - if (!manifest_kept) { - rhizome_manifest_free(candidates[i].manifest); - candidates[i].manifest = NULL; - } - } - if (i) { - /* now shuffle up */ - int bytes=(candidate_count-i)*sizeof(rhizome_candidates); - if (0) DEBUGF("Moving slot %d to slot 0 (%d bytes = %d slots)", i,bytes,bytes/sizeof(rhizome_candidates)); - bcopy(&candidates[i],&candidates[0],bytes); - candidate_count-=i; - } - if (alarm) { - alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms; - alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms*3; - schedule(alarm); - } - return; -} - -static int rhizome_queue_import(rhizome_file_fetch_record *q) -{ - int sock = -1; - FILE *file = NULL; - /* TODO Don't forget to implement resume */ - /* TODO We should stream file straight into the database */ - if (create_rhizome_import_dir() == -1) - goto bail; - if ((file = fopen(q->filename, "w")) == NULL) { - WHYF_perror("fopen(`%s`, \"w\")", q->filename); - goto bail; - } - if (q->peer.sin_family == AF_INET) { - /* Transfer via HTTP over IPv4 */ - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - WHY_perror("socket"); - goto bail; - } - if (set_nonblock(sock) == -1) - goto bail; - char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &q->peer.sin_addr, buf, sizeof buf) == NULL) { - buf[0] = '*'; - buf[1] = '\0'; - } - if (connect(sock, (struct sockaddr*)&q->peer, sizeof q->peer) == -1) { - if (errno == EINPROGRESS) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("connect() returned EINPROGRESS"); - } else { - WHYF_perror("connect(%d, %s:%u)", sock, buf, ntohs(q->peer.sin_port)); - goto bail; - } - } - INFOF("RHIZOME HTTP REQUEST family=%u addr=%s port=%u %s", - q->peer.sin_family, buf, ntohs(q->peer.sin_port), alloca_str_toprint(q->request) - ); - q->alarm.poll.fd = sock; - q->request_ofs=0; - q->state=RHIZOME_FETCH_CONNECTING; - q->file = file; - q->file_len=-1; - q->file_ofs=0; - /* Watch for activity on the socket */ - q->alarm.function=rhizome_fetch_poll; - fetch_stats.name="rhizome_fetch_poll"; - q->alarm.stats=&fetch_stats; - q->alarm.poll.events=POLLIN|POLLOUT; - watch(&q->alarm); - /* And schedule a timeout alarm */ - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - return 0; - } else { - /* TODO: Fetch via overlay */ - WHY("Rhizome fetching via overlay not implemented"); - } -bail: - if (sock != -1) - close(sock); - if (file != NULL) { - fclose(file); - unlink(q->filename); - } - return -1; -} - -/* Returns QUEUED (0) if the fetch was queued. - * Returns QSAMEPAYLOAD if a fetch of the same payload (file ID) is already queued. - * Returns QDATED if the fetch was not queued because a newer version of the same bundle is already - * present. - * Returns QBUSY if the fetch was not queued because the queue is full. - * Returns QSAME if the same fetch is already queued. - * Returns QOLDER if a fetch of an older version of the same bundle is already queued. - * Returns QNEWER if a fetch of a newer version of the same bundle is already queued. - * Returns QIMPORTED if a fetch was not queued because the payload is nil or already held, so the - * import was performed instead. - * Returns -1 on error. - */ -int rhizome_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip, int *manifest_kept) -{ - *manifest_kept = 0; - - const char *bid = alloca_tohex_bid(m->cryptoSignPublic); - long long filesize = rhizome_manifest_get_ll(m, "filesize"); - - /* Do the quick rejection tests first, before the more expensive ones, - like querying the database for manifests. - - We probably need a cache of recently rejected manifestid:versionid - pairs so that we can avoid database lookups in most cases. Probably - the first 64bits of manifestid is sufficient to make it resistant to - collission attacks, but using 128bits or the full 256 bits would be safer. - Let's make the cache use 256 bit (32byte) entries for power of two - efficiency, and so use the last 64bits for version id, thus using 192 bits - for collission avoidance --- probably sufficient for many years yet (from - time of writing in 2012). We get a little more than 192 bits by using - the cache slot number to implicitly store the first bits. - */ - - if (1||(debug & DEBUG_RHIZOME_RX)) - DEBUGF("Fetching bundle bid=%s version=%lld size=%lld:", bid, m->version, filesize); - - // If the payload is empty, no need to fetch, so import now. - if (filesize == 0) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" manifest fetch not started -- nil payload, so importing instead"); - if (rhizome_bundle_import(m, m->ttl-1) == -1) - return WHY("bundle import failed"); - return QIMPORTED; - } - - // Ensure there is a slot available before doing more expensive checks. - rhizome_file_fetch_record *q = rhizome_find_import_slot(); - if (q == NULL) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG(" fetch not started - all slots full"); - return QBUSY; - } - - // If we already have this version or newer, do not fetch. - if (rhizome_manifest_version_cache_lookup(m)) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG(" fetch not started -- already have that version or newer"); - return QDATED; - } if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" is new"); - - /* Don't fetch if already in progress. If a fetch of an older version is already in progress, - * then this logic will let it run to completion before the fetch of the newer version is queued. - * This avoids the problem of indefinite postponement of fetching if new versions are constantly - * being published faster than we can fetch them. - */ - int i; - for (i = 0; i < MAX_QUEUED_FILES; ++i) { - rhizome_manifest *qm = file_fetch_queue[i].manifest; - if (qm && memcmp(m->cryptoSignPublic, qm->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { - if (qm->version < m->version) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" fetch already in progress -- older version"); - return QOLDER; - } else if (qm->version > m->version) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" fetch already in progress -- newer version"); - return QNEWER; - } else { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" fetch already in progress -- same version"); - return QSAME; - } - } - } - - if (!rhizome_manifest_get(m, "filehash", m->fileHexHash, sizeof m->fileHexHash)) - return WHY("Manifest missing filehash"); - if (!rhizome_str_is_file_hash(m->fileHexHash)) - return WHYF("Invalid file hash: %s", m->fileHexHash); - str_toupper_inplace(m->fileHexHash); - m->fileHashedP = 1; - - // If the payload is already available, no need to fetch, so import now. - long long gotfile = 0; - if (sqlite_exec_int64(&gotfile, "SELECT COUNT(*) FROM FILES WHERE ID='%s' and datavalid=1;", m->fileHexHash) != 1) - return WHY("select failed"); - if (gotfile) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" fetch not started - payload already present, so importing instead"); - if (rhizome_bundle_import(m, m->ttl-1) == -1) - return WHY("bundle import failed"); - return QIMPORTED; - } - - // Fetch the file, unless already queued. - for (i = 0; i < MAX_QUEUED_FILES; ++i) { - if (file_fetch_queue[i].manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].manifest->fileHexHash) == 0) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash); - return QSAMEPAYLOAD; - } - } - - // Queue the fetch. - //dump("peerip", peerip, sizeof *peerip); - q->peer = *peerip; - strbuf r = strbuf_local(q->request, sizeof q->request); - strbuf_sprintf(r, "GET /rhizome/file/%s HTTP/1.0\r\n\r\n", m->fileHexHash); - if (strbuf_overrun(r)) - return WHY("request overrun"); - q->request_len = strbuf_len(r); - if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "payload.%s", bid)) - return -1; - m->dataFileName = strdup(q->filename); - m->dataFileUnlinkOnFree = 0; - q->manifest = m; - if (rhizome_queue_import(q) == -1) { - q->filename[0] = '\0'; - return -1; - } - *manifest_kept = 1; - if (debug & DEBUG_RHIZOME_RX) - DEBUGF(" started fetch into %s, slot=%d filehash=%s", q->manifest->dataFileName, q - file_fetch_queue, m->fileHexHash); - return QUEUED; -} - -int rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char *prefix, size_t prefix_length) -{ - assert(peerip); - rhizome_file_fetch_record *q = rhizome_find_import_slot(); - if (q == NULL) - return QBUSY; - q->peer = *peerip; - q->manifest = NULL; - strbuf r = strbuf_local(q->request, sizeof q->request); - strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix, prefix_length)); - if (strbuf_overrun(r)) - return WHY("request overrun"); - q->request_len = strbuf_len(r); - if (!FORM_RHIZOME_IMPORT_PATH(q->filename, "manifest.%s", alloca_tohex(prefix, prefix_length))) - return -1; - if (rhizome_queue_import(q) == -1) { - q->filename[0] = '\0'; - return -1; - } - return QUEUED; -} - -int rhizome_fetch_close(rhizome_file_fetch_record *q) -{ - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Release Rhizome fetch slot=%d", q - file_fetch_queue); - assert(q->state != RHIZOME_FETCH_FREE); + DEBUGF("Close Rhizome fetch slot=%d", slot - &rhizome_fetch_queues[0].active); + assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ - unwatch(&q->alarm); - unschedule(&q->alarm); - close(q->alarm.poll.fd); - q->alarm.poll.fd = -1; + unwatch(&slot->alarm); + unschedule(&slot->alarm); + close(slot->alarm.poll.fd); + slot->alarm.poll.fd = -1; /* Free ephemeral data */ - if (q->file) - fclose(q->file); - q->file = NULL; - if (q->manifest) - rhizome_manifest_free(q->manifest); - q->manifest = NULL; - if (q->filename[0]) - unlink(q->filename); - q->filename[0] = '\0'; + if (slot->file) + fclose(slot->file); + slot->file = NULL; + if (slot->manifest) + rhizome_manifest_free(slot->manifest); + slot->manifest = NULL; + if (slot->filename[0]) + unlink(slot->filename); + slot->filename[0] = '\0'; - // Release the import queue slot. - q->state = RHIZOME_FETCH_FREE; + // Release the fetch slot. + slot->state = RHIZOME_FETCH_FREE; - // Fill the slot with the next suggestion. - rhizome_enqueue_suggestions(NULL); + // Activate the next queued fetch that is eligible for this slot. Try starting candidates from + // all queues with the same or smaller size thresholds until the slot is taken. + rhizome_start_next_queued_fetch(slot); return 0; } -void rhizome_fetch_write(rhizome_file_fetch_record *q) +void rhizome_fetch_write(struct rhizome_fetch_slot *slot) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("write_nonblock(%d, %s)", q->alarm.poll.fd, alloca_toprint(-1, &q->request[q->request_ofs], q->request_len-q->request_ofs)); - int bytes = write_nonblock(q->alarm.poll.fd, &q->request[q->request_ofs], q->request_len-q->request_ofs); + DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs)); + int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs); if (bytes == -1) { WHY("Got error while sending HTTP request. Closing."); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); } else { // reset timeout - unschedule(&q->alarm); - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - q->request_ofs+=bytes; - if (q->request_ofs>=q->request_len) { + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_ofs+=bytes; + if (slot->request_ofs>=slot->request_len) { /* Sent all of request. Switch to listening for HTTP response headers. */ - q->request_len=0; q->request_ofs=0; - q->state=RHIZOME_FETCH_RXHTTPHEADERS; - q->alarm.poll.events=POLLIN; - watch(&q->alarm); - }else if(q->state==RHIZOME_FETCH_CONNECTING) - q->state = RHIZOME_FETCH_SENDINGHTTPREQUEST; + slot->request_len=0; slot->request_ofs=0; + slot->state=RHIZOME_FETCH_RXHTTPHEADERS; + slot->alarm.poll.events=POLLIN; + watch(&slot->alarm); + }else if(slot->state==RHIZOME_FETCH_CONNECTING) + slot->state = RHIZOME_FETCH_SENDINGHTTPREQUEST; } } -void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes) +void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { - if (bytes>(q->file_len-q->file_ofs)) - bytes=q->file_len-q->file_ofs; - if (fwrite(buffer,bytes,1,q->file) != 1) { + if (bytes>(slot->file_len-slot->file_ofs)) + bytes=slot->file_len-slot->file_ofs; + if (fwrite(buffer,bytes,1,slot->file) != 1) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, q->file_ofs); - rhizome_fetch_close(q); + DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); + rhizome_fetch_close(slot); return; } - q->file_ofs+=bytes; - if (q->file_ofs>=q->file_len) { + slot->file_ofs+=bytes; + if (slot->file_ofs>=slot->file_len) { /* got all of file */ if (debug & DEBUG_RHIZOME_RX) DEBUGF("Received all of file via rhizome -- now to import it"); - fclose(q->file); - q->file = NULL; - if (q->manifest) { + fclose(slot->file); + slot->file = NULL; + if (slot->manifest) { // Were fetching payload, now we have it. - rhizome_import_received_bundle(q->manifest); + rhizome_import_received_bundle(slot->manifest); } else { /* This was to fetch the manifest, so now fetch the file if needed */ DEBUGF("Received a manifest in response to supplying a manifest prefix."); @@ -886,56 +977,55 @@ void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes call schedule queued items. */ rhizome_manifest *m = rhizome_new_manifest(); if (m) { - if (rhizome_read_manifest_file(m, q->filename, 0) == -1) { - DEBUGF("Couldn't read manifest from %s",q->filename); + if (rhizome_read_manifest_file(m, slot->filename, 0) == -1) { + DEBUGF("Couldn't read manifest from %s",slot->filename); rhizome_manifest_free(m); } else { DEBUGF("All looks good for importing manifest id=%s", alloca_tohex_bid(m->cryptoSignPublic)); - dump("q->peer",&q->peer,sizeof(q->peer)); - rhizome_suggest_queue_manifest_import(m, &q->peer); - //rhizome_enqueue_suggestions(NULL); XXX Now called in rhizome_fetch_close() + dump("slot->peer",&slot->peer,sizeof(slot->peer)); + rhizome_suggest_queue_manifest_import(m, &slot->peer); } } } - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } // reset inactivity timeout - unschedule(&q->alarm); - q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm+RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); + unschedule(&slot->alarm); + slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); } void rhizome_fetch_poll(struct sched_ent *alarm) { - rhizome_file_fetch_record *q=(rhizome_file_fetch_record *)alarm; + struct rhizome_fetch_slot *slot = (struct rhizome_fetch_slot *) alarm; if (alarm->poll.revents & (POLLIN | POLLOUT)) { - switch(q->state) { + switch (slot->state) { case RHIZOME_FETCH_CONNECTING: case RHIZOME_FETCH_SENDINGHTTPREQUEST: - rhizome_fetch_write(q); + rhizome_fetch_write(slot); return; case RHIZOME_FETCH_RXFILE: { /* Keep reading until we have the promised amount of data */ char buffer[8192]; sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer); + int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); /* If we got some data, see if we have found the end of the HTTP request */ if (bytes > 0) { - rhizome_write_content(q, buffer, bytes); + rhizome_write_content(slot, buffer, bytes); return; } else { if (debug & DEBUG_RHIZOME_RX) DEBUG("Empty read, closing connection"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (sigPipeFlag) { if (debug & DEBUG_RHIZOME_RX) DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } } @@ -943,47 +1033,47 @@ void rhizome_fetch_poll(struct sched_ent *alarm) case RHIZOME_FETCH_RXHTTPHEADERS: { /* Keep reading until we have two CR/LFs in a row */ sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1); + int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); /* If we got some data, see if we have found the end of the HTTP reply */ if (bytes > 0) { // reset timeout - unschedule(&q->alarm); - q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - q->request_len += bytes; - if (http_header_complete(q->request, q->request_len, bytes)) { + unschedule(&slot->alarm); + slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&slot->alarm); + slot->request_len += bytes; + if (http_header_complete(slot->request, slot->request_len, bytes)) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len)); + DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len)); /* We have all the reply headers, so parse them, taking care of any following bytes of content. */ struct http_response_parts parts; - if (unpack_http_response(q->request, &parts) == -1) { + if (unpack_http_response(slot->request, &parts) == -1) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed HTTP request: failed to unpack http response"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (parts.code != 200) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } if (parts.content_length == -1) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Invalid HTTP reply: missing Content-Length header"); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } - q->file_len = parts.content_length; + slot->file_len = parts.content_length; /* We have all we need. The file is already open, so just write out any initial bytes of the body we read. */ - q->state = RHIZOME_FETCH_RXFILE; - int content_bytes = q->request + q->request_len - parts.content_start; + slot->state = RHIZOME_FETCH_RXFILE; + int content_bytes = slot->request + slot->request_len - parts.content_start; if (content_bytes > 0){ - rhizome_write_content(q, parts.content_start, content_bytes); + rhizome_write_content(slot, parts.content_start, content_bytes); return; } } @@ -992,19 +1082,17 @@ void rhizome_fetch_poll(struct sched_ent *alarm) default: if (debug & DEBUG_RHIZOME_RX) DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); return; } } } - if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){ // timeout or socket error, close the socket if (debug & DEBUG_RHIZOME_RX) DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR); - rhizome_fetch_close(q); + rhizome_fetch_close(slot); } - } /* diff --git a/serval.h b/serval.h index b3d2c493..223b3f7a 100644 --- a/serval.h +++ b/serval.h @@ -757,6 +757,7 @@ void sigIoHandler(int signal); int overlay_mdp_setup_sockets(); +int is_scheduled(const struct sched_ent *alarm); int _schedule(struct __sourceloc whence, struct sched_ent *alarm); int _unschedule(struct __sourceloc whence, struct sched_ent *alarm); int _watch(struct __sourceloc whence, struct sched_ent *alarm); @@ -770,7 +771,6 @@ int fd_poll(); void overlay_interface_discover(struct sched_ent *alarm); void overlay_dummy_poll(struct sched_ent *alarm); void overlay_route_tick(struct sched_ent *alarm); -void rhizome_enqueue_suggestions(struct sched_ent *alarm); void server_shutdown_check(struct sched_ent *alarm); void overlay_mdp_poll(struct sched_ent *alarm); void fd_periodicstats(struct sched_ent *alarm); diff --git a/strbuf_helpers.c b/strbuf_helpers.c index 9eace1e7..1fc480b4 100644 --- a/strbuf_helpers.c +++ b/strbuf_helpers.c @@ -22,6 +22,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include #include +#ifdef HAVE_NETINET_IN_H +#include +#endif static inline strbuf _toprint(strbuf sb, char c) { @@ -198,3 +201,71 @@ strbuf strbuf_append_exit_status(strbuf sb, int status) #endif return sb; } + +strbuf strbuf_append_sockaddr(strbuf sb, const struct sockaddr *addr) +{ + const char *fam = NULL; + switch (addr->sa_family) { + case AF_UNSPEC: fam = "AF_UNSPEC"; break; + case AF_UNIX: fam = "AF_UNIX"; break; + case AF_INET: fam = "AF_INET"; break; + case AF_AX25: fam = "AF_AX25"; break; + case AF_IPX: fam = "AF_IPX"; break; + case AF_APPLETALK: fam = "AF_APPLETALK"; break; + case AF_NETROM: fam = "AF_NETROM"; break; + case AF_BRIDGE: fam = "AF_BRIDGE"; break; + case AF_ATMPVC: fam = "AF_ATMPVC"; break; + case AF_X25: fam = "AF_X25"; break; + case AF_INET6: fam = "AF_INET6"; break; + case AF_ROSE: fam = "AF_ROSE"; break; + case AF_DECnet: fam = "AF_DECnet"; break; + case AF_NETBEUI: fam = "AF_NETBEUI"; break; + case AF_SECURITY: fam = "AF_SECURITY"; break; + case AF_KEY: fam = "AF_KEY"; break; + case AF_NETLINK: fam = "AF_NETLINK"; break; + case AF_PACKET: fam = "AF_PACKET"; break; + case AF_ASH: fam = "AF_ASH"; break; + case AF_ECONET: fam = "AF_ECONET"; break; + case AF_ATMSVC: fam = "AF_ATMSVC"; break; + case AF_SNA: fam = "AF_SNA"; break; + case AF_IRDA: fam = "AF_IRDA"; break; + case AF_PPPOX: fam = "AF_PPPOX"; break; + case AF_WANPIPE: fam = "AF_WANPIPE"; break; + case AF_LLC: fam = "AF_LLC"; break; + case AF_TIPC: fam = "AF_TIPC"; break; + case AF_BLUETOOTH: fam = "AF_BLUETOOTH"; break; + } + if (fam) + strbuf_puts(sb, fam); + else + strbuf_sprintf(sb, "[%d]", addr->sa_family); + switch (addr->sa_family) { + case AF_UNIX: + strbuf_putc(sb, ' '); + if (addr->sa_data[0]) + strbuf_toprint_quoted_len(sb, addr->sa_data, "\"\"", sizeof addr->sa_data); + else { + strbuf_puts(sb, "abstract "); + strbuf_toprint_quoted_len(sb, addr->sa_data, "\"\"", sizeof addr->sa_data); + } + break; + case AF_INET: { + const struct sockaddr_in *addr_in = (const struct sockaddr_in *) addr; + strbuf_sprintf(sb, " %u.%u.%u.%u:%u", + ((unsigned char *) &addr_in->sin_addr.s_addr)[0], + ((unsigned char *) &addr_in->sin_addr.s_addr)[1], + ((unsigned char *) &addr_in->sin_addr.s_addr)[2], + ((unsigned char *) &addr_in->sin_addr.s_addr)[3], + ntohs(addr_in->sin_port) + ); + } + break; + default: { + int i; + for (i = 0; i < sizeof addr->sa_data; ++i) + strbuf_sprintf(sb, " %02x", addr->sa_data[i]); + } + break; + } + return sb; +} diff --git a/strbuf_helpers.h b/strbuf_helpers.h index 498569ce..2b529c67 100644 --- a/strbuf_helpers.h +++ b/strbuf_helpers.h @@ -80,4 +80,10 @@ strbuf strbuf_append_shell_quotemeta(strbuf sb, const char *word); */ strbuf strbuf_append_exit_status(strbuf sb, int status); +/* Append a textual description of a struct sockaddr_in. + * @author Andrew Bettison + */ +struct sockaddr; +strbuf strbuf_append_sockaddr(strbuf sb, const struct sockaddr *); + #endif //__STRBUF_HELPERS_H__ diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index be1ebb4f..9314d935 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -44,7 +44,6 @@ configure_servald_server() { executeOk_servald config set server.respawn_on_signal off executeOk_servald config set mdp.wifi.tick_ms 500 executeOk_servald config set mdp.selfannounce.ticks_per_full_address 1 - executeOk_servald config set rhizome.fetch_interval_ms 100 } setup_curl_7() {