Reduce latency impact of BAR lookups based on aggregate time

This commit is contained in:
Jeremy Lakeman 2013-06-14 14:09:42 +09:30
parent b67516558e
commit a18230f591
2 changed files with 61 additions and 50 deletions

View File

@ -439,17 +439,18 @@ sqlite3_stmt *_sqlite_prepare(struct __sourceloc __whence, sqlite_retry_state *r
sqlite3_stmt *_sqlite_prepare_loglevel(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, strbuf stmt)
{
IN();
sqlite3_stmt *statement = NULL;
if (strbuf_overrun(stmt)) {
WHYF("SQL overrun: %s", strbuf_str(stmt));
return NULL;
RETURN(NULL);
}
if (!rhizome_db && rhizome_opendb() == -1)
return NULL;
RETURN(NULL);
while (1) {
switch (sqlite3_prepare_v2(rhizome_db, strbuf_str(stmt), -1, &statement, NULL)) {
case SQLITE_OK:
return statement;
RETURN(statement);
case SQLITE_BUSY:
case SQLITE_LOCKED:
if (retry && _sqlite_retry(__whence, retry, strbuf_str(stmt))) {
@ -459,13 +460,14 @@ sqlite3_stmt *_sqlite_prepare_loglevel(struct __sourceloc __whence, int log_leve
default:
LOGF(log_level, "query invalid, %s: %s", sqlite3_errmsg(rhizome_db), strbuf_str(stmt));
sqlite3_finalize(statement);
return NULL;
RETURN(NULL);
}
}
}
int _sqlite_step_retry(struct __sourceloc __whence, int log_level, sqlite_retry_state *retry, sqlite3_stmt *statement)
{
IN();
int ret = -1;
sqlite_trace_whence = &__whence;
while (statement) {
@ -494,6 +496,7 @@ int _sqlite_step_retry(struct __sourceloc __whence, int log_level, sqlite_retry_
}
}
sqlite_trace_whence = NULL;
OUT();
return ret;
}
@ -1556,48 +1559,14 @@ int rhizome_delete_file(const char *fileid)
return rhizome_delete_file_retry(&retry, fileid);
}
time_ms_t lookup_time=0;
time_ms_t last_bar_lookup=0;
int rhizome_is_bar_interesting(unsigned char *bar){
IN();
if (last_bar_lookup>(gettime_ms()-1000)) {
/* Looking up rhizome bundles by BAR can be slow.
If so, then ignore some percentage of BARs. */
if (lookup_time>10) {
// if >10ms then only check 1 in 8 BARs
if (random()&0x7) RETURN(0);
}
if (lookup_time>100) {
// if >10ms then only check 1 in 32 (1 in 8 of 1 in 4) BARs
if (random()&0x3) RETURN(0);
}
}
time_ms_t start_time=gettime_ms();
int64_t version = rhizome_bar_version(bar);
unsigned char log2_size = bar[RHIZOME_BAR_FILESIZE_OFFSET];
int ret=1;
int64_t version = rhizome_bar_version(bar);
char id_hex[RHIZOME_MANIFEST_ID_STRLEN];
tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
strcat(id_hex, "%");
// are we ignoring this manifest?
if (rhizome_ignore_manifest_check(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES)){
DEBUGF("Ignoring %s", id_hex);
RETURN(0);
}
// do we have free space in a fetch queue?
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
RETURN(0);
// are we already fetching this bundle [or later]?
rhizome_manifest *m=rhizome_fetch_search(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
if (m && m->version >= version)
RETURN(0);
// do we have this bundle [or later]?
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare(&retry,
@ -1616,11 +1585,6 @@ int rhizome_is_bar_interesting(unsigned char *bar){
}
sqlite3_finalize(statement);
time_ms_t end_time=gettime_ms();
lookup_time=end_time-start_time;
last_bar_lookup=end_time;
if (lookup_time>50) WARNF("Looking up a BAR took %lldms",lookup_time);
RETURN(ret);
OUT();
}

View File

@ -254,6 +254,8 @@ error:
return -1;
}
time_ms_t lookup_time=0;
int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now)
{
IN();
@ -388,13 +390,50 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
mdp.out.payload_length=0;
// parse BAR's
while(ob_remaining(f->payload)>0){
unsigned char *bar=ob_get_bytes_ptr(f->payload, RHIZOME_BAR_BYTES);
unsigned char *bars[50];
int bar_count=0;
while(ob_remaining(f->payload)>0 && bar_count<50){
unsigned char *bar;
bars[bar_count]=bar=ob_get_bytes_ptr(f->payload, RHIZOME_BAR_BYTES);
if (!bar){
WARNF("Expected whole BAR @%x (only %d bytes remain)", ob_position(f->payload), ob_remaining(f->payload));
break;
}
if (rhizome_is_bar_interesting(bar)==1){
// are we ignoring this manifest?
if (rhizome_ignore_manifest_check(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES))
continue;
// do we have free space in a fetch queue?
unsigned char log2_size = bar[RHIZOME_BAR_FILESIZE_OFFSET];
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
continue;
int64_t version = rhizome_bar_version(bar);
// are we already fetching this bundle [or later]?
rhizome_manifest *m=rhizome_fetch_search(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
if (m && m->version >= version)
continue;
bar_count++;
}
// perform costly database lookups
int index;
int test_count=0;
int max_tests = (lookup_time?(int)(40 / lookup_time):bar_count);
if (max_tests<=0)
max_tests=2;
time_ms_t start_time = gettime_ms();
for (index=0;index<bar_count;index++){
if (test_count > max_tests || gettime_ms() - start_time >40)
break;
if (bar_count > max_tests && random()%bar_count >= max_tests)
continue;
test_count++;
if (rhizome_is_bar_interesting(bars[index])==1){
// add a request for the manifest
if (mdp.out.payload_length==0){
bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE);
@ -409,12 +448,19 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
mdp.out.queue=OQ_ORDINARY;
}
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bar, RHIZOME_BAR_BYTES));
bcopy(bar, &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES);
DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bars[index], RHIZOME_BAR_BYTES));
bcopy(bars[index], &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES);
mdp.out.payload_length+=RHIZOME_BAR_BYTES;
}
}
time_ms_t end_time=gettime_ms();
if (test_count)
lookup_time=(end_time-start_time)/test_count;
else
lookup_time = (end_time - start_time);
if (mdp.out.payload_length>0)
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
@ -422,3 +468,4 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long
RETURN(0);
OUT();
}