mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-01-30 16:13:51 +00:00
Start fixing Rhizome stress bugs
This commit does not compile.
This commit is contained in:
parent
1eeb040602
commit
5ee82689d2
@ -34,31 +34,25 @@ typedef struct rhizome_file_fetch_record {
|
|||||||
char fileid[RHIZOME_FILEHASH_STRLEN + 1];
|
char fileid[RHIZOME_FILEHASH_STRLEN + 1];
|
||||||
FILE *file;
|
FILE *file;
|
||||||
char filename[1024];
|
char filename[1024];
|
||||||
|
|
||||||
char request[1024];
|
char request[1024];
|
||||||
int request_len;
|
int request_len;
|
||||||
int request_ofs;
|
int request_ofs;
|
||||||
|
|
||||||
long long file_len;
|
long long file_len;
|
||||||
long long file_ofs;
|
long long file_ofs;
|
||||||
|
|
||||||
int state;
|
int state;
|
||||||
|
|
||||||
#define RHIZOME_FETCH_CONNECTING 1
|
#define RHIZOME_FETCH_CONNECTING 1
|
||||||
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
|
#define RHIZOME_FETCH_SENDINGHTTPREQUEST 2
|
||||||
#define RHIZOME_FETCH_RXHTTPHEADERS 3
|
#define RHIZOME_FETCH_RXHTTPHEADERS 3
|
||||||
#define RHIZOME_FETCH_RXFILE 4
|
#define RHIZOME_FETCH_RXFILE 4
|
||||||
|
|
||||||
struct sockaddr_in peer;
|
struct sockaddr_in peer;
|
||||||
|
|
||||||
} rhizome_file_fetch_record;
|
} rhizome_file_fetch_record;
|
||||||
|
|
||||||
struct profile_total fetch_stats;
|
struct profile_total fetch_stats;
|
||||||
|
|
||||||
/* List of queued transfers */
|
/* List of queued transfers */
|
||||||
#define MAX_QUEUED_FILES 4
|
#define MAX_QUEUED_FILES 4
|
||||||
int rhizome_file_fetch_queue_count=0;
|
|
||||||
rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES];
|
rhizome_file_fetch_record file_fetch_queue[MAX_QUEUED_FILES];
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Queue a manifest for importing.
|
Queue a manifest for importing.
|
||||||
|
|
||||||
@ -526,10 +520,9 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm)
|
|||||||
int i;
|
int i;
|
||||||
for(i=0;i<candidate_count;i++)
|
for(i=0;i<candidate_count;i++)
|
||||||
{
|
{
|
||||||
if (rhizome_file_fetch_queue_count>=MAX_QUEUED_FILES)
|
|
||||||
break;
|
|
||||||
int manifest_kept = 0;
|
int manifest_kept = 0;
|
||||||
rhizome_queue_manifest_import(candidates[i].manifest,&candidates[i].peer, &manifest_kept);
|
if (rhizome_queue_manifest_import(candidates[i].manifest,&candidates[i].peer, &manifest_kept) == )
|
||||||
|
break;
|
||||||
if (!manifest_kept) {
|
if (!manifest_kept) {
|
||||||
rhizome_manifest_free(candidates[i].manifest);
|
rhizome_manifest_free(candidates[i].manifest);
|
||||||
candidates[i].manifest = NULL;
|
candidates[i].manifest = NULL;
|
||||||
@ -551,6 +544,13 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns 0 if the fetch was queued.
|
||||||
|
* Returns 1 if the fetch was not queued because a newer version of the same bundle is already
|
||||||
|
* present.
|
||||||
|
* Returns 2 if the fetch was not queued because the queue is full.
|
||||||
|
* Returns 3 if the same fetch was already queued.
|
||||||
|
* Returns -1 on error.
|
||||||
|
*/
|
||||||
int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept)
|
int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peerip, int *manifest_kept)
|
||||||
{
|
{
|
||||||
*manifest_kept = 0;
|
*manifest_kept = 0;
|
||||||
@ -572,7 +572,7 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
|
|||||||
the cache slot number to implicitly store the first bits.
|
the cache slot number to implicitly store the first bits.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (1||debug & DEBUG_RHIZOME_RX)
|
if (1||(debug & DEBUG_RHIZOME_RX))
|
||||||
DEBUGF("Fetching manifest bid=%s version=%lld size=%lld:", bid, m->version, filesize);
|
DEBUGF("Fetching manifest bid=%s version=%lld size=%lld:", bid, m->version, filesize);
|
||||||
|
|
||||||
if (rhizome_manifest_version_cache_lookup(m)) {
|
if (rhizome_manifest_version_cache_lookup(m)) {
|
||||||
@ -584,18 +584,25 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
|
|||||||
if (debug & DEBUG_RHIZOME_RX)
|
if (debug & DEBUG_RHIZOME_RX)
|
||||||
DEBUGF(" is new");
|
DEBUGF(" is new");
|
||||||
|
|
||||||
|
#if 0
|
||||||
/* Don't queue if queue slots already full */
|
/* Don't queue if queue slots already full */
|
||||||
if (rhizome_file_fetch_queue_count >= MAX_QUEUED_FILES) {
|
if (rhizome_file_fetch_queue_count >= MAX_QUEUED_FILES) {
|
||||||
if (debug & DEBUG_RHIZOME_RX)
|
if (debug & DEBUG_RHIZOME_RX)
|
||||||
DEBUG(" all fetch queue slots full");
|
DEBUG(" all fetch queue slots full");
|
||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Don't queue if already queued */
|
/* Don't queue if already queued. If a fetch of an older version is already queued, then this
|
||||||
|
* logic will let it run to completion before the fetch of the newer version is queued. This
|
||||||
|
* avoids the problem of indefinite postponement of fetching if new versions are constantly being
|
||||||
|
* published faster than we can fetch them.
|
||||||
|
*/
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < rhizome_file_fetch_queue_count; ++i) {
|
for (i = 0; i < MAX_QUEUED_FILES; ++i) {
|
||||||
if (file_fetch_queue[i].manifest) {
|
if ( file_fetch_queue[i].manifest
|
||||||
if (memcmp(m->cryptoSignPublic, file_fetch_queue[i].manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) {
|
&& memcmp(m->cryptoSignPublic, file_fetch_queue[i].manifest->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0
|
||||||
|
) {
|
||||||
if (debug & DEBUG_RHIZOME_RX)
|
if (debug & DEBUG_RHIZOME_RX)
|
||||||
DEBUGF(" manifest fetch already queued");
|
DEBUGF(" manifest fetch already queued");
|
||||||
return 3;
|
return 3;
|
||||||
@ -620,8 +627,8 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
|
|||||||
if (gotfile == 0) {
|
if (gotfile == 0) {
|
||||||
/* We need to get the file, unless already queued */
|
/* We need to get the file, unless already queued */
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < rhizome_file_fetch_queue_count; ++i) {
|
for (i = 0; i < MAX_QUEUED_FILES; ++i) {
|
||||||
if (strcasecmp(m->fileHexHash, file_fetch_queue[i].fileid) == 0) {
|
if (m->manifest && strcasecmp(m->fileHexHash, file_fetch_queue[i].fileid) == 0) {
|
||||||
if (debug & DEBUG_RHIZOME_RX)
|
if (debug & DEBUG_RHIZOME_RX)
|
||||||
DEBUGF("Payload fetch already queued, slot %d filehash=%s", m->fileHexHash);
|
DEBUGF("Payload fetch already queued, slot %d filehash=%s", m->fileHexHash);
|
||||||
return 0;
|
return 0;
|
||||||
@ -657,9 +664,9 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Find an unused
|
||||||
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
|
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
|
||||||
q->manifest = m;
|
q->manifest = m;
|
||||||
*manifest_kept = 1;
|
|
||||||
bcopy(&addr,&q->peer,sizeof(q->peer));
|
bcopy(&addr,&q->peer,sizeof(q->peer));
|
||||||
q->alarm.poll.fd=sock;
|
q->alarm.poll.fd=sock;
|
||||||
strncpy(q->fileid, m->fileHexHash, RHIZOME_FILEHASH_STRLEN + 1);
|
strncpy(q->fileid, m->fileHexHash, RHIZOME_FILEHASH_STRLEN + 1);
|
||||||
@ -707,6 +714,7 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
|
|||||||
schedule(&q->alarm);
|
schedule(&q->alarm);
|
||||||
|
|
||||||
rhizome_file_fetch_queue_count++;
|
rhizome_file_fetch_queue_count++;
|
||||||
|
*manifest_kept = 1;
|
||||||
if (debug & DEBUG_RHIZOME_RX)
|
if (debug & DEBUG_RHIZOME_RX)
|
||||||
DEBUGF("Queued file for fetching into %s (%d in queue)",
|
DEBUGF("Queued file for fetching into %s (%d in queue)",
|
||||||
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
|
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user