Refactor rhizome mdp and vomp packet sending

This commit is contained in:
Jeremy Lakeman 2014-02-03 14:24:41 +10:30
parent 2480fb4a08
commit 32ab923dcc
4 changed files with 159 additions and 106 deletions

View File

@ -162,6 +162,15 @@ void _ob_flip(struct __sourceloc __whence, struct overlay_buffer *b)
b->position = 0;
}
void _ob_clear(struct __sourceloc __whence, struct overlay_buffer *b)
{
if (config.debug.overlaybuffer)
DEBUGF("ob_flip(b=%p) checkpointLength=0 position=0", b);
b->checkpointLength = 0;
b->position = 0;
ob_unlimitsize(b);
}
/* Return 1 if space is available, 0 if not.
*/
ssize_t _ob_makespace(struct __sourceloc __whence, struct overlay_buffer *b, size_t bytes)
@ -281,6 +290,11 @@ void _ob_append_bytes(struct __sourceloc __whence, struct overlay_buffer *b, con
b->position += count;
}
void _ob_append_str(struct __sourceloc whence, struct overlay_buffer *b, const char *str)
{
_ob_append_bytes(whence, b, (const uint8_t*)str, strlen(str)+1);
}
void _ob_append_ui16(struct __sourceloc __whence, struct overlay_buffer *b, uint16_t v)
{
const int bytes = 2;
@ -296,6 +310,21 @@ void _ob_append_ui16(struct __sourceloc __whence, struct overlay_buffer *b, uint
b->position += bytes;
}
void _ob_append_ui16_rv(struct __sourceloc __whence, struct overlay_buffer *b, uint16_t v)
{
const int bytes = 2;
if (ob_makespace(b, bytes)) {
b->bytes[b->position] = v & 0xFF;
b->bytes[b->position+1] = (v >> 8) & 0xFF;
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui16(b=%p, v=%u) %p[%zd]=%s position=%zu", b, v, b->bytes, b->position, alloca_tohex(&b->bytes[b->position], bytes), b->position + bytes);
} else {
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui16(b=%p, v=%u) OVERRUN position=%zu", b, v, b->position + bytes);
}
b->position += bytes;
}
void _ob_append_ui32(struct __sourceloc __whence, struct overlay_buffer *b, uint32_t v)
{
const int bytes = 4;
@ -314,6 +343,24 @@ void _ob_append_ui32(struct __sourceloc __whence, struct overlay_buffer *b, uint
b->position += bytes;
}
void _ob_append_ui32_rv(struct __sourceloc __whence, struct overlay_buffer *b, uint32_t v)
{
const int bytes = 4;
if (ob_makespace(b, bytes)) {
b->bytes[b->position] = v & 0xFF;
b->bytes[b->position+1] = (v >> 8) & 0xFF;
b->bytes[b->position+2] = (v >> 16) & 0xFF;
b->bytes[b->position+3] = (v >> 24) & 0xFF;
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui32(b=%p, v=%"PRIu32") %p[%zd]=%s position=%zu",
b, v, b->bytes, b->position, alloca_tohex(&b->bytes[b->position], bytes), b->position + bytes);
} else {
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui32(b=%p, v=%"PRIu32") OVERRUN position=%zu", b, v, b->position + bytes);
}
b->position += bytes;
}
void _ob_append_ui64(struct __sourceloc __whence, struct overlay_buffer *b, uint64_t v)
{
const int bytes = 8;
@ -336,6 +383,28 @@ void _ob_append_ui64(struct __sourceloc __whence, struct overlay_buffer *b, uint
b->position += bytes;
}
void _ob_append_ui64_rv(struct __sourceloc __whence, struct overlay_buffer *b, uint64_t v)
{
const int bytes = 8;
if (ob_makespace(b, bytes)) {
b->bytes[b->position] = v & 0xFF;
b->bytes[b->position+1] = (v >> 8) & 0xFF;
b->bytes[b->position+2] = (v >> 16) & 0xFF;
b->bytes[b->position+3] = (v >> 24) & 0xFF;
b->bytes[b->position+4] = (v >> 32) & 0xFF;
b->bytes[b->position+5] = (v >> 40) & 0xFF;
b->bytes[b->position+6] = (v >> 48) & 0xFF;
b->bytes[b->position+7] = (v >> 56) & 0xFF;
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui64(b=%p, v=%"PRIu64") %p[%zd]=%s position=%zu",
b, v, b->bytes, b->position, alloca_tohex(&b->bytes[b->position], bytes), b->position + bytes);
} else {
if (config.debug.overlaybuffer)
DEBUGF("ob_append_ui64(b=%p, v=%"PRIu64") OVERRUN position=%zu", b, v, b->position + bytes);
}
b->position += bytes;
}
int measure_packed_uint(uint64_t v){
int ret=0;
do{
@ -386,13 +455,6 @@ void _ob_append_packed_ui64(struct __sourceloc __whence, struct overlay_buffer *
} while (v != 0);
}
void _ob_append_rfs(struct __sourceloc __whence, struct overlay_buffer *b, int l)
{
assert(l >= 0);
assert(l <= 0xffff);
b->var_length_offset = b->position;
ob_append_ui16(b, l);
}
/*
@ -592,10 +654,6 @@ void _ob_set(struct __sourceloc __whence, struct overlay_buffer *b, size_t offse
DEBUGF("ob_set(b=%p, offset=%zd, byte=0x%02x) %p[%zd]=%s", b, offset, byte, b->bytes, offset, alloca_tohex(&b->bytes[offset], bytes));
}
void _ob_patch_rfs(struct __sourceloc __whence, struct overlay_buffer *b)
{
ob_set_ui16(b,b->var_length_offset,b->position - (b->var_length_offset + 2));
}
size_t ob_position(struct overlay_buffer *b)
@ -642,7 +700,8 @@ int asprintable(int c)
int ob_dump(struct overlay_buffer *b, char *desc)
{
DEBUGF("overlay_buffer '%s' at %p (%p) : position=%zu, size=%zu", desc, b, b->bytes, b->position, b->sizeLimit);
DEBUGF("overlay_buffer '%s' at %p (%p) : checkpoint=%zu, position=%zu, limit=%zu, size=%zu",
desc, b, b->bytes, b->checkpointLength, b->position, b->sizeLimit, b->allocSize);
if (b->bytes) {
if (b->sizeLimit != SIZE_MAX && b->sizeLimit > 0) {
assert(b->position <= b->sizeLimit);

View File

@ -37,9 +37,6 @@ struct overlay_buffer {
// is this an allocated buffer? can it be resized? Should it be freed?
unsigned char * allocated;
// length position for later patching
size_t var_length_offset;
};
struct overlay_buffer *_ob_new(struct __sourceloc __whence);
@ -51,11 +48,11 @@ int _ob_checkpoint(struct __sourceloc __whence, struct overlay_buffer *b);
int _ob_rewind(struct __sourceloc __whence, struct overlay_buffer *b);
void _ob_limitsize(struct __sourceloc __whence, struct overlay_buffer *b, size_t bytes);
void _ob_flip(struct __sourceloc __whence, struct overlay_buffer *b);
void _ob_clear(struct __sourceloc __whence, struct overlay_buffer *b);
void _ob_unlimitsize(struct __sourceloc __whence, struct overlay_buffer *b);
ssize_t _ob_makespace(struct __sourceloc whence, struct overlay_buffer *b, size_t bytes);
void _ob_set(struct __sourceloc __whence, struct overlay_buffer *b, size_t ofs, unsigned char byte);
void _ob_set_ui16(struct __sourceloc __whence, struct overlay_buffer *b, size_t offset, uint16_t v);
void _ob_patch_rfs(struct __sourceloc __whence, struct overlay_buffer *b);
void _ob_append_byte(struct __sourceloc whence, struct overlay_buffer *b,unsigned char byte);
void _ob_append_bytes(struct __sourceloc whence, struct overlay_buffer *b,const unsigned char *bytes, size_t count);
@ -63,9 +60,12 @@ unsigned char *_ob_append_space(struct __sourceloc whence, struct overlay_buffer
void _ob_append_ui16(struct __sourceloc whence, struct overlay_buffer *b, uint16_t v);
void _ob_append_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
void _ob_append_ui64(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
void _ob_append_ui16_rv(struct __sourceloc whence, struct overlay_buffer *b, uint16_t v);
void _ob_append_ui32_rv(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
void _ob_append_ui64_rv(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
void _ob_append_packed_ui32(struct __sourceloc whence, struct overlay_buffer *b, uint32_t v);
void _ob_append_packed_ui64(struct __sourceloc whence, struct overlay_buffer *b, uint64_t v);
void _ob_append_rfs(struct __sourceloc whence, struct overlay_buffer *b,int l);
void _ob_append_str(struct __sourceloc whence, struct overlay_buffer *b, const char *str);
#define ob_new() _ob_new(__WHENCE__)
#define ob_static(bytes, size) _ob_static(__WHENCE__, bytes, size)
@ -76,11 +76,11 @@ void _ob_append_rfs(struct __sourceloc whence, struct overlay_buffer *b,int l);
#define ob_rewind(b) _ob_rewind(__WHENCE__, b)
#define ob_limitsize(b, size) _ob_limitsize(__WHENCE__, b, size)
#define ob_flip(b) _ob_flip(__WHENCE__, b)
#define ob_clear(b) _ob_clear(__WHENCE__, b)
#define ob_unlimitsize(b) _ob_unlimitsize(__WHENCE__, b)
#define ob_makespace(b, bytes) _ob_makespace(__WHENCE__, b, bytes)
#define ob_set(b, off, byte) _ob_set(__WHENCE__, b, off, byte)
#define ob_set_ui16(b, off, v) _ob_set_ui16(__WHENCE__, b, off, v)
#define ob_patch_rfs(b) _ob_patch_rfs(__WHENCE__, b)
#define ob_append_byte(b, byte) _ob_append_byte(__WHENCE__, b, byte)
#define ob_append_bytes(b, bytes, count) _ob_append_bytes(__WHENCE__, b, bytes, count)
@ -88,9 +88,12 @@ void _ob_append_rfs(struct __sourceloc whence, struct overlay_buffer *b,int l);
#define ob_append_ui16(b, v) _ob_append_ui16(__WHENCE__, b, v)
#define ob_append_ui32(b, v) _ob_append_ui32(__WHENCE__, b, v)
#define ob_append_ui64(b, v) _ob_append_ui64(__WHENCE__, b, v)
#define ob_append_ui16_rv(b, v) _ob_append_ui16_rv(__WHENCE__, b, v)
#define ob_append_ui32_rv(b, v) _ob_append_ui32_rv(__WHENCE__, b, v)
#define ob_append_ui64_rv(b, v) _ob_append_ui64_rv(__WHENCE__, b, v)
#define ob_append_packed_ui32(b, v) _ob_append_packed_ui32(__WHENCE__, b, v)
#define ob_append_packed_ui64(b, v) _ob_append_packed_ui64(__WHENCE__, b, v)
#define ob_append_rfs(b, l) _ob_append_rfs(__WHENCE__, b, l)
#define ob_append_str(b, s) _ob_append_str(__WHENCE__, b, s)
// get one byte, -ve number indicates failure
int ob_peek(struct overlay_buffer *b);

View File

@ -43,8 +43,12 @@ int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, ui
if (config.debug.rhizome_tx)
DEBUGF("Requested blocks for bid=%s, ver=%"PRIu64" @%"PRIx64" bitmap %x", alloca_tohex_rhizome_bid_t(*bid), version, fileOffset, bitmap);
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
struct internal_mdp_header header;
bzero(&header, sizeof header);
uint8_t buff[MDP_MTU];
struct overlay_buffer *payload = ob_static(buff, sizeof(buff));
// Reply is broadcast, so we cannot authcrypt, and signing is too time consuming
// for low devices. The result is that an attacker can prevent rhizome transfers
// if they want to by injecting fake blocks. The alternative is to not broadcast
@ -54,57 +58,59 @@ int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, ui
// for now would seem the safest. But that would stop us from allowing multiple
// receivers in the special case where additional nodes begin listening in from the
// beginning.
reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN;
reply.out.src.sid = my_subscriber->sid;
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
header.crypt_flags = MDP_FLAG_NO_CRYPT | MDP_FLAG_NO_SIGN;
header.source = my_subscriber;
header.source_port = MDP_PORT_RHIZOME_RESPONSE;
if (dest && (dest->reachable==REACHABLE_UNICAST || dest->reachable==REACHABLE_INDIRECT)){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
reply.out.dst.sid = dest->sid;
header.destination = dest;
}else{
// send replies to broadcast so that others can hear blocks and record them
// (not that preemptive listening is implemented yet).
reply.out.dst.sid = SID_BROADCAST;
reply.out.ttl=1;
header.ttl = 1;
}
reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE;
reply.out.queue=OQ_OPPORTUNISTIC;
reply.out.payload[0]='B'; // reply contains blocks
// include 16 bytes of BID prefix for identification
bcopy(bid->binary, &reply.out.payload[1], 16);
// and version of manifest (in the correct byte order)
// bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t));
write_uint64(&reply.out.payload[1+16],version);
header.destination_port = MDP_PORT_RHIZOME_RESPONSE;
header.qos = OQ_OPPORTUNISTIC;
int i;
for(i=0;i<32;i++){
if (bitmap&(1<<(31-i)))
continue;
if (overlay_queue_remaining(reply.out.queue) < 10)
if (overlay_queue_remaining(header.qos) < 10)
break;
// calculate and set offset of block
uint64_t offset = fileOffset+i*blockLength;
ob_clear(payload);
ob_append_byte(payload, 'B'); // contains blocks
// include 16 bytes of BID prefix for identification
ob_append_bytes(payload, bid->binary, 16);
// and version of manifest (in the correct byte order)
ob_append_ui64_rv(payload, version);
write_uint64(&reply.out.payload[1+16+8], offset);
ob_append_ui64_rv(payload, offset);
ssize_t bytes_read = rhizome_read_cached(bid, version, gettime_ms()+5000, offset, &reply.out.payload[1+16+8+8], blockLength);
ssize_t bytes_read = rhizome_read_cached(bid, version, gettime_ms()+5000, offset, ob_current_ptr(payload), blockLength);
if (bytes_read<=0)
break;
reply.out.payload_length=1+16+8+8+(size_t)bytes_read;
ob_append_space(payload, bytes_read);
// Mark the last block of the file, if required
if ((size_t)bytes_read < blockLength)
reply.out.payload[0]='T';
ob_set(payload, 0, 'T');
// send packet
if (overlay_mdp_dispatch(&reply, NULL))
ob_flip(payload);
if (overlay_send_frame(&header, payload))
break;
}
ob_free(payload);
RETURN(0);
OUT();
}
@ -227,32 +233,17 @@ int overlay_mdp_service_dnalookup(struct internal_mdp_header *header, struct ove
int overlay_mdp_service_echo(struct internal_mdp_header *header, struct overlay_buffer *payload)
{
/* Echo is easy: we swap the sender and receiver addresses (and thus port
numbers) and send the frame back. */
IN();
/* Prevent echo:echo connections and the resulting denial of service from triggering endless pongs. */
if (header->source_port == MDP_PORT_ECHO)
RETURN(WHY("echo loop averted"));
RETURN(WHY("Prevented infinite echo loop"));
struct internal_mdp_header response_header;
bzero(&response_header, sizeof response_header);
response_header.source = header->destination;
response_header.source_port = MDP_PORT_ECHO;
response_header.destination = header->source;
response_header.destination_port = header->source_port;
response_header.qos = header->qos;
mdp_init_response(header, &response_header);
// keep all defaults
/* Always send PONGs auth-crypted so that the receipient knows
that they are genuine, and so that we avoid the extra cost
of signing (which is slower than auth-crypting) */
/* If the packet was sent to broadcast, then replace broadcast address
with our local address. */
if (!response_header.source)
response_header.source = my_subscriber;
RETURN(overlay_send_frame(&response_header, payload));
OUT();
}

92
vomp.c
View File

@ -464,20 +464,18 @@ static struct vomp_call_state *vomp_find_or_create_call(struct subscriber *remot
return NULL;
}
static void prepare_vomp_header(struct vomp_call_state *call, overlay_mdp_frame *mdp){
mdp->packetTypeAndFlags=MDP_TX;
mdp->out.src.sid = call->local.subscriber->sid;
mdp->out.src.port=MDP_PORT_VOMP;
mdp->out.dst.sid = call->remote.subscriber->sid;
mdp->out.dst.port=MDP_PORT_VOMP;
static void prepare_vomp_header(
struct vomp_call_state *call, struct internal_mdp_header *header, struct overlay_buffer *payload)
{
header->source = call->local.subscriber;
header->source_port = MDP_PORT_VOMP;
header->destination = call->remote.subscriber;
header->destination_port = MDP_PORT_VOMP;
mdp->out.payload[0]=VOMP_VERSION;
mdp->out.payload[1]=(call->local.session>>8)&0xff;
mdp->out.payload[2]=(call->local.session>>0)&0xff;
mdp->out.payload[3]=(call->remote.session>>8)&0xff;
mdp->out.payload[4]=(call->remote.session>>0)&0xff;
mdp->out.payload[5]=(call->remote.state<<4)|call->local.state;
mdp->out.payload_length=6;
ob_append_byte(payload, VOMP_VERSION);
ob_append_ui16(payload, call->local.session);
ob_append_ui16(payload, call->remote.session);
ob_append_ui16(payload, (call->remote.state<<4)|call->local.state);
// keep trying to punch a NAT tunnel for 10s
// note that requests are rate limited internally to one packet per second
@ -491,14 +489,16 @@ static void prepare_vomp_header(struct vomp_call_state *call, overlay_mdp_frame
static int vomp_send_status_remote(struct vomp_call_state *call)
{
overlay_mdp_frame mdp;
unsigned short *len=&mdp.out.payload_length;
struct internal_mdp_header header;
bzero(&header, sizeof(header));
uint8_t buff[MDP_MTU];
struct overlay_buffer *payload = ob_static(buff, sizeof buff);
prepare_vomp_header(call, &header, payload);
header.qos = OQ_ORDINARY;
bzero(&mdp,sizeof(mdp));
prepare_vomp_header(call, &mdp);
mdp.out.queue=OQ_ORDINARY;
if (call->local.state < VOMP_STATE_RINGINGOUT && call->remote.state < VOMP_STATE_RINGINGOUT) {
int didLen;
unsigned char codecs[CODEC_FLAGS_LENGTH];
/* Include the list of supported codecs */
@ -506,33 +506,34 @@ static int vomp_send_status_remote(struct vomp_call_state *call)
int i;
for (i = 0; i < 256; ++i)
if (is_codec_set(i,codecs)) {
mdp.out.payload[(*len)++]=i;
}
mdp.out.payload[(*len)++]=0;
if (is_codec_set(i,codecs))
ob_append_byte(payload, i);
ob_append_byte(payload, 0);
/* Include src and dst phone numbers */
if (call->initiated_call){
DEBUGF("Sending phone numbers %s, %s",call->local.did,call->remote.did);
didLen = snprintf((char *)(mdp.out.payload + *len), sizeof(mdp.out.payload) - *len, "%s", call->local.did);
*len+=didLen+1;
didLen = snprintf((char *)(mdp.out.payload + *len), sizeof(mdp.out.payload) - *len, "%s", call->remote.did);
*len+=didLen+1;
if (config.debug.vomp)
DEBUGF("Sending phone numbers %s, %s",call->local.did,call->remote.did);
ob_append_str(payload, call->local.did);
ob_append_str(payload, call->remote.did);
}
if (config.debug.vomp)
DEBUGF("mdp frame with codec list is %d bytes", mdp.out.payload_length);
DEBUGF("mdp frame with codec list is %zd bytes", ob_position(payload));
}
call->local.sequence++;
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(payload);
overlay_send_frame(&header, payload);
ob_free(payload);
return 0;
}
int vomp_received_audio(struct vomp_call_state *call, int audio_codec, int time, int sequence,
const unsigned char *audio, int audio_length)
const uint8_t *audio, int audio_length)
{
if (call->local.state!=VOMP_STATE_INCALL)
return -1;
@ -546,25 +547,24 @@ int vomp_received_audio(struct vomp_call_state *call, int audio_codec, int time,
if (sequence==-1)
sequence = call->local.sequence++;
overlay_mdp_frame mdp;
unsigned short *len=&mdp.out.payload_length;
struct internal_mdp_header header;
bzero(&header, sizeof(header));
bzero(&mdp,sizeof(mdp));
prepare_vomp_header(call, &mdp);
uint8_t buff[MDP_MTU];
struct overlay_buffer *payload = ob_static(buff, sizeof buff);
mdp.out.payload[(*len)++]=audio_codec;
prepare_vomp_header(call, &header, payload);
header.qos = OQ_ISOCHRONOUS_VOICE;
ob_append_byte(payload, audio_codec);
time = time / 20;
mdp.out.payload[(*len)++]=(time>>8)&0xff;
mdp.out.payload[(*len)++]=(time>>0)&0xff;
mdp.out.payload[(*len)++]=(sequence>>8)&0xff;
mdp.out.payload[(*len)++]=(sequence>>0)&0xff;
ob_append_ui16(payload, time);
ob_append_ui16(payload, sequence);
ob_append_bytes(payload, audio, audio_length);
bcopy(audio,&mdp.out.payload[(*len)],audio_length);
(*len)+=audio_length;
mdp.out.queue=OQ_ISOCHRONOUS_VOICE;
overlay_mdp_dispatch(&mdp, NULL);
ob_flip(payload);
overlay_send_frame(&header, payload);
ob_free(payload);
return 0;
}