Don't ask for manifests if we don't have room in transfer queues

This commit is contained in:
Jeremy Lakeman 2013-04-11 15:24:13 +09:30
parent 76d7743efc
commit 96c0889f9a
4 changed files with 66 additions and 24 deletions

View File

@ -151,17 +151,15 @@ int fd_showstats()
// Show periodic rhizome transfer information, but only
// if there are some active rhizome transfers.
if ((rhizome_active_fetch_bytes_received(0)+
rhizome_active_fetch_bytes_received(1)+
rhizome_active_fetch_bytes_received(2)+
rhizome_active_fetch_bytes_received(3)+
rhizome_active_fetch_bytes_received(4))!=-5)
INFOF("Rhizome transfer progress: %d,%d,%d,%d,%d",
if (rhizome_active_fetch_count()!=0)
INFOF("Rhizome transfer progress: %d,%d,%d,%d,%d,%d (remaining %d)",
rhizome_active_fetch_bytes_received(0),
rhizome_active_fetch_bytes_received(1),
rhizome_active_fetch_bytes_received(2),
rhizome_active_fetch_bytes_received(3),
rhizome_active_fetch_bytes_received(4));
rhizome_active_fetch_bytes_received(4),
rhizome_active_fetch_bytes_received(5),
rhizome_fetch_queue_bytes());
// Report any functions that take too much time
if (!config.debug.timing)

View File

@ -176,6 +176,7 @@ typedef struct rhizome_manifest {
extern long long rhizome_space;
extern unsigned short rhizome_http_server_port;
int log2ll(unsigned long long x);
int rhizome_configure();
int rhizome_enabled();
int rhizome_fetch_delay_ms();
@ -376,8 +377,8 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len);
/* one manifest is required per candidate, plus a few spare.
so MAX_RHIZOME_MANIFESTS must be > MAX_CANDIDATES.
*/
#define MAX_RHIZOME_MANIFESTS 24
#define MAX_CANDIDATES 16
#define MAX_RHIZOME_MANIFESTS 40
#define MAX_CANDIDATES 32
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length);
@ -656,6 +657,8 @@ enum rhizome_start_fetch_result {
enum rhizome_start_fetch_result rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char sid[SID_SIZE],const unsigned char *prefix, size_t prefix_length);
int rhizome_any_fetch_active();
int rhizome_any_fetch_queued();
int rhizome_fetch_queue_bytes();
int rhizome_fetch_has_queue_space(unsigned char log2_size);
struct http_response_parts {
int code;

View File

@ -1577,6 +1577,7 @@ int rhizome_is_bar_interesting(unsigned char *bar){
time_ms_t start_time=gettime_ms();
int64_t version = rhizome_bar_version(bar);
unsigned char log2_size = bar[RHIZOME_BAR_FILESIZE_OFFSET];
int ret=1;
char id_hex[RHIZOME_MANIFEST_ID_STRLEN];
tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
@ -1587,7 +1588,11 @@ int rhizome_is_bar_interesting(unsigned char *bar){
DEBUGF("Ignoring %s", id_hex);
RETURN(0);
}
// do we have free space in a fetch queue?
if (log2_size!=0xFF && rhizome_fetch_has_queue_space(log2_size)!=1)
RETURN(0);
// are we already fetching this bundle [or later]?
rhizome_manifest *m=rhizome_fetch_search(&bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES);
if (m && m->version >= version)

View File

@ -103,28 +103,30 @@ struct rhizome_fetch_queue {
struct rhizome_fetch_slot active; // must be first element in struct
int candidate_queue_size;
struct rhizome_fetch_candidate *candidate_queue;
long long size_threshold; // will only hold fetches of fewer than this many bytes
unsigned char log_size_threshold; // will only queue payloads smaller than this.
};
/* Static allocation of the candidate queues.
*/
struct rhizome_fetch_candidate queue0[5];
struct rhizome_fetch_candidate queue1[4];
struct rhizome_fetch_candidate queue2[3];
struct rhizome_fetch_candidate queue3[2];
struct rhizome_fetch_candidate queue4[1];
struct rhizome_fetch_candidate queue0[10];
struct rhizome_fetch_candidate queue1[8];
struct rhizome_fetch_candidate queue2[6];
struct rhizome_fetch_candidate queue3[4];
struct rhizome_fetch_candidate queue4[2];
struct rhizome_fetch_candidate queue5[2];
#define NELS(a) (sizeof (a) / sizeof *(a))
#define slotno(slot) ((struct rhizome_fetch_queue *)(slot) - &rhizome_fetch_queues[0])
/* Static allocation of the queue structures. Must be in order of ascending size_threshold.
/* Static allocation of the queue structures. Must be in order of ascending log_size_threshold.
*/
struct rhizome_fetch_queue rhizome_fetch_queues[] = {
{ .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .size_threshold = 10000, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .size_threshold = 100000, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .size_threshold = 1000000, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .size_threshold = 10000000, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .size_threshold = -1, .active = { .state = RHIZOME_FETCH_FREE } }
{ .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .log_size_threshold = 10, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .log_size_threshold = 13, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .log_size_threshold = 16, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .log_size_threshold = 19, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .log_size_threshold = 22, .active = { .state = RHIZOME_FETCH_FREE } },
{ .candidate_queue_size = NELS(queue5), .candidate_queue = queue5, .log_size_threshold = 0xFF, .active = { .state = RHIZOME_FETCH_FREE } }
};
#define NQUEUES NELS(rhizome_fetch_queues)
@ -146,6 +148,21 @@ int rhizome_active_fetch_bytes_received(int q)
return (int)rhizome_fetch_queues[q].active.write_state.file_offset + rhizome_fetch_queues[q].active.write_state.data_size;
}
int rhizome_fetch_queue_bytes(){
int i,j,bytes=0;
for(i=0;i<NQUEUES;i++){
if (rhizome_fetch_queues[i].active.state!=RHIZOME_FETCH_FREE){
int received=rhizome_fetch_queues[i].active.write_state.file_offset + rhizome_fetch_queues[i].active.write_state.data_size;
bytes+=rhizome_fetch_queues[i].active.manifest->fileLength - received;
}
for (j=0;j<rhizome_fetch_queues[i].candidate_queue_size;j++){
if (rhizome_fetch_queues[i].candidate_queue[j].manifest)
bytes+=rhizome_fetch_queues[i].candidate_queue[j].manifest->fileLength;
}
}
return bytes;
}
static struct sched_ent sched_activate = STRUCT_SCHED_ENT_UNUSED;
static struct profile_total fetch_stats;
@ -157,9 +174,10 @@ static struct profile_total fetch_stats;
static struct rhizome_fetch_queue *rhizome_find_queue(long long size)
{
int i;
unsigned char log_size = log2ll(size);
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
if (q->size_threshold < 0 || size < q->size_threshold)
if (log_size < q->log_size_threshold)
return q;
}
return NULL;
@ -174,9 +192,10 @@ static struct rhizome_fetch_queue *rhizome_find_queue(long long size)
static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size)
{
int i;
unsigned char log_size = log2ll(size);
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
if ((q->size_threshold < 0 || size < q->size_threshold) && q->active.state == RHIZOME_FETCH_FREE)
if (log_size < q->log_size_threshold && q->active.state == RHIZOME_FETCH_FREE)
return &q->active;
}
return NULL;
@ -873,6 +892,23 @@ rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){
return NULL;
}
/* Do we have space to add a fetch candidate of this size? */
int rhizome_fetch_has_queue_space(unsigned char log2_size){
int i;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
if (log2_size < q->log_size_threshold){
// is there an empty candidate?
int j=0;
for (j=0;j < q->candidate_queue_size;j++)
if (!q->candidate_queue[j].manifest)
return 1;
return 0;
}
}
return 0;
}
/* Queue a fetch for the payload of the given manifest. If 'peerip' is not NULL, then it is used as
* the port and IP address of an HTTP server from which the fetch is performed. Otherwise the fetch
* is performed over MDP.