fixed bugs with rhizome over mdp receive for files >32*200 bytes

long.  also we request next block of data immediately after
receiving all of the previous block.
This commit is contained in:
gardners 2012-12-01 07:37:22 +10:30
parent 21f562122e
commit ac734ff00d
2 changed files with 39 additions and 16 deletions

View File

@ -135,7 +135,6 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
{
IN();
DEBUGF("Someone sent me a rhizome REPLY via MDP");
if (!mdp->out.payload_length) RETURN(-1);
@ -150,6 +149,7 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]);
int count=mdp->out.payload_length-(1+16+8+8);
unsigned char *bytes=&mdp->out.payload[1+16+8+8];
if (0)
DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx",
count,offset,alloca_tohex(bidprefix,16),version);

View File

@ -76,6 +76,7 @@ struct rhizome_fetch_slot {
int64_t mdpNextTX;
int64_t mdpLastRX;
int mdpIdleTimeout;
int mdpResponsesReceived;
int64_t mdpRXWindowStart;
int mdpRXBlockLength;
uint32_t mdpRXBitmap;
@ -977,7 +978,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
{
if (debug & DEBUG_RHIZOME_RX)
// if (debug & DEBUG_RHIZOME_RX)
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
assert(slot->state != RHIZOME_FETCH_FREE);
@ -1032,6 +1033,11 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
DEBUGF("MDP connection timed out");
return rhizome_fetch_close(slot);
}
// only issue new requests every 133ms.
// we automatically re-issue once we have received all packets in this
// request also, so if there is no packet loss, we can go substantially
// faster. Optimising behaviour when there is no packet loss is an
// outstanding task.
slot->mdpNextTX=gettime_ms()+133;
overlay_mdp_frame mdp;
@ -1053,11 +1059,17 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap);
write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength);
DEBUGF("src sid=%s, dst sid=%s",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid));
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->mdpRXWindowStart);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
// remember when we sent the request so that we can adjust the inter-request
// interval based on how fast the packets arrive.
slot->mdpResponsesReceived=0;
unschedule(&slot->alarm);
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=slot->mdpNextTX;
schedule(&slot->alarm);
@ -1253,8 +1265,6 @@ int rhizome_received_content(unsigned char *bidprefix,
if (!bcmp(rhizome_fetch_queues[i].active.bid,bidprefix,
16))
{
DEBUGF("This response matches slot 0x%p",
rhizome_fetch_queues[i].active);
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->file_ofs==offset) {
debug=DEBUG_RHIZOME_RX;
@ -1263,14 +1273,25 @@ int rhizome_received_content(unsigned char *bidprefix,
file is yet to come. */
if (type=='T') slot->file_len=offset+count;
else slot->file_len=offset+count+1;
DEBUGF("Trying to write %d bytes @ %d (file len = %d)",
count,(int)slot->file_ofs,(int)slot->file_len);
dump("content", bytes,count);
rhizome_write_content(slot,(char *)bytes,count);
debug=0;
slot->mdpRXWindowStart=offset+count;
slot->mdpLastRX=gettime_ms();
// TODO: Shift bitmap
unschedule(&slot->alarm);
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=slot->mdpNextTX;
schedule(&slot->alarm);
slot->mdpResponsesReceived++;
if (slot->mdpResponsesReceived==32) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
}
RETURN(0);
} else {
// TODO: Implement out-of-order reception so that lost packets
@ -1279,11 +1300,13 @@ int rhizome_received_content(unsigned char *bidprefix,
RETURN(0);
}
else
if (0)
DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s",
alloca_tohex(bidprefix,16),
alloca_tohex_bid(rhizome_fetch_queues[i].active.bid));
}
}
RETURN(-1);
}