If we fail to fetch a bundle after 10 tries, skip it

This commit is contained in:
Jeremy Lakeman 2014-05-28 16:33:21 +09:30
parent aa670fc8a5
commit 9f39160c81

View File

@ -28,6 +28,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MSG_TYPE_BARS 0
#define MSG_TYPE_REQ 1
#define MAX_TRIES 10
#define CACHE_BARS 60
#define MAX_OLD_BARS 40
#define BARS_PER_RESPONSE ((int)400/RHIZOME_BAR_BYTES)
@ -37,6 +38,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
struct bar_entry
{
unsigned char bar[RHIZOME_BAR_BYTES];
unsigned tries;
time_ms_t next_request;
};
@ -47,7 +49,8 @@ struct rhizome_sync
uint64_t sync_end;
uint64_t highest_seen;
unsigned char sync_complete;
uint64_t bars_seen;
uint32_t bars_seen;
uint32_t bars_skipped;
time_ms_t start_time;
time_ms_t completed;
time_ms_t next_request;
@ -65,12 +68,13 @@ void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber)
if (!subscriber->sync_state)
return;
struct rhizome_sync *state=subscriber->sync_state;
strbuf_sprintf(b, "Seen %"PRId64" BARs [%"PRId64" to %"PRId64" of %"PRId64"], %d interesting<br>",
strbuf_sprintf(b, "Seen %u BARs [%"PRId64" to %"PRId64" of %"PRId64"], %d interesting, %d skipped<br>",
state->bars_seen,
state->sync_start,
state->sync_end,
state->highest_seen,
state->bar_count);
state->bar_count,
state->bars_skipped);
}
static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token, unsigned char forwards)
@ -107,7 +111,7 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
bzero(&header, sizeof header);
struct overlay_buffer *payload = NULL;
for (i=0;i < state->bar_count;i++){
for (i=state->bar_count -1;i>=0;i--){
if (state->bars[i].next_request > now)
continue;
@ -124,8 +128,10 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
uint64_t version = rhizome_bar_version(state->bars[i].bar);
// are we already fetching this bundle [or later]?
rhizome_manifest *m=rhizome_fetch_search(prefix, RHIZOME_BAR_PREFIX_BYTES);
if (m && m->version >= version)
if (m && m->version >= version){
state->bars[i].next_request = now+5000;
continue;
}
if (!payload){
header.source = my_subscriber;
@ -145,7 +151,18 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi
ob_append_bytes(payload, state->bars[i].bar, RHIZOME_BAR_BYTES);
state->bars[i].tries--;
state->bars[i].next_request = now+5000;
if (!state->bars[i].tries){
// remove this BAR and shift the last BAR down to this position if required.
if (config.debug.rhizome)
DEBUGF("Giving up on fetching BAR %s", alloca_tohex(state->bars[i].bar, RHIZOME_BAR_BYTES));
state->bar_count --;
if (i<state->bar_count)
state->bars[i] = state->bars[state->bar_count];
state->bars_skipped++;
}
requests++;
if (requests>=BARS_PER_RESPONSE)
break;
@ -197,9 +214,8 @@ static int sync_bundle_inserted(struct subscriber *subscriber, void *context)
if (config.debug.rhizome)
DEBUGF("Removing BAR %s from queue", alloca_tohex(this_bar, RHIZOME_BAR_BYTES));
state->bar_count --;
if (i<state->bar_count){
if (i<state->bar_count)
state->bars[i] = state->bars[state->bar_count];
}
}
}
@ -221,6 +237,7 @@ static int sync_cache_bar(struct rhizome_sync *state, unsigned char *bar, uint64
if (token!=0 && rhizome_is_bar_interesting(bar)!=0){
bcopy(bar, state->bars[state->bar_count].bar, RHIZOME_BAR_BYTES);
state->bars[state->bar_count].next_request = gettime_ms();
state->bars[state->bar_count].tries = MAX_TRIES;
state->bar_count++;
ret=1;
}