From b7ae55c14300c1cadc092f459350807fca965d1c Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 10:47:19 +1030 Subject: [PATCH] receiving multiple bundles via rhizome over mdp works now, although there is a bug with alarms being called on fetch slots after they complete, even though they apparently get unscheduled. --- rhizome_fetch.c | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/rhizome_fetch.c b/rhizome_fetch.c index d2df08c4..6217ab0f 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -979,7 +979,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { // if (debug & DEBUG_RHIZOME_RX) - DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); + DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ @@ -987,6 +987,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) unschedule(&slot->alarm); close(slot->alarm.poll.fd); slot->alarm.poll.fd = -1; + slot->alarm.function=NULL; /* Free ephemeral data */ if (slot->file) @@ -1012,6 +1013,13 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) { struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; + + if (slot->state!=5) { + DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring"); + unschedule(alarm); + return; + } + long long now=gettime_ms(); if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago", @@ -1019,8 +1027,8 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) rhizome_fetch_close(slot); return; } - DEBUGF("now-lastRX(0x%llx) <= idleTimeout(0x%llx)", - now-slot->mdpLastRX,slot->mdpIdleTimeout); + DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", + slot); if (slot->bidP) rhizome_fetch_mdp_requestblocks(slot); else @@ -1120,7 +1128,7 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { - DEBUGF("Trying to switch to MDP for Rhizome fetch"); + DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p",slot); /* close socket and stop watching it */ unwatch(&slot->alarm); @@ -1136,6 +1144,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) 3. Set timeout for no traffic received. */ + slot->state=RHIZOME_FETCH_RXFILEMDP; + slot->mdpLastRX=gettime_ms(); if (slot->bidP) { /* We are requesting a file. The http request may have already received @@ -1245,7 +1255,7 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by } } } - DEBUGF("Closing rhizome fetch slot"); + DEBUGF("Closing rhizome fetch slot = 0x%p",slot); rhizome_fetch_close(slot); return; } @@ -1263,11 +1273,11 @@ int rhizome_received_content(unsigned char *bidprefix, IN(); int i; for(i=0;istate==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { + if (!bcmp(slot->bid,bidprefix,16)) { - struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active; + if (slot->file_ofs==offset) { debug=DEBUG_RHIZOME_RX; /* We don't know the file length until we receive the last @@ -1302,8 +1312,8 @@ int rhizome_received_content(unsigned char *bidprefix, RETURN(0); } else - if (0) - DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", + if (1) + DEBUGF("Doesn't match this slot = 0x%p, because BIDs don't match: %s* vs %s", alloca_tohex(bidprefix,16), alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); }