Simplify fetch decisions when manifest arrives

This commit is contained in:
Jeremy Lakeman 2014-06-28 17:41:05 +09:30
parent 588b95ab20
commit 58ea5a1c8b
3 changed files with 49 additions and 126 deletions

View File

@ -837,4 +837,5 @@ int rhizome_database_filehash_from_id(const rhizome_bid_t *bidp, uint64_t versio
int overlay_mdp_service_rhizome_sync(struct internal_mdp_header *header, struct overlay_buffer *payload); int overlay_mdp_service_rhizome_sync(struct internal_mdp_header *header, struct overlay_buffer *payload);
int rhizome_sync_bundle_inserted(const unsigned char *bar); int rhizome_sync_bundle_inserted(const unsigned char *bar);
DECLARE_ALARM(rhizome_fetch_status);
#endif //__SERVAL_DNA__RHIZOME_H #endif //__SERVAL_DNA__RHIZOME_H

View File

@ -42,8 +42,6 @@ struct rhizome_fetch_candidate {
for MDP. */ for MDP. */
struct socket_address addr; struct socket_address addr;
const struct subscriber *peer; const struct subscriber *peer;
int priority;
}; };
/* Represents an active fetch (in progress) of a bundle payload (.manifest != NULL) or of a bundle /* Represents an active fetch (in progress) of a bundle payload (.manifest != NULL) or of a bundle
@ -154,51 +152,34 @@ static const char * fetch_state(int state)
} }
} }
static uint64_t rhizome_active_fetch_bytes_received(unsigned q) DEFINE_ALARM(rhizome_fetch_status);
void rhizome_fetch_status(struct sched_ent *alarm)
{ {
assert(q < NQUEUES);
if (rhizome_fetch_queues[q].active.state == RHIZOME_FETCH_FREE)
return 0;
return rhizome_fetch_queues[q].active.write_state.file_offset;
}
static uint64_t rhizome_fetch_queue_bytes()
{
uint64_t bytes = 0;
unsigned i; unsigned i;
DEBUGF("==== Fetch status");
for(i=0;i<NQUEUES;i++){ for(i=0;i<NQUEUES;i++){
if (rhizome_fetch_queues[i].active.state!=RHIZOME_FETCH_FREE){ struct rhizome_fetch_queue *q=&rhizome_fetch_queues[i];
assert(rhizome_fetch_queues[i].active.manifest->filesize != RHIZOME_SIZE_UNSET); unsigned candidates=0;
bytes += rhizome_fetch_queues[i].active.manifest->filesize - rhizome_fetch_queues[i].active.write_state.file_offset; uint64_t candidate_size = 0;
}
unsigned j; unsigned j;
for (j=0;j<rhizome_fetch_queues[i].candidate_queue_size;j++){ for (j=0;j<q->candidate_queue_size;j++){
if (rhizome_fetch_queues[i].candidate_queue[j].manifest) { if (q->candidate_queue[j].manifest){
assert(rhizome_fetch_queues[i].candidate_queue[j].manifest->filesize != RHIZOME_SIZE_UNSET); candidates++;
bytes += rhizome_fetch_queues[i].candidate_queue[j].manifest->filesize; assert(q->candidate_queue[j].manifest->filesize != RHIZOME_SIZE_UNSET);
candidate_size += q->candidate_queue[j].manifest->filesize;
} }
} }
} // if (candidates == 0 && q->active.state==RHIZOME_FETCH_FREE)
return bytes; // continue;
DEBUGF("Fetch slot %d, candidates %u of %u %"PRIu64" bytes, %s %"PRIu64" of %"PRIu64,
i, candidates, q->candidate_queue_size, candidate_size,
fetch_state(q->active.state),
q->active.state==RHIZOME_FETCH_FREE?0:q->active.write_state.file_offset,
q->active.manifest?q->active.manifest->filesize:0);
} }
void rhizome_fetch_log_short_status() time_ms_t now = gettime_ms();
{ RESCHEDULE(alarm, now + 3000, TIME_MS_NEVER_WILL, TIME_MS_NEVER_WILL);
unsigned active = 0;
unsigned i;
for(i=0;i<NQUEUES;i++)
if (rhizome_fetch_queues[i].active.state!=RHIZOME_FETCH_FREE)
active++;
if (!active)
return;
INFOF("Rhizome transfer progress: %"PRIu64",%"PRIu64",%"PRIu64",%"PRIu64",%"PRIu64",%"PRIu64" (remaining %"PRIu64")",
rhizome_active_fetch_bytes_received(0),
rhizome_active_fetch_bytes_received(1),
rhizome_active_fetch_bytes_received(2),
rhizome_active_fetch_bytes_received(3),
rhizome_active_fetch_bytes_received(4),
rhizome_active_fetch_bytes_received(5),
rhizome_fetch_queue_bytes());
} }
int rhizome_fetch_status_html(strbuf b) int rhizome_fetch_status_html(strbuf b)
@ -217,7 +198,7 @@ int rhizome_fetch_status_html(strbuf b)
} }
} }
strbuf_sprintf(b, "<p>Slot %u, (%u of %u [%"PRIu64" bytes]): ", i, candidates, q->candidate_queue_size, candidate_size); strbuf_sprintf(b, "<p>Slot %u, (%u of %u [%"PRIu64" bytes]): ", i, candidates, q->candidate_queue_size, candidate_size);
if (q->active.state!=RHIZOME_FETCH_FREE){ if (q->active.state!=RHIZOME_FETCH_FREE && q->active.manifest){
strbuf_sprintf(b, "%s %"PRIu64" of %"PRIu64" from %s*", strbuf_sprintf(b, "%s %"PRIu64" of %"PRIu64" from %s*",
fetch_state(q->active.state), fetch_state(q->active.state),
q->active.write_state.file_offset, q->active.write_state.file_offset,
@ -230,8 +211,9 @@ int rhizome_fetch_status_html(strbuf b)
return 0; return 0;
} }
static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED; static void rhizome_start_next_queued_fetches(struct sched_ent *alarm);
static struct profile_total rsnqf_stats = { .name="rhizome_start_next_queued_fetches" }; static struct profile_total rsnqf_stats = { .name="rhizome_start_next_queued_fetches" };
static struct sched_ent sched_activate = { .function = rhizome_start_next_queued_fetches, .stats = &rsnqf_stats };
static struct profile_total fetch_stats = { .name="rhizome_fetch_poll" }; static struct profile_total fetch_stats = { .name="rhizome_fetch_poll" };
/* Find a queue suitable for a fetch of the given number of bytes. If there is no suitable queue, /* Find a queue suitable for a fetch of the given number of bytes. If there is no suitable queue,
@ -897,34 +879,29 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
RETURN(0); RETURN(0);
} }
int priority=100; /* normal priority */
if (config.debug.rhizome_rx) if (config.debug.rhizome_rx)
DEBUGF("Considering import bid=%s version=%"PRIu64" size=%"PRIu64" priority=%d:", DEBUGF("Considering import bid=%s version=%"PRIu64" size=%"PRIu64,
alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version, m->filesize, priority); alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version, m->filesize);
if (!rhizome_is_manifest_interesting(m)) { if (!rhizome_is_manifest_interesting(m)) {
if (config.debug.rhizome_rx) if (config.debug.rhizome_rx)
DEBUG(" already have that version or newer"); DEBUG(" already stored that version or newer");
rhizome_manifest_free(m); rhizome_manifest_free(m);
RETURN(-1); RETURN(-1);
} }
if (config.debug.rhizome_rx) {
uint64_t stored_version;
if (sqlite_exec_uint64(&stored_version, "SELECT version FROM MANIFESTS WHERE id = ?", RHIZOME_BID_T, &m->cryptoSignPublic, END) > 0)
DEBUGF(" is new (have version %"PRIu64")", stored_version);
}
assert(m->filesize != RHIZOME_SIZE_UNSET); assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize == 0) {
if (!rhizome_manifest_verify(m)) { // if we haven't verified it yet, verify now
WHY("Error verifying manifest when considering for import"); if (!m->selfSigned && !rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */ /* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, 60000); rhizome_queue_ignore_manifest(m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, 60000);
rhizome_manifest_free(m); rhizome_manifest_free(m);
RETURN(-1); RETURN(-1);
} }
if (m->filesize == 0) {
rhizome_import_received_bundle(m); rhizome_import_received_bundle(m);
rhizome_manifest_free(m); rhizome_manifest_free(m);
RETURN(0); RETURN(0);
@ -948,30 +925,20 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
unsigned j; unsigned j;
for (j = 0; j < q->candidate_queue_size; ) { for (j = 0; j < q->candidate_queue_size; ) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j]; struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (c->manifest) { if (!c->manifest){
if (ci == -1 && q == qi)
ci = j;
break;
}
if (cmp_rhizome_bid_t(&m->cryptoSignPublic, &c->manifest->cryptoSignPublic) == 0) { if (cmp_rhizome_bid_t(&m->cryptoSignPublic, &c->manifest->cryptoSignPublic) == 0) {
if (c->manifest->version >= m->version) { if (c->manifest->version >= m->version) {
rhizome_manifest_free(m); rhizome_manifest_free(m);
RETURN(0); RETURN(0);
} }
if (!m->selfSigned && !rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
rhizome_fetch_unqueue(q, j); rhizome_fetch_unqueue(q, j);
} else { }else
if (ci == -1 && q == qi && c->priority < priority) j++;
ci = j;
++j;
}
} else {
if (ci == -1 && q == qi)
ci = j;
break;
}
} }
} }
// No duplicate was found, so if no free queue place was found either then bail out. // No duplicate was found, so if no free queue place was found either then bail out.
@ -980,42 +947,12 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
RETURN(1); RETURN(1);
} }
if (!m->selfSigned && !rhizome_manifest_verify(m)) {
WHY("Error verifying manifest when considering queuing for import");
/* Don't waste time looking at this manifest again for a while */
rhizome_queue_ignore_manifest(m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary, 60000);
rhizome_manifest_free(m);
RETURN(-1);
}
struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci); struct rhizome_fetch_candidate *c = rhizome_fetch_insert(qi, ci);
c->manifest = m; c->manifest = m;
c->priority = priority;
c->addr = *addr; c->addr = *addr;
c->peer = peer; c->peer = peer;
if (config.debug.rhizome_rx) {
DEBUG("Rhizome fetch queues:");
unsigned i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; ++j) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (!c->manifest)
break;
DEBUGF("%d:%d manifest=%p bid=%s priority=%d size=%"PRIu64, i, j,
c->manifest,
alloca_tohex_rhizome_bid_t(c->manifest->cryptoSignPublic),
c->priority,
c->manifest->filesize
);
}
}
}
if (!is_scheduled(&sched_activate)) { if (!is_scheduled(&sched_activate)) {
sched_activate.function = rhizome_start_next_queued_fetches;
sched_activate.stats = &rsnqf_stats;
sched_activate.alarm = gettime_ms() + rhizome_fetch_delay_ms(); sched_activate.alarm = gettime_ms() + rhizome_fetch_delay_ms();
sched_activate.deadline = sched_activate.alarm + config.rhizome.idle_timeout; sched_activate.deadline = sched_activate.alarm + config.rhizome.idle_timeout;
schedule(&sched_activate); schedule(&sched_activate);

View File

@ -269,21 +269,6 @@ int overlay_rhizome_saw_advertisements(struct decode_context *context, struct ov
assert(m->version == summ.version); assert(m->version == summ.version);
assert(m->manifest_body_bytes == summ.body_len); assert(m->manifest_body_bytes == summ.body_len);
// are we already fetching this bundle [or later]?
rhizome_manifest *mf=rhizome_fetch_search(m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary);
if (mf && mf->version >= m->version)
goto next;
if (!rhizome_is_manifest_interesting(m)) {
/* We already have this version or newer */
if (config.debug.rhizome_ads)
DEBUG("We already have that manifest or newer.");
goto next;
}
if (config.debug.rhizome_ads)
DEBUG("Not seen before.");
// start the fetch process! // start the fetch process!
rhizome_suggest_queue_manifest_import(m, &httpaddr, f->source); rhizome_suggest_queue_manifest_import(m, &httpaddr, f->source);
// the above function will free the manifest structure, make sure we don't free it again // the above function will free the manifest structure, make sure we don't free it again