From ac734ff00d41060c3cea992f1ea46fcc0d668bb9 Mon Sep 17 00:00:00 2001 From: gardners Date: Sat, 1 Dec 2012 07:37:22 +1030 Subject: [PATCH] fixed bugs with rhizome over mdp receive for files >32*200 bytes long. also we request next block of data immediately after receiving all of the previous block. --- overlay_mdp_services.c | 6 +++--- rhizome_fetch.c | 49 +++++++++++++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index 4dac4572..c00db07f 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -135,7 +135,6 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) { IN(); - DEBUGF("Someone sent me a rhizome REPLY via MDP"); if (!mdp->out.payload_length) RETURN(-1); @@ -150,8 +149,9 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); int count=mdp->out.payload_length-(1+16+8+8); unsigned char *bytes=&mdp->out.payload[1+16+8+8]; - DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", - count,offset,alloca_tohex(bidprefix,16),version); + if (0) + DEBUGF("Received %d bytes @ 0x%llx for %s* version 0x%llx", + count,offset,alloca_tohex(bidprefix,16),version); /* Now see if there is a slot that matches. If so, then see if the bytes are in the window, and write them. diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 7aeadb4c..d0227ba4 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -76,6 +76,7 @@ struct rhizome_fetch_slot { int64_t mdpNextTX; int64_t mdpLastRX; int mdpIdleTimeout; + int mdpResponsesReceived; int64_t mdpRXWindowStart; int mdpRXBlockLength; uint32_t mdpRXBitmap; @@ -977,7 +978,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { - if (debug & DEBUG_RHIZOME_RX) + // if (debug & DEBUG_RHIZOME_RX) DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); @@ -1032,6 +1033,11 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) DEBUGF("MDP connection timed out"); return rhizome_fetch_close(slot); } + // only issue new requests every 133ms. + // we automatically re-issue once we have received all packets in this + // request also, so if there is no packet loss, we can go substantially + // faster. Optimising behaviour when there is no packet loss is an + // outstanding task. slot->mdpNextTX=gettime_ms()+133; overlay_mdp_frame mdp; @@ -1053,11 +1059,17 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); - DEBUGF("src sid=%s, dst sid=%s", - alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid)); + DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", + alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + slot->mdpRXWindowStart); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + // remember when we sent the request so that we can adjust the inter-request + // interval based on how fast the packets arrive. + slot->mdpResponsesReceived=0; + + unschedule(&slot->alarm); slot->alarm.function = rhizome_fetch_mdp_slot_callback; slot->alarm.alarm=slot->mdpNextTX; schedule(&slot->alarm); @@ -1253,8 +1265,6 @@ int rhizome_received_content(unsigned char *bidprefix, if (!bcmp(rhizome_fetch_queues[i].active.bid,bidprefix, 16)) { - DEBUGF("This response matches slot 0x%p", - rhizome_fetch_queues[i].active); struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active; if (slot->file_ofs==offset) { debug=DEBUG_RHIZOME_RX; @@ -1263,14 +1273,25 @@ int rhizome_received_content(unsigned char *bidprefix, file is yet to come. */ if (type=='T') slot->file_len=offset+count; else slot->file_len=offset+count+1; - DEBUGF("Trying to write %d bytes @ %d (file len = %d)", - count,(int)slot->file_ofs,(int)slot->file_len); - dump("content", bytes,count); rhizome_write_content(slot,(char *)bytes,count); debug=0; slot->mdpRXWindowStart=offset+count; - slot->mdpLastRX=gettime_ms(); + slot->mdpLastRX=gettime_ms(); + // TODO: Shift bitmap + + unschedule(&slot->alarm); + slot->alarm.function = rhizome_fetch_mdp_slot_callback; + slot->alarm.alarm=slot->mdpNextTX; + schedule(&slot->alarm); + + slot->mdpResponsesReceived++; + if (slot->mdpResponsesReceived==32) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + + RETURN(0); } else { // TODO: Implement out-of-order reception so that lost packets @@ -1279,11 +1300,13 @@ int rhizome_received_content(unsigned char *bidprefix, RETURN(0); } else - DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", - alloca_tohex(bidprefix,16), - alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); + if (0) + DEBUGF("Doesn't match this slot, because BIDs don't match: %s* vs %s", + alloca_tohex(bidprefix,16), + alloca_tohex_bid(rhizome_fetch_queues[i].active.bid)); } - } + } + RETURN(-1); }