Only request more BARs when we know this wont trigger a useless packet storm

This commit is contained in:
Jeremy Lakeman 2013-07-03 10:52:16 +09:30
parent becf199804
commit 986e664a3a

View File

@ -132,7 +132,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (state->bar_count >= CACHE_BARS) if (state->bar_count >= CACHE_BARS)
return; return;
if (state->next_request<=gettime_ms()){ if (state->next_request<=now){
if (state->sync_end < state->highest_seen){ if (state->sync_end < state->highest_seen){
rhizome_sync_request(subscriber, state->sync_end, 1); rhizome_sync_request(subscriber, state->sync_end, 1);
}else if(state->sync_start >0){ }else if(state->sync_start >0){
@ -142,7 +142,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (config.debug.rhizome) if (config.debug.rhizome)
DEBUGF("BAR sync with %s complete", alloca_tohex_sid(subscriber->sid)); DEBUGF("BAR sync with %s complete", alloca_tohex_sid(subscriber->sid));
} }
state->next_request = gettime_ms()+500; state->next_request = now+5000;
} }
} }
@ -181,18 +181,26 @@ int rhizome_sync_bundle_inserted(const unsigned char *bar)
return 0; return 0;
} }
static void sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64_t token) static int sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64_t token)
{ {
int ret=0;
// check the database before adding the BAR to the list // check the database before adding the BAR to the list
if (token!=0 && rhizome_is_bar_interesting(bar)!=0){ if (token!=0 && rhizome_is_bar_interesting(bar)!=0){
bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES); bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES);
state->bars[state->bar_count].next_request = gettime_ms(); state->bars[state->bar_count].next_request = gettime_ms();
state->bar_count++; state->bar_count++;
ret=1;
} }
if (state->sync_end < token) if (state->sync_end < token){
state->sync_end = token; state->sync_end = token;
if (state->sync_start > token) ret=1;
}
if (state->sync_start > token){
state->sync_start = token; state->sync_start = token;
ret=1;
}
return ret;
} }
static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_sync *state, struct overlay_buffer *b) static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_sync *state, struct overlay_buffer *b)
@ -255,20 +263,25 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
// extend the set of BARs we have synced from this peer // extend the set of BARs we have synced from this peer
// we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range // we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range
// TODO stop if we are taking too much CPU time. // TODO stop if we are taking too much CPU time.
for (i=mid_point; i<bar_count && state->bar_count < BARS_PER_RESPONSE; i++) int added=0;
sync_cache_bar(state, bars[i], bar_tokens[i]); for (i=mid_point; i<bar_count && state->bar_count < BARS_PER_RESPONSE; i++){
for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--) if (sync_cache_bar(state, bars[i], bar_tokens[i]))
sync_cache_bar(state, bars[i], bar_tokens[i]); added=1;
}
for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--){
if (sync_cache_bar(state, bars[i], bar_tokens[i]))
added=1;
}
if (config.debug.rhizome) if (config.debug.rhizome)
DEBUGF("Synced %llu - %llu with %s", state->sync_start, state->sync_end, alloca_tohex_sid(subscriber->sid)); DEBUGF("Synced %llu - %llu with %s", state->sync_start, state->sync_end, alloca_tohex_sid(subscriber->sid));
state->next_request = gettime_ms(); if (added)
state->next_request = gettime_ms();
} }
} }
static int append_response(struct overlay_buffer *b, uint64_t token, const unsigned char *bar) static int append_response(struct overlay_buffer *b, uint64_t token, const unsigned char *bar)
{ {
ob_checkpoint(b);
if (ob_append_packed_ui64(b, token)) if (ob_append_packed_ui64(b, token))
return -1; return -1;
if (bar){ if (bar){
@ -280,6 +293,7 @@ static int append_response(struct overlay_buffer *b, uint64_t token, const unsig
return -1; return -1;
bzero(ptr, RHIZOME_BAR_BYTES); bzero(ptr, RHIZOME_BAR_BYTES);
} }
ob_checkpoint(b);
return 0; return 0;
} }
@ -310,6 +324,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload)); struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload));
ob_append_byte(b, MSG_TYPE_BARS); ob_append_byte(b, MSG_TYPE_BARS);
ob_checkpoint(b);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT; sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement; sqlite3_stmt *statement;
@ -343,9 +358,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
// make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest // make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest
if (count==0 && rowid!=token){ if (count==0 && rowid!=token){
if (token!=HEAD_FLAG){ if (token!=HEAD_FLAG){
append_response(b, token, NULL); if (append_response(b, token, NULL))
count++; ob_rewind(b);
last = token; else{
count++;
last = token;
}
}else }else
token = rowid; token = rowid;
} }
@ -367,9 +385,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
// send a zero lower bound if we reached the end of our manifest list // send a zero lower bound if we reached the end of our manifest list
if (count && count < BARS_PER_RESPONSE && !forwards){ if (count && count < BARS_PER_RESPONSE && !forwards){
append_response(b, 0, NULL); if (append_response(b, 0, NULL))
last = 0; ob_rewind(b);
count++; else {
last = 0;
count++;
}
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);