feedback from code review by Jeremy.

Some preparation for out-of-order handling.
Consolidated redundantly redundant fields in fetch slot structure.
Probably fixed spurious alarm bug.
This commit is contained in:
gardners 2012-12-03 11:50:35 +10:30
parent 7b4b9dbf81
commit 7ec695c940
2 changed files with 47 additions and 57 deletions

View File

@ -88,7 +88,12 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
// back replies, and then we can authcrypt.
reply.packetTypeAndFlags=MDP_TX|MDP_NOSIGN|MDP_NOCRYPT;
// multiple receivers starting at different times, we really need merkle-tree hashing.
// so multiple receivers is not realistic for now. So use non-broadcast unicode
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
reply.out.ttl=1;
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
// send replies to broadcast so that others can hear blocks and record them

View File

@ -61,6 +61,7 @@ struct rhizome_fetch_slot {
char filename[1024];
int64_t file_len;
int64_t file_ofs;
int64_t last_write_time;
/* HTTP transport specific elements */
char request[1024];
@ -73,11 +74,8 @@ struct rhizome_fetch_slot {
int bidP;
unsigned char prefix[RHIZOME_MANIFEST_ID_BYTES];
int prefix_length;
int64_t mdpNextTX;
int64_t mdpLastRX;
int mdpIdleTimeout;
int mdpResponsesReceived;
int64_t mdpRXWindowStart;
int mdpResponsesOutstanding;
int mdpRXBlockLength;
uint32_t mdpRXBitmap;
unsigned char mdpRXWindow[32*200];
@ -983,8 +981,8 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
assert(slot->state != RHIZOME_FETCH_FREE);
/* close socket and stop watching it */
unwatch(&slot->alarm);
unschedule(&slot->alarm);
unwatch(&slot->alarm);
close(slot->alarm.poll.fd);
slot->alarm.poll.fd = -1;
slot->alarm.function=NULL;
@ -1021,14 +1019,15 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
}
long long now=gettime_ms();
if (now-slot->mdpLastRX>slot->mdpIdleTimeout) {
if (now-slot->last_write_time>slot->mdpIdleTimeout) {
DEBUGF("MDP connection timed out: last RX %lldms ago",
now-slot->mdpLastRX);
now-slot->last_write_time);
rhizome_fetch_close(slot);
return;
}
DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p",
slot);
if (debug& DEBUG_RHIZOME_RX)
DEBUGF("Timeout waiting for blocks. Resending request for slot=0x%p",
slot);
if (slot->bidP)
rhizome_fetch_mdp_requestblocks(slot);
else
@ -1037,17 +1036,11 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
{
if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) {
// connection timed out
DEBUGF("MDP connection timed out");
return rhizome_fetch_close(slot);
}
// only issue new requests every 133ms.
// we automatically re-issue once we have received all packets in this
// request also, so if there is no packet loss, we can go substantially
// faster. Optimising behaviour when there is no packet loss is an
// outstanding task.
slot->mdpNextTX=gettime_ms()+133;
overlay_mdp_frame mdp;
@ -1064,24 +1057,25 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot)
bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES);
write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES],slot->bidVersion);
write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->mdpRXWindowStart);
write_uint64(&mdp.out.payload[RHIZOME_BAR_BYTES+8],slot->file_ofs);
write_uint32(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8],slot->mdpRXBitmap);
write_uint16(&mdp.out.payload[RHIZOME_BAR_BYTES+8+8+4],slot->mdpRXBlockLength);
if (0)
DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%x",
alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid),
slot->mdpRXWindowStart);
slot->file_ofs);
overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0);
// remember when we sent the request so that we can adjust the inter-request
// interval based on how fast the packets arrive.
slot->mdpResponsesReceived=0;
slot->mdpResponsesOutstanding=32; // TODO: set according to bitmap
unschedule(&slot->alarm);
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=slot->mdpNextTX;
slot->alarm.alarm=gettime_ms()+133;
slot->alarm.deadline=slot->alarm.alarm+500;
schedule(&slot->alarm);
return 0;
@ -1095,12 +1089,11 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot)
return rhizome_fetch_close(slot);
}
if ((gettime_ms()-slot->mdpLastRX)>slot->mdpIdleTimeout) {
if ((gettime_ms()-slot->last_write_time)>slot->mdpIdleTimeout) {
// connection timed out
DEBUGF("MDP connection timedout");
return rhizome_fetch_close(slot);
}
slot->mdpNextTX=gettime_ms()+100;
overlay_mdp_frame mdp;
@ -1120,7 +1113,8 @@ static int rhizome_fetch_mdp_requestmanifest(struct rhizome_fetch_slot *slot)
DEBUGF("Set callback function, and set alarm");
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=slot->mdpNextTX;
slot->alarm.alarm=gettime_ms()+100;
slot->alarm.deadline=slot->alarm.alarm+500;
schedule(&slot->alarm);
return 0;
@ -1146,7 +1140,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
slot->state=RHIZOME_FETCH_RXFILEMDP;
slot->mdpLastRX=gettime_ms();
slot->last_write_time=gettime_ms();
if (slot->bidP) {
/* We are requesting a file. The http request may have already received
some of the file, so take that into account when setting up ring buffer.
@ -1158,15 +1152,13 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
to fit into a packet, and probably fit at least one any any outgoing packet
that is not otherwise full. */
slot->mdpIdleTimeout=5000; // give up if nothing received for 5 seconds
slot->mdpRXWindowStart=slot->file_ofs;
slot->mdpRXBitmap=0x00000000; // no blocks received yet
slot->mdpRXBlockLength=1024; // 200;
slot->mdpRXBlockLength=200; // 200;
rhizome_fetch_mdp_requestblocks(slot);
} else {
/* We are requesting a manifest, which is stateless, except that we eventually
give up. All we need to do now is send the request, and set our alarm to
try again in case we haven't heard anything back. */
slot->mdpNextTX=gettime_ms()+100;
slot->mdpIdleTimeout=2000; // only try for two seconds
rhizome_fetch_mdp_requestmanifest(slot);
}
@ -1202,7 +1194,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
}
}
void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
int rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int bytes)
{
if (bytes>(slot->file_len-slot->file_ofs))
bytes=slot->file_len-slot->file_ofs;
@ -1210,9 +1202,10 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed to write %d bytes to file @ offset %d", bytes, slot->file_ofs);
rhizome_fetch_close(slot);
return;
return -1;
}
slot->file_ofs+=bytes;
slot->last_write_time=gettime_ms();
if (slot->file_ofs>=slot->file_len) {
/* got all of file */
if (debug & DEBUG_RHIZOME_RX)
@ -1257,13 +1250,16 @@ void rhizome_write_content(struct rhizome_fetch_slot *slot, char *buffer, int by
}
DEBUGF("Closing rhizome fetch slot = 0x%p",slot);
rhizome_fetch_close(slot);
return;
return -1;
}
// reset inactivity timeout
unschedule(&slot->alarm);
slot->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
// slot is still open
return 0;
}
int rhizome_received_content(unsigned char *bidprefix,
@ -1277,36 +1273,31 @@ int rhizome_received_content(unsigned char *bidprefix,
if (slot->state==RHIZOME_FETCH_RXFILEMDP&&slot->bidP) {
if (!bcmp(slot->bid,bidprefix,16))
{
if (slot->file_ofs==offset) {
debug=DEBUG_RHIZOME_RX;
/* We don't know the file length until we receive the last
block. If it isn't the last block, lie, and claim the end of
file is yet to come. */
if (type=='T') slot->file_len=offset+count;
else slot->file_len=offset+count+1;
rhizome_write_content(slot,(char *)bytes,count);
debug=0;
slot->mdpRXWindowStart=offset+count;
slot->mdpLastRX=gettime_ms();
// TODO: Shift bitmap
unschedule(&slot->alarm);
slot->alarm.function = rhizome_fetch_mdp_slot_callback;
slot->alarm.alarm=slot->mdpNextTX;
schedule(&slot->alarm);
slot->mdpResponsesReceived++;
if (slot->mdpResponsesReceived==32) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
else {
slot->file_len=offset+count+1;
}
if (!rhizome_write_content(slot,(char *)bytes,count))
{
slot->mdpResponsesOutstanding--;
if (slot->mdpResponsesOutstanding==0) {
// We have received all responses, so immediately ask for more
rhizome_fetch_mdp_requestblocks(slot);
}
// TODO: Try flushing out stuck packets that we have kept due to
// packet loss / out-of-order delivery.
}
RETURN(0);
} else {
// TODO: Implement out-of-order reception so that lost packets
// TODO: Implement out-of-order buffering so that lost packets
// don't cause wastage
}
RETURN(0);
@ -1332,11 +1323,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(slot);
return;
case RHIZOME_FETCH_RXFILEMDP:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Fetching via MDP not implemented");
rhizome_fetch_close(slot);
break;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
@ -1410,8 +1396,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
}
break;
default:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
WARNF("Closing rhizome fetch connection due to illegal/unimplemented state=%d.",slot->state);
rhizome_fetch_close(slot);
return;
}