From 96c0889f9a2ed4b9cdf395d2fc6b9bddae3bda63 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Thu, 11 Apr 2013 15:24:13 +0930 Subject: [PATCH] Don't ask for manifests if we don't have room in transfer queues --- performance_timing.c | 12 ++++----- rhizome.h | 7 +++-- rhizome_database.c | 7 ++++- rhizome_fetch.c | 64 ++++++++++++++++++++++++++++++++++---------- 4 files changed, 66 insertions(+), 24 deletions(-) diff --git a/performance_timing.c b/performance_timing.c index 06460c1f..93085e99 100644 --- a/performance_timing.c +++ b/performance_timing.c @@ -151,17 +151,15 @@ int fd_showstats() // Show periodic rhizome transfer information, but only // if there are some active rhizome transfers. - if ((rhizome_active_fetch_bytes_received(0)+ - rhizome_active_fetch_bytes_received(1)+ - rhizome_active_fetch_bytes_received(2)+ - rhizome_active_fetch_bytes_received(3)+ - rhizome_active_fetch_bytes_received(4))!=-5) - INFOF("Rhizome transfer progress: %d,%d,%d,%d,%d", + if (rhizome_active_fetch_count()!=0) + INFOF("Rhizome transfer progress: %d,%d,%d,%d,%d,%d (remaining %d)", rhizome_active_fetch_bytes_received(0), rhizome_active_fetch_bytes_received(1), rhizome_active_fetch_bytes_received(2), rhizome_active_fetch_bytes_received(3), - rhizome_active_fetch_bytes_received(4)); + rhizome_active_fetch_bytes_received(4), + rhizome_active_fetch_bytes_received(5), + rhizome_fetch_queue_bytes()); // Report any functions that take too much time if (!config.debug.timing) diff --git a/rhizome.h b/rhizome.h index 2eff24f2..7e8f16ac 100644 --- a/rhizome.h +++ b/rhizome.h @@ -176,6 +176,7 @@ typedef struct rhizome_manifest { extern long long rhizome_space; extern unsigned short rhizome_http_server_port; +int log2ll(unsigned long long x); int rhizome_configure(); int rhizome_enabled(); int rhizome_fetch_delay_ms(); @@ -376,8 +377,8 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len); /* one manifest is required per candidate, plus a few spare. so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES. */ -#define MAX_RHIZOME_MANIFESTS 24 -#define MAX_CANDIDATES 16 +#define MAX_RHIZOME_MANIFESTS 40 +#define MAX_CANDIDATES 32 int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]); rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length); @@ -656,6 +657,8 @@ enum rhizome_start_fetch_result { enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length); int rhizome_any_fetch_active(); int rhizome_any_fetch_queued(); +int rhizome_fetch_queue_bytes(); +int rhizome_fetch_has_queue_space(unsigned char log2_size); struct http_response_parts { int code; diff --git a/rhizome_database.c b/rhizome_database.c index 8cf3faee..f9f5111b 100644 --- a/rhizome_database.c +++ b/rhizome_database.c @@ -1577,6 +1577,7 @@ int rhizome_is_bar_interesting(unsigned char *bar){ time_ms_t start_time=gettime_ms(); int64_t version = rhizome_bar_version(bar); + unsigned char log2_size = bar[RHIZOME_BAR_FILESIZE_OFFSET]; int ret=1; char id_hex[RHIZOME_MANIFEST_ID_STRLEN]; tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); @@ -1587,7 +1588,11 @@ int rhizome_is_bar_interesting(unsigned char *bar){ DEBUGF("Ignoring %s", id_hex); RETURN(0); } - + + // do we have free space in a fetch queue? + if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1) + RETURN(0); + // are we already fetching this bundle [or later]? rhizome_manifest *m=rhizome_fetch_search(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); if (m && m->version >= version) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 27e3a5b0..91b27865 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -103,28 +103,30 @@ struct rhizome_fetch_queue { struct rhizome_fetch_slot active; // must be first element in struct int candidate_queue_size; struct rhizome_fetch_candidate *candidate_queue; - long long size_threshold; // will only hold fetches of fewer than this many bytes + unsigned char log_size_threshold; // will only queue payloads smaller than this. }; /* Static allocation of the candidate queues. */ -struct rhizome_fetch_candidate queue0[5]; -struct rhizome_fetch_candidate queue1[4]; -struct rhizome_fetch_candidate queue2[3]; -struct rhizome_fetch_candidate queue3[2]; -struct rhizome_fetch_candidate queue4[1]; +struct rhizome_fetch_candidate queue0[10]; +struct rhizome_fetch_candidate queue1[8]; +struct rhizome_fetch_candidate queue2[6]; +struct rhizome_fetch_candidate queue3[4]; +struct rhizome_fetch_candidate queue4[2]; +struct rhizome_fetch_candidate queue5[2]; #define NELS(a) (sizeof (a) / sizeof *(a)) #define slotno(slot) ((struct rhizome_fetch_queue *)(slot) - &rhizome_fetch_queues[0]) -/* Static allocation of the queue structures. Must be in order of ascending size_threshold. +/* Static allocation of the queue structures. Must be in order of ascending log_size_threshold. */ struct rhizome_fetch_queue rhizome_fetch_queues[] = { - { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .size_threshold = 10000000, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .size_threshold = -1, .active = { .state = RHIZOME_FETCH_FREE } } + { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .log_size_threshold = 10, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .log_size_threshold = 13, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .log_size_threshold = 16, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .log_size_threshold = 19, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .log_size_threshold = 22, .active = { .state = RHIZOME_FETCH_FREE } }, + { .candidate_queue_size = NELS(queue5), .candidate_queue = queue5, .log_size_threshold = 0xFF, .active = { .state = RHIZOME_FETCH_FREE } } }; #define NQUEUES NELS(rhizome_fetch_queues) @@ -146,6 +148,21 @@ int rhizome_active_fetch_bytes_received(int q) return (int)rhizome_fetch_queues[q].active.write_state.file_offset + rhizome_fetch_queues[q].active.write_state.data_size; } +int rhizome_fetch_queue_bytes(){ + int i,j,bytes=0; + for(i=0;ifileLength - received; + } + for (j=0;jfileLength; + } + } + return bytes; +} + static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED; static struct profile_total fetch_stats; @@ -157,9 +174,10 @@ static struct profile_total fetch_stats; static struct rhizome_fetch_queue *rhizome_find_queue(long long size) { int i; + unsigned char log_size = log2ll(size); for (i = 0; i < NQUEUES; ++i) { struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; - if (q->size_threshold < 0 || size < q->size_threshold) + if (log_size < q->log_size_threshold) return q; } return NULL; @@ -174,9 +192,10 @@ static struct rhizome_fetch_queue *rhizome_find_queue(long long size) static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size) { int i; + unsigned char log_size = log2ll(size); for (i = 0; i < NQUEUES; ++i) { struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; - if ((q->size_threshold < 0 || size < q->size_threshold) && q->active.state == RHIZOME_FETCH_FREE) + if (log_size < q->log_size_threshold && q->active.state == RHIZOME_FETCH_FREE) return &q->active; } return NULL; @@ -873,6 +892,23 @@ rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){ return NULL; } +/* Do we have space to add a fetch candidate of this size? */ +int rhizome_fetch_has_queue_space(unsigned char log2_size){ + int i; + for (i = 0; i < NQUEUES; ++i) { + struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; + if (log2_size < q->log_size_threshold){ + // is there an empty candidate? + int j=0; + for (j=0;j < q->candidate_queue_size;j++) + if (!q->candidate_queue[j].manifest) + return 1; + return 0; + } + } + return 0; +} + /* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as * the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch * is performed over MDP.