receiving multiple bundles via rhizome over mdp works now, although

there is a bug with alarms being called on fetch slots after they
complete, even though they apparently get unscheduled.
This commit is contained in:
gardners 2012-12-01 10:47:19 +10:30
parent 55df66f6d6
commit b7ae55c143

View File

@ -979,7 +979,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
{
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
assert(slot->state != RHIZOME_FETCH_FREE);
/* close socket and stop watching it */
@ -987,6 +987,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
unschedule(&slot->alarm);
close(slot->alarm.poll.fd);
slot->alarm.poll.fd = -1;
slot->alarm.function=NULL;
/* Free ephemeral data */
if (slot->file)
@ -1012,6 +1013,13 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
{
struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm;
if (slot->state!=5) {
DEBUGF("Stale alarm triggered on idle/reclaimed slot. Ignoring");
unschedule(alarm);
return;
}
long long now=gettime_ms();
if (now-slot->mdpLastRX>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago",
@ -1019,8 +1027,8 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
rhizome_fetch_close(slot);
return;
}
DEBUGF("now-lastRX(0x%llx) <= idleTimeout(0x%llx)",
now-slot->mdpLastRX,slot->mdpIdleTimeout);
DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p",
slot);
if (slot->bidP)
rhizome_fetch_mdp_requestblocks(slot);
else
@ -1120,7 +1128,7 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot)
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
DEBUGF("Trying to switch to MDP for Rhizome fetch");
DEBUGF("Trying to switch to MDP for Rhizome fetch: slot=0x%p",slot);
/* close socket and stop watching it */
unwatch(&slot->alarm);
@ -1136,6 +1144,8 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
3. Set timeout for no traffic received.
*/
slot->state=RHIZOME_FETCH_RXFILEMDP;
slot->mdpLastRX=gettime_ms();
if (slot->bidP) {
/* We are requesting a file. The http request may have already received
@ -1245,7 +1255,7 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
}
}
}
DEBUGF("Closing rhizome fetch slot");
DEBUGF("Closing rhizome fetch slot = 0x%p",slot);
rhizome_fetch_close(slot);
return;
}
@ -1263,11 +1273,11 @@ int rhizome_received_content(unsigned char *bidprefix,
IN();
int i;
for(i=0;i<NQUEUES;i++) {
if (rhizome_fetch_queues[i].active.bidP) {
if (!bcmp(rhizome_fetch_queues[i].active.bid,bidprefix,
16))
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) {
if (!bcmp(slot->bid,bidprefix,16))
{
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->file_ofs==offset) {
debug=DEBUG_RHIZOME_RX;
/* We don't know the file length until we receive the last
@ -1302,8 +1312,8 @@ int rhizome_received_content(unsigned char *bidprefix,
RETURN(0);
}
else
if (0)
DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s",
if (1)
DEBUGF("Doesn't match this slot = 0x%p, because BIDs don't match: %s* vs %s",
alloca_tohex(bidprefix,16),
alloca_tohex_bid(rhizome_fetch_queues[i].active.bid));
}