From 5ee82689d283f13d1b160b86eca3c8c16b748b0e Mon Sep 17 00:00:00 2001 From: Andrew Bettison Date: Wed, 17 Oct 2012 18:15:39 +1030 Subject: [PATCH] Start fixing Rhizome stress bugs This commit does not compile. --- rhizome_fetch.c | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index c92034fa..bd581beb 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -34,31 +34,25 @@ typedef struct rhizome_file_fetch_record { char fileid[RHIZOME_FILEHASH_STRLEN + 1]; FILE *file; char filename[1024]; - char request[1024]; int request_len; int request_ofs; - long long file_len; long long file_ofs; - int state; - #define RHIZOME_FETCH_CONNECTING 1 #define RHIZOME_FETCH_SENDINGHTTPREQUEST 2 #define RHIZOME_FETCH_RXHTTPHEADERS 3 #define RHIZOME_FETCH_RXFILE 4 - struct sockaddr_in peer; - } rhizome_file_fetch_record; struct profile_total fetch_stats; /* List of queued transfers */ #define MAX_QUEUED_FILES 4 -int rhizome_file_fetch_queue_count=0; rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES]; + /* Queue a manifest for importing. @@ -526,10 +520,9 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm) int i; for(i=0;i=MAX_QUEUED_FILES) - break; int manifest_kept = 0; - rhizome_queue_manifest_import(candidates[i].manifest,&candidates[i].peer, &manifest_kept); + if (rhizome_queue_manifest_import(candidates[i].manifest,&candidates[i].peer, &manifest_kept) == ) + break; if (!manifest_kept) { rhizome_manifest_free(candidates[i].manifest); candidates[i].manifest = NULL; @@ -551,6 +544,13 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm) return; } +/* Returns 0 if the fetch was queued. + * Returns 1 if the fetch was not queued because a newer version of the same bundle is already + * present. + * Returns 2 if the fetch was not queued because the queue is full. + * Returns 3 if the same fetch was already queued. + * Returns -1 on error. + */ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept) { *manifest_kept = 0; @@ -572,7 +572,7 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri the cache slot number to implicitly store the first bits. */ - if (1||debug & DEBUG_RHIZOME_RX) + if (1||(debug & DEBUG_RHIZOME_RX)) DEBUGF("Fetching manifest bid=%s version=%lld size=%lld:", bid, m->version, filesize); if (rhizome_manifest_version_cache_lookup(m)) { @@ -584,18 +584,25 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri if (debug & DEBUG_RHIZOME_RX) DEBUGF(" is new"); +#if 0 /* Don't queue if queue slots already full */ if (rhizome_file_fetch_queue_count >= MAX_QUEUED_FILES) { if (debug & DEBUG_RHIZOME_RX) DEBUG(" all fetch queue slots full"); return 2; } +#endif - /* Don't queue if already queued */ + /* Don't queue if already queued. If a fetch of an older version is already queued, then this + * logic will let it run to completion before the fetch of the newer version is queued. This + * avoids the problem of indefinite postponement of fetching if new versions are constantly being + * published faster than we can fetch them. + */ int i; - for (i = 0; i < rhizome_file_fetch_queue_count; ++i) { - if (file_fetch_queue[i].manifest) { - if (memcmp(m->cryptoSignPublic, file_fetch_queue[i].manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) { + for (i = 0; i < MAX_QUEUED_FILES; ++i) { + if ( file_fetch_queue[i].manifest + && memcmp(m->cryptoSignPublic, file_fetch_queue[i].manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0 + ) { if (debug & DEBUG_RHIZOME_RX) DEBUGF(" manifest fetch already queued"); return 3; @@ -620,8 +627,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri if (gotfile == 0) { /* We need to get the file, unless already queued */ int i; - for (i = 0; i < rhizome_file_fetch_queue_count; ++i) { - if (strcasecmp(m->fileHexHash, file_fetch_queue[i].fileid) == 0) { + for (i = 0; i < MAX_QUEUED_FILES; ++i) { + if (m->manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].fileid) == 0) { if (debug & DEBUG_RHIZOME_RX) DEBUGF("Payload fetch already queued, slot %d filehash=%s", m->fileHexHash); return 0; @@ -657,9 +664,9 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri return -1; } } + // Find an unused rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count]; q->manifest = m; - *manifest_kept = 1; bcopy(&addr,&q->peer,sizeof(q->peer)); q->alarm.poll.fd=sock; strncpy(q->fileid, m->fileHexHash, RHIZOME_FILEHASH_STRLEN + 1); @@ -707,6 +714,7 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri schedule(&q->alarm); rhizome_file_fetch_queue_count++; + *manifest_kept = 1; if (debug & DEBUG_RHIZOME_RX) DEBUGF("Queued file for fetching into %s (%d in queue)", q->manifest->dataFileName, rhizome_file_fetch_queue_count);