From 9f39160c81f245d089f65144ae7a9c45e0141f0e Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 28 May 2014 16:33:21 +0930 Subject: [PATCH] If we fail to fetch a bundle after 10 tries, skip it --- rhizome_sync.c | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/rhizome_sync.c b/rhizome_sync.c index 30d77b3a..89461208 100644 --- a/rhizome_sync.c +++ b/rhizome_sync.c @@ -28,6 +28,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define MSG_TYPE_BARS 0 #define MSG_TYPE_REQ 1 +#define MAX_TRIES 10 #define CACHE_BARS 60 #define MAX_OLD_BARS 40 #define BARS_PER_RESPONSE ((int)400/RHIZOME_BAR_BYTES) @@ -37,6 +38,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. struct bar_entry { unsigned char bar[RHIZOME_BAR_BYTES]; + unsigned tries; time_ms_t next_request; }; @@ -47,7 +49,8 @@ struct rhizome_sync uint64_t sync_end; uint64_t highest_seen; unsigned char sync_complete; - uint64_t bars_seen; + uint32_t bars_seen; + uint32_t bars_skipped; time_ms_t start_time; time_ms_t completed; time_ms_t next_request; @@ -65,12 +68,13 @@ void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber) if (!subscriber->sync_state) return; struct rhizome_sync *state=subscriber->sync_state; - strbuf_sprintf(b, "Seen %"PRId64" BARs [%"PRId64" to %"PRId64" of %"PRId64"], %d interesting
", + strbuf_sprintf(b, "Seen %u BARs [%"PRId64" to %"PRId64" of %"PRId64"], %d interesting, %d skipped
", state->bars_seen, state->sync_start, state->sync_end, state->highest_seen, - state->bar_count); + state->bar_count, + state->bars_skipped); } static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token, unsigned char forwards) @@ -107,7 +111,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi bzero(&header, sizeof header); struct overlay_buffer *payload = NULL; - for (i=0;i < state->bar_count;i++){ + for (i=state->bar_count -1;i>=0;i--){ if (state->bars[i].next_request > now) continue; @@ -124,8 +128,10 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi uint64_t version = rhizome_bar_version(state->bars[i].bar); // are we already fetching this bundle [or later]? rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES); - if (m && m->version >= version) + if (m && m->version >= version){ + state->bars[i].next_request = now+5000; continue; + } if (!payload){ header.source = my_subscriber; @@ -145,7 +151,18 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi ob_append_bytes(payload, state->bars[i].bar, RHIZOME_BAR_BYTES); + state->bars[i].tries--; state->bars[i].next_request = now+5000; + if (!state->bars[i].tries){ + // remove this BAR and shift the last BAR down to this position if required. + if (config.debug.rhizome) + DEBUGF("Giving up on fetching BAR %s", alloca_tohex(state->bars[i].bar, RHIZOME_BAR_BYTES)); + state->bar_count --; + if (ibar_count) + state->bars[i] = state->bars[state->bar_count]; + state->bars_skipped++; + } + requests++; if (requests>=BARS_PER_RESPONSE) break; @@ -197,9 +214,8 @@ static int sync_bundle_inserted(struct subscriber *subscriber, void *context) if (config.debug.rhizome) DEBUGF("Removing BAR %s from queue", alloca_tohex(this_bar, RHIZOME_BAR_BYTES)); state->bar_count --; - if (ibar_count){ + if (ibar_count) state->bars[i] = state->bars[state->bar_count]; - } } } @@ -221,6 +237,7 @@ static int sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64 if (token!=0 && rhizome_is_bar_interesting(bar)!=0){ bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES); state->bars[state->bar_count].next_request = gettime_ms(); + state->bars[state->bar_count].tries = MAX_TRIES; state->bar_count++; ret=1; }