Issue #30, remove periodic Rhizome fetch alarm

Replace the main-loop scheduled periodic alarm with an "activate" alarm that is
scheduled whenever a fetch candidate is added to any queue, unless the alarm is
already scheduled.

Replace the "rhizome.fetch_interval_ms" config item with
"rhizome.fetch_delay_ms" [default 50], which is the number of milliseconds
between adding a fetch candidate and firing the "activate" alarm.  This allows
time for a few more Rhizome advertisment packets to arrive after the first one,
before deciding which fetches to start first.

Add new `is_scheduled()` alarm primitive.
This commit is contained in:
Andrew Bettison 2012-10-24 15:13:50 +10:30
parent 935a545ac7
commit 39fc4ce6de
7 changed files with 180 additions and 169 deletions

View File

@ -74,6 +74,11 @@ int deadline(struct sched_ent *alarm)
return 0;
}
int is_scheduled(const struct sched_ent *alarm)
{
return alarm->_next || alarm->_prev || alarm == next_alarm || alarm == next_deadline;
}
// add an alarm to the list of scheduled function calls.
// simply populate .alarm with the absolute time, and .function with the method to call.
// on calling .poll.revents will be zero.
@ -84,8 +89,8 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm)
struct sched_ent *node = next_alarm, *last = NULL;
if (alarm->_next || alarm->_prev || alarm==next_alarm || alarm==next_deadline)
FATAL("Attempted to schedule an alarm that is still scheduled.");
if (is_scheduled(alarm))
FATAL("Scheduling an alarm that is already scheduled");
if (!alarm->function)
return WHY("Can't schedule if you haven't set the function pointer");

View File

@ -84,10 +84,6 @@ int overlayServerMode()
send periodic traffic. This means we need to */
INFO("Running in overlay mode.");
/* Make sure rhizome configured settings are known. */
if (rhizome_fetch_interval_ms < 1)
rhizome_configure();
/* Get keyring available for use.
Required for MDP, and very soon as a complete replacement for the
HLR for DNA lookups, even in non-overlay mode. */
@ -165,11 +161,6 @@ schedule(&_sched_##X); }
// preload directory service information
directory_service_init();
/* Pick next rhizome files to grab every few seconds
from the priority list continuously being built from observed
bundle announcements */
SCHEDULE(rhizome_start_next_queued_fetches, rhizome_fetch_interval_ms, rhizome_fetch_interval_ms*3);
/* Periodically check for new interfaces */
SCHEDULE(overlay_interface_discover, 1, 100);

View File

@ -21,24 +21,31 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "rhizome.h"
#include <stdlib.h>
static int rhizome_enabled_flag = -1; // unknown
int rhizome_fetch_interval_ms = -1;
static int _rhizome_enabled = -1;
static int _rhizome_fetch_delay_ms = -1;
/* Configure rhizome.
@author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_configure()
{
rhizome_enabled_flag = confValueGetBoolean("rhizome.enable", 1);
rhizome_fetch_interval_ms = (int) confValueGetInt64Range("rhizome.fetch_interval_ms", 3000, 1, 3600000);
_rhizome_enabled = confValueGetBoolean("rhizome.enable", 1);
_rhizome_fetch_delay_ms = (int) confValueGetInt64Range("rhizome.fetch_delay_ms", 50, 1, 3600000);
return 0;
}
int rhizome_enabled()
{
if (rhizome_enabled_flag < 0)
if (_rhizome_enabled < 0)
rhizome_configure();
return rhizome_enabled_flag;
return _rhizome_enabled;
}
int rhizome_fetch_delay_ms()
{
if (_rhizome_fetch_delay_ms < 1)
rhizome_configure();
return _rhizome_fetch_delay_ms;
}
/* Import a bundle from a pair of files, one containing the manifest and the optional other

View File

@ -150,10 +150,11 @@ typedef struct rhizome_manifest {
#define RHIZOME_SERVICE_MESHMS "MeshMS1"
extern long long rhizome_space;
extern int rhizome_fetch_interval_ms;
extern unsigned short rhizome_http_server_port;
int rhizome_configure();
int rhizome_enabled();
int rhizome_fetch_delay_ms();
int rhizome_set_datastore_path(const char *path);

View File

@ -91,7 +91,8 @@ struct rhizome_fetch_queue rhizome_fetch_queues[] = {
#define NQUEUES NELS(rhizome_fetch_queues)
struct profile_total fetch_stats;
static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED;
static struct profile_total fetch_stats;
/* Find a queue suitable for a fetch of the given number of bytes. If there is no suitable queue,
* return NULL.
@ -440,136 +441,6 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
return rhizome_bundle_import(m, m->ttl - 1 /* TTL */);
}
/* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as
* the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch
* is performed over MDP.
*
* If the fetch cannot be queued for any reason (error, queue full, no suitable queue) then the
* manifest is freed and returns -1. Otherwise, the pointer to the manifest is stored in the queue
* entry and the manifest is freed when the fetch has completed or is abandoned for any reason.
*
* Verifies manifests as late as possible to avoid wasting time.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
{
IN();
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
int priority=100; /* normal priority */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Considering import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
if (rhizome_manifest_version_cache_lookup(m)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" already have that version or newer");
rhizome_manifest_free(m);
RETURN(-1);
}
if (debug & DEBUG_RHIZOME_RX) {
long long stored_version;
if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", bid) > 0)
DEBUGF(" is new (have version %lld)", stored_version);
}
if (m->fileLength == 0) {
if (rhizome_manifest_verify(m) != 0) {
WHY("Error verifying manifest when considering for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
rhizome_import_received_bundle(m);
RETURN(0);
}
// Find the proper queue for the payload. If there is none suitable, it is an error.
struct rhizome_fetch_queue *qi = rhizome_find_queue(m->fileLength);
if (!qi) {
WHYF("No suitable fetch queue for bundle size=%lld", m->fileLength);
rhizome_manifest_free(m);
RETURN(-1);
}
// Search all the queues for the same manifest (it could be in any queue because its payload size
// may have changed between versions.) If a newer or the same version is already queued, then
// ignore this one. Otherwise, unqueue all older candidates.
int ci = -1;
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; ) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (c->manifest) {
if (memcmp(m->cryptoSignPublic, c->manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) {
if (c->manifest->version >= m->version) {
rhizome_manifest_free(m);
RETURN(0);
}
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
rhizome_fetch_unqueue(q, j);
} else {
if (ci == -1 && q == qi && c->priority < priority)
ci = j;
++j;
}
} else {
if (ci == -1 && q == qi)
ci = j;
break;
}
}
}
// No duplicate was found, so if no free queue place was found either then bail out.
if (ci == -1) {
rhizome_manifest_free(m);
RETURN(1);
}
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, j);
c->manifest = m;
c->peer = *peerip;
c->priority = priority;
if (debug & DEBUG_RHIZOME_RX) {
DEBUG("Rhizome fetch queues:");
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; ++j) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (!c->manifest)
break;
DEBUGF("%d:%d manifest=%p bid=%s priority=%d size=%lld", i, j,
c->manifest,
alloca_tohex_bid(c->manifest->cryptoSignPublic),
c->priority,
(long long) c->manifest->fileLength
);
}
}
}
RETURN(0);
}
static int schedule_fetch(struct rhizome_fetch_slot *slot)
{
int sock = -1;
@ -608,19 +479,19 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->peer.sin_family, buf, ntohs(slot->peer.sin_port), alloca_str_toprint(slot->request)
);
slot->alarm.poll.fd = sock;
slot->request_ofs=0;
slot->state=RHIZOME_FETCH_CONNECTING;
slot->request_ofs = 0;
slot->state = RHIZOME_FETCH_CONNECTING;
slot->file = file;
slot->file_len=-1;
slot->file_ofs=0;
slot->file_len = -1;
slot->file_ofs = 0;
/* Watch for activity on the socket */
slot->alarm.function=rhizome_fetch_poll;
fetch_stats.name="rhizome_fetch_poll";
slot->alarm.stats=&fetch_stats;
slot->alarm.poll.events=POLLIN|POLLOUT;
slot->alarm.function = rhizome_fetch_poll;
fetch_stats.name = "rhizome_fetch_poll";
slot->alarm.stats = &fetch_stats;
slot->alarm.poll.events = POLLIN|POLLOUT;
watch(&slot->alarm);
/* And schedule a timeout alarm */
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
return 0;
@ -865,21 +736,158 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot)
}
}
/* Called periodically to start any queued fetches that may not have been started as soon as the
* slot became available, for some unknown reason.
*
* This method is probably unnecessary.
/* Called soon after any fetch candidate is queued, to start any queued fetches.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
static void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
{
int i;
for (i = 0; i < NQUEUES; ++i)
rhizome_start_next_queued_fetch(&rhizome_fetch_queues[i].active);
alarm->alarm = gettime_ms() + rhizome_fetch_interval_ms;
alarm->deadline = alarm->alarm + rhizome_fetch_interval_ms * 3;
schedule(alarm);
}
/* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as
* the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch
* is performed over MDP.
*
* If the fetch cannot be queued for any reason (error, queue full, no suitable queue) then the
* manifest is freed and returns -1. Otherwise, the pointer to the manifest is stored in the queue
* entry and the manifest is freed when the fetch has completed or is abandoned for any reason.
*
* Verifies manifests as late as possible to avoid wasting time.
*
* This function does not activate any fetches, it just queues the fetch candidates and sets an
* alarm that will trip as soon as there is no pending I/O, or at worst, in 500ms. This allows a
* full packet's worth of Rhizome advertisements to be processed, queued and prioritised before
* deciding which fetches to perform first.
*
* @author Andrew Bettison <andrew@servalproject.com>
*/
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip)
{
IN();
const char *bid = alloca_tohex_bid(m->cryptoSignPublic);
int priority=100; /* normal priority */
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Considering import bid=%s version=%lld size=%lld priority=%d:", bid, m->version, m->fileLength, priority);
if (rhizome_manifest_version_cache_lookup(m)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG(" already have that version or newer");
rhizome_manifest_free(m);
RETURN(-1);
}
if (debug & DEBUG_RHIZOME_RX) {
long long stored_version;
if (sqlite_exec_int64(&stored_version, "select version from manifests where id='%s'", bid) > 0)
DEBUGF(" is new (have version %lld)", stored_version);
}
if (m->fileLength == 0) {
if (rhizome_manifest_verify(m) != 0) {
WHY("Error verifying manifest when considering for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
rhizome_import_received_bundle(m);
RETURN(0);
}
// Find the proper queue for the payload. If there is none suitable, it is an error.
struct rhizome_fetch_queue *qi = rhizome_find_queue(m->fileLength);
if (!qi) {
WHYF("No suitable fetch queue for bundle size=%lld", m->fileLength);
rhizome_manifest_free(m);
RETURN(-1);
}
// Search all the queues for the same manifest (it could be in any queue because its payload size
// may have changed between versions.) If a newer or the same version is already queued, then
// ignore this one. Otherwise, unqueue all older candidates.
int ci = -1;
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; ) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (c->manifest) {
if (memcmp(m->cryptoSignPublic, c->manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) {
if (c->manifest->version >= m->version) {
rhizome_manifest_free(m);
RETURN(0);
}
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
rhizome_fetch_unqueue(q, j);
} else {
if (ci == -1 && q == qi && c->priority < priority)
ci = j;
++j;
}
} else {
if (ci == -1 && q == qi)
ci = j;
break;
}
}
}
// No duplicate was found, so if no free queue place was found either then bail out.
if (ci == -1) {
rhizome_manifest_free(m);
RETURN(1);
}
if (!m->selfSigned && rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m, peerip, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, j);
c->manifest = m;
c->peer = *peerip;
c->priority = priority;
if (debug & DEBUG_RHIZOME_RX) {
DEBUG("Rhizome fetch queues:");
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; ++j) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (!c->manifest)
break;
DEBUGF("%d:%d manifest=%p bid=%s priority=%d size=%lld", i, j,
c->manifest,
alloca_tohex_bid(c->manifest->cryptoSignPublic),
c->priority,
(long long) c->manifest->fileLength
);
}
}
}
if (!is_scheduled(&sched_activate)) {
sched_activate.function = rhizome_start_next_queued_fetches;
sched_activate.stats = NULL;
sched_activate.alarm = gettime_ms() + rhizome_fetch_delay_ms();
sched_activate.deadline = sched_activate.alarm + 5000;
schedule(&sched_activate);
}
RETURN(0);
}
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
@ -985,7 +993,7 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
}

View File

@ -757,6 +757,7 @@ void sigIoHandler(int signal);
int overlay_mdp_setup_sockets();
int is_scheduled(const struct sched_ent *alarm);
int _schedule(struct __sourceloc whence, struct sched_ent *alarm);
int _unschedule(struct __sourceloc whence, struct sched_ent *alarm);
int _watch(struct __sourceloc whence, struct sched_ent *alarm);
@ -770,7 +771,6 @@ int fd_poll();
void overlay_interface_discover(struct sched_ent *alarm);
void overlay_dummy_poll(struct sched_ent *alarm);
void overlay_route_tick(struct sched_ent *alarm);
void rhizome_start_next_queued_fetches(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
void fd_periodicstats(struct sched_ent *alarm);

View File

@ -44,7 +44,6 @@ configure_servald_server() {
executeOk_servald config set server.respawn_on_signal off
executeOk_servald config set mdp.wifi.tick_ms 500
executeOk_servald config set mdp.selfannounce.ticks_per_full_address 1
executeOk_servald config set rhizome.fetch_interval_ms 100
}
setup_curl_7() {