Make sure we can keep our fetch queues full by syncing more BARs

This commit is contained in:
Jeremy Lakeman 2013-07-02 12:11:57 +09:30
parent 0c681b879d
commit becf199804
2 changed files with 25 additions and 15 deletions

View File

@ -537,7 +537,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
goto skip;
}else{
if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){
// payload was not queued
// payload was not queued, delay the next attempt slightly
frame->dont_send_until = now + 5;
goto skip;
}
}

View File

@ -28,7 +28,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MSG_TYPE_BARS 0
#define MSG_TYPE_REQ 1
#define CACHE_BARS 20
#define CACHE_BARS 60
#define BARS_PER_RESPONSE (400/RHIZOME_BAR_BYTES)
#define HEAD_FLAG INT64_MAX
@ -88,6 +89,11 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
if (state->bars[i].next_request > now)
continue;
unsigned char *prefix = &state->bars[i].bar[RHIZOME_BAR_PREFIX_OFFSET];
if (rhizome_ignore_manifest_check(prefix, RHIZOME_BAR_PREFIX_BYTES))
continue;
// do we have free space now in the appropriate fetch queue?
unsigned char log2_size = state->bars[i].bar[RHIZOME_BAR_FILESIZE_OFFSET];
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
@ -95,7 +101,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
int64_t version = rhizome_bar_version(state->bars[i].bar);
// are we already fetching this bundle [or later]?
rhizome_manifest *m=rhizome_fetch_search(&state->bars[i].bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES);
if (m && m->version >= version)
continue;
@ -193,13 +199,13 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
{
// find all interesting BARs in the payload and extend our sync range
unsigned char *bars[CACHE_BARS];
uint64_t bar_tokens[CACHE_BARS];
unsigned char *bars[BARS_PER_RESPONSE];
uint64_t bar_tokens[BARS_PER_RESPONSE];
int bar_count = 0;
int has_before=0, has_after=0;
int mid_point = -1;
while(ob_remaining(b)>0 && bar_count < CACHE_BARS){
while(ob_remaining(b)>0 && bar_count < BARS_PER_RESPONSE){
bar_tokens[bar_count]=ob_get_packed_ui64(b);
bars[bar_count]=ob_get_bytes_ptr(b, RHIZOME_BAR_BYTES);
if (!bars[bar_count])
@ -249,9 +255,9 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
// extend the set of BARs we have synced from this peer
// we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range
// TODO stop if we are taking too much CPU time.
for (i=mid_point; i<bar_count && state->bar_count < CACHE_BARS; i++)
for (i=mid_point; i<bar_count && state->bar_count < BARS_PER_RESPONSE; i++)
sync_cache_bar(state, bars[i], bar_tokens[i]);
for (i=mid_point -1; i>=0 && state->bar_count < CACHE_BARS; i--)
for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--)
sync_cache_bar(state, bars[i], bar_tokens[i]);
if (config.debug.rhizome)
DEBUGF("Synced %llu - %llu with %s", state->sync_start, state->sync_end, alloca_tohex_sid(subscriber->sid));
@ -262,6 +268,7 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_
static int append_response(struct overlay_buffer *b, uint64_t token, const unsigned char *bar)
{
ob_checkpoint(b);
if (ob_append_packed_ui64(b, token))
return -1;
if (bar){
@ -277,7 +284,6 @@ static int append_response(struct overlay_buffer *b, uint64_t token, const unsig
}
static uint64_t max_token=0;
static void sync_send_response(struct subscriber *dest, int forwards, uint64_t token)
{
IN();
@ -333,7 +339,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
rhizome_sync_bundle_inserted(bar);
}
if (count < CACHE_BARS){
if (count < BARS_PER_RESPONSE){
// make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest
if (count==0 && rowid!=token){
if (token!=HEAD_FLAG){
@ -344,12 +350,15 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
token = rowid;
}
append_response(b, rowid, bar);
last = rowid;
count++;
if (append_response(b, rowid, bar))
ob_rewind(b);
else {
last = rowid;
count++;
}
}
if (count >=CACHE_BARS && rowid <= max_token)
if (count >= BARS_PER_RESPONSE && rowid <= max_token)
break;
}
@ -357,7 +366,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
max_token = token;
// send a zero lower bound if we reached the end of our manifest list
if (count && count < CACHE_BARS && !forwards){
if (count && count < BARS_PER_RESPONSE && !forwards){
append_response(b, 0, NULL);
last = 0;
count++;