diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index f9a79d6e..52ac6087 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -88,7 +88,12 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) // for low devices. The result is that an attacker can prevent rhizome transfers // if they want to by injecting fake blocks. The alternative is to not broadcast // back replies, and then we can authcrypt. - reply.packetTypeAndFlags=MDP_TX|MDP_NOSIGN|MDP_NOCRYPT; + // multiple receivers starting at different times, we really need merkle-tree hashing. + // so multiple receivers is not realistic for now. So use non-broadcast unicode + // for now would seem the safest. But that would stop us from allowing multiple + // receivers in the special case where additional nodes begin listening in from the + // beginning. + reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; reply.out.ttl=1; bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE); // send replies to broadcast so that others can hear blocks and record them diff --git a/rhizome_fetch.c b/rhizome_fetch.c index c89d17fa..83d0f813 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -61,6 +61,7 @@ struct rhizome_fetch_slot { char filename[1024]; int64_t file_len; int64_t file_ofs; + int64_t last_write_time; /* HTTP transport specific elements */ char request[1024]; @@ -73,11 +74,8 @@ struct rhizome_fetch_slot { int bidP; unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES]; int prefix_length; - int64_t mdpNextTX; - int64_t mdpLastRX; int mdpIdleTimeout; - int mdpResponsesReceived; - int64_t mdpRXWindowStart; + int mdpResponsesOutstanding; int mdpRXBlockLength; uint32_t mdpRXBitmap; unsigned char mdpRXWindow[32*200]; @@ -983,8 +981,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) assert(slot->state != RHIZOME_FETCH_FREE); /* close socket and stop watching it */ - unwatch(&slot->alarm); unschedule(&slot->alarm); + unwatch(&slot->alarm); close(slot->alarm.poll.fd); slot->alarm.poll.fd = -1; slot->alarm.function=NULL; @@ -1021,14 +1019,15 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) } long long now=gettime_ms(); - if (now-slot->mdpLastRX>slot->mdpIdleTimeout) { + if (now-slot->last_write_time>slot->mdpIdleTimeout) { DEBUGF("MDP connection timed out: last RX %lldms ago", - now-slot->mdpLastRX); + now-slot->last_write_time); rhizome_fetch_close(slot); return; } - DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", - slot); + if (debug& DEBUG_RHIZOME_RX) + DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p", + slot); if (slot->bidP) rhizome_fetch_mdp_requestblocks(slot); else @@ -1037,17 +1036,11 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) { - if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { - // connection timed out - DEBUGF("MDP connection timed out"); - return rhizome_fetch_close(slot); - } // only issue new requests every 133ms. // we automatically re-issue once we have received all packets in this // request also, so if there is no packet loss, we can go substantially // faster. Optimising behaviour when there is no packet loss is an // outstanding task. - slot->mdpNextTX=gettime_ms()+133; overlay_mdp_frame mdp; @@ -1064,24 +1057,25 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion); - write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXWindowStart); + write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->file_ofs); write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap); write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength); if (0) DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x", alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), - slot->mdpRXWindowStart); + slot->file_ofs); overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); // remember when we sent the request so that we can adjust the inter-request // interval based on how fast the packets arrive. - slot->mdpResponsesReceived=0; + slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap unschedule(&slot->alarm); slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; + slot->alarm.alarm=gettime_ms()+133; + slot->alarm.deadline=slot->alarm.alarm+500; schedule(&slot->alarm); return 0; @@ -1095,12 +1089,11 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) return rhizome_fetch_close(slot); } - if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) { + if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) { // connection timed out DEBUGF("MDP connection timedout"); return rhizome_fetch_close(slot); } - slot->mdpNextTX=gettime_ms()+100; overlay_mdp_frame mdp; @@ -1120,7 +1113,8 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot) DEBUGF("Set callback function, and set alarm"); slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; + slot->alarm.alarm=gettime_ms()+100; + slot->alarm.deadline=slot->alarm.alarm+500; schedule(&slot->alarm); return 0; @@ -1146,7 +1140,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) slot->state=RHIZOME_FETCH_RXFILEMDP; - slot->mdpLastRX=gettime_ms(); + slot->last_write_time=gettime_ms(); if (slot->bidP) { /* We are requesting a file. The http request may have already received some of the file, so take that into account when setting up ring buffer. @@ -1158,15 +1152,13 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) to fit into a packet, and probably fit at least one any any outgoing packet that is not otherwise full. */ slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds - slot->mdpRXWindowStart=slot->file_ofs; slot->mdpRXBitmap=0x00000000; // no blocks received yet - slot->mdpRXBlockLength=1024; // 200; + slot->mdpRXBlockLength=200; // 200; rhizome_fetch_mdp_requestblocks(slot); } else { /* We are requesting a manifest, which is stateless, except that we eventually give up. All we need to do now is send the request, and set our alarm to try again in case we haven't heard anything back. */ - slot->mdpNextTX=gettime_ms()+100; slot->mdpIdleTimeout=2000; // only try for two seconds rhizome_fetch_mdp_requestmanifest(slot); } @@ -1202,7 +1194,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) } } -void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) +int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes) { if (bytes>(slot->file_len-slot->file_ofs)) bytes=slot->file_len-slot->file_ofs; @@ -1210,9 +1202,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by if (debug & DEBUG_RHIZOME_RX) DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs); rhizome_fetch_close(slot); - return; + return -1; } slot->file_ofs+=bytes; + slot->last_write_time=gettime_ms(); if (slot->file_ofs>=slot->file_len) { /* got all of file */ if (debug & DEBUG_RHIZOME_RX) @@ -1257,13 +1250,16 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by } DEBUGF("Closing rhizome fetch slot = 0x%p",slot); rhizome_fetch_close(slot); - return; + return -1; } // reset inactivity timeout unschedule(&slot->alarm); slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT; slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT; schedule(&slot->alarm); + + // slot is still open + return 0; } int rhizome_received_content(unsigned char *bidprefix, @@ -1277,36 +1273,31 @@ int rhizome_received_content(unsigned char *bidprefix, if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) { if (!bcmp(slot->bid,bidprefix,16)) { - if (slot->file_ofs==offset) { - debug=DEBUG_RHIZOME_RX; /* We don't know the file length until we receive the last block. If it isn't the last block, lie, and claim the end of file is yet to come. */ if (type=='T') slot->file_len=offset+count; - else slot->file_len=offset+count+1; - rhizome_write_content(slot,(char *)bytes,count); - debug=0; - slot->mdpRXWindowStart=offset+count; - slot->mdpLastRX=gettime_ms(); - - // TODO: Shift bitmap - - unschedule(&slot->alarm); - slot->alarm.function = rhizome_fetch_mdp_slot_callback; - slot->alarm.alarm=slot->mdpNextTX; - schedule(&slot->alarm); - - slot->mdpResponsesReceived++; - if (slot->mdpResponsesReceived==32) { - // We have received all responses, so immediately ask for more - rhizome_fetch_mdp_requestblocks(slot); + else { + slot->file_len=offset+count+1; + } + if (!rhizome_write_content(slot,(char *)bytes,count)) + { + slot->mdpResponsesOutstanding--; + if (slot->mdpResponsesOutstanding==0) { + // We have received all responses, so immediately ask for more + rhizome_fetch_mdp_requestblocks(slot); + } + + // TODO: Try flushing out stuck packets that we have kept due to + // packet loss / out-of-order delivery. + } RETURN(0); } else { - // TODO: Implement out-of-order reception so that lost packets + // TODO: Implement out-of-order buffering so that lost packets // don't cause wastage } RETURN(0); @@ -1332,11 +1323,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm) case RHIZOME_FETCH_SENDINGHTTPREQUEST: rhizome_fetch_write(slot); return; - case RHIZOME_FETCH_RXFILEMDP: - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Fetching via MDP not implemented"); - rhizome_fetch_close(slot); - break; case RHIZOME_FETCH_RXFILE: { /* Keep reading until we have the promised amount of data */ char buffer[8192]; @@ -1410,8 +1396,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) } break; default: - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); + WARNF("Closing rhizome fetch connection due to illegal/unimplemented state=%d.",slot->state); rhizome_fetch_close(slot); return; }