From becf1998043301a3dabaeb35ed8b98bb7f12f951 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 2 Jul 2013 12:11:57 +0930 Subject: [PATCH] Make sure we can keep our fetch queues full by syncing more BARs --- overlay_queue.c | 3 ++- rhizome_sync.c | 37 +++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/overlay_queue.c b/overlay_queue.c index 7bd37d0f..e7dfb5f9 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -537,7 +537,8 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim goto skip; }else{ if (overlay_frame_append_payload(&packet->context, packet->interface, frame, packet->buffer)){ - // payload was not queued + // payload was not queued, delay the next attempt slightly + frame->dont_send_until = now + 5; goto skip; } } diff --git a/rhizome_sync.c b/rhizome_sync.c index 469124aa..ffb78c15 100644 --- a/rhizome_sync.c +++ b/rhizome_sync.c @@ -28,7 +28,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define MSG_TYPE_BARS 0 #define MSG_TYPE_REQ 1 -#define CACHE_BARS 20 +#define CACHE_BARS 60 +#define BARS_PER_RESPONSE (400/RHIZOME_BAR_BYTES) #define HEAD_FLAG INT64_MAX @@ -88,6 +89,11 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi if (state->bars[i].next_request > now) continue; + unsigned char *prefix = &state->bars[i].bar[RHIZOME_BAR_PREFIX_OFFSET]; + + if (rhizome_ignore_manifest_check(prefix, RHIZOME_BAR_PREFIX_BYTES)) + continue; + // do we have free space now in the appropriate fetch queue? unsigned char log2_size = state->bars[i].bar[RHIZOME_BAR_FILESIZE_OFFSET]; if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1) @@ -95,7 +101,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi int64_t version = rhizome_bar_version(state->bars[i].bar); // are we already fetching this bundle [or later]? - rhizome_manifest *m=rhizome_fetch_search(&state->bars[i].bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); + rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES); if (m && m->version >= version) continue; @@ -193,13 +199,13 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_ { // find all interesting BARs in the payload and extend our sync range - unsigned char *bars[CACHE_BARS]; - uint64_t bar_tokens[CACHE_BARS]; + unsigned char *bars[BARS_PER_RESPONSE]; + uint64_t bar_tokens[BARS_PER_RESPONSE]; int bar_count = 0; int has_before=0, has_after=0; int mid_point = -1; - while(ob_remaining(b)>0 && bar_count < CACHE_BARS){ + while(ob_remaining(b)>0 && bar_count < BARS_PER_RESPONSE){ bar_tokens[bar_count]=ob_get_packed_ui64(b); bars[bar_count]=ob_get_bytes_ptr(b, RHIZOME_BAR_BYTES); if (!bars[bar_count]) @@ -249,9 +255,9 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_ // extend the set of BARs we have synced from this peer // we require the list of BARs to be either ASC or DESC and include BARs for *all* manifests in that range // TODO stop if we are taking too much CPU time. - for (i=mid_point; ibar_count < CACHE_BARS; i++) + for (i=mid_point; ibar_count < BARS_PER_RESPONSE; i++) sync_cache_bar(state, bars[i], bar_tokens[i]); - for (i=mid_point -1; i>=0 && state->bar_count < CACHE_BARS; i--) + for (i=mid_point -1; i>=0 && state->bar_count < BARS_PER_RESPONSE; i--) sync_cache_bar(state, bars[i], bar_tokens[i]); if (config.debug.rhizome) DEBUGF("Synced %llu - %llu with %s", state->sync_start, state->sync_end, alloca_tohex_sid(subscriber->sid)); @@ -262,6 +268,7 @@ static void sync_process_bar_list(struct subscriber *subscriber, struct rhizome_ static int append_response(struct overlay_buffer *b, uint64_t token, const unsigned char *bar) { + ob_checkpoint(b); if (ob_append_packed_ui64(b, token)) return -1; if (bar){ @@ -277,7 +284,6 @@ static int append_response(struct overlay_buffer *b, uint64_t token, const unsig } static uint64_t max_token=0; - static void sync_send_response(struct subscriber *dest, int forwards, uint64_t token) { IN(); @@ -333,7 +339,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t rhizome_sync_bundle_inserted(bar); } - if (count < CACHE_BARS){ + if (count < BARS_PER_RESPONSE){ // make sure we include the exact rowid that was requested, even if we just deleted / replaced the manifest if (count==0 && rowid!=token){ if (token!=HEAD_FLAG){ @@ -344,12 +350,15 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t token = rowid; } - append_response(b, rowid, bar); - last = rowid; - count++; + if (append_response(b, rowid, bar)) + ob_rewind(b); + else { + last = rowid; + count++; + } } - if (count >=CACHE_BARS && rowid <= max_token) + if (count >= BARS_PER_RESPONSE && rowid <= max_token) break; } @@ -357,7 +366,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t max_token = token; // send a zero lower bound if we reached the end of our manifest list - if (count && count < CACHE_BARS && !forwards){ + if (count && count < BARS_PER_RESPONSE && !forwards){ append_response(b, 0, NULL); last = 0; count++;