Only send one audio block per packet, but request that it be sent multiple times

If we receive a large buffer of audio, we want to stuff the packet with multiple frames and send them together.
And we want to send redundant copies of the audio to help recover from packet loss.
But if all our redundant copies end up in the same packet, we're screwed anyway.

This is a temporary hack until the network layer implements NACK / retry for resilient multi-hop delivery
This commit is contained in:
Jeremy Lakeman 2012-09-28 11:30:03 +09:30
parent 2a7bd28e51
commit 66e0711d6b
5 changed files with 124 additions and 176 deletions

View File

@ -270,7 +270,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Multiple replies can be used to respond with more. */
#define MDP_MAX_SID_REQUEST 59
#define VOMP_MAX_CALLS 16
/* Maximum amount of audio to cram into a VoMP audio packet.
More lets us include preemptive retransmissions.
Less reduces the chance of packets getting lost, and reduces
@ -282,7 +281,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MDP_GOODBYE 9
#define MDP_AWAITREPLY 9999
#define VOMP_SESSION_MASK 0xffffff
/* max number of recent samples to cram into a VoMP frame as well as the current
frame of audio (preemptive audio retransmission) */
#define VOMP_MAX_RECENT_SAMPLES 2

View File

@ -380,7 +380,7 @@ static int monitor_clear(int argc, const char *const *argv, struct command_line_
return monitor_write_error(c,"Unknown monitor type");
char msg[1024];
snprintf(msg,sizeof(msg),"\nMONITORSTATUS:%d\n",c->flags);
snprintf(msg,sizeof(msg),"\nINFO:%d\n",c->flags);
write_str(c->alarm.poll.fd,msg);
return 0;

View File

@ -256,14 +256,6 @@ int overlay_payload_enqueue(int q, struct overlay_frame *p)
if (0) dump_queue("after",q);
if (q==OQ_ISOCHRONOUS_VOICE) {
// Send a packet immediately to reduce latency
// Also this prevents aggregation of multiple voice frames which would
// increase the chance of packet loss leading to missing audio
// TODO, remove when we NACK and retry all frames
overlay_send_packet(NULL);
}
return 0;
}

View File

@ -79,7 +79,6 @@ int bundle_offset[2]={0,0};
int overlay_rhizome_add_advertisements(int interface_number, struct overlay_buffer *e)
{
IN();
int voice_mode=0;
/* behave differently during voice mode.
Basically don't encourage people to grab stuff from us, but keep
@ -92,8 +91,8 @@ int overlay_rhizome_add_advertisements(int interface_number, struct overlay_buff
For now, we will just advertised only occassionally.
*/
time_ms_t now = gettime_ms();
if (now<rhizome_voice_timeout) voice_mode=1;
if (voice_mode) if (random()&3) { RETURN(0); }
if (now<rhizome_voice_timeout)
RETURN(0);
int pass;
int bytes=e->sizeLimit-e->position;

271
vomp.c
View File

@ -41,7 +41,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Dialing
// client requests an outgoing call
# CALL [sid] [myDid] [TheirDid]
> CALLPREP + codecs
> CALLPREP + codecs + phone numbers
// let the client know what token we are going to use for the remainder of the call
$ CALLTO [token] [mySid] [myDid] [TheirSid] [TheirDid]
// allocate a session number and tell them our codecs,
@ -82,6 +82,37 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
$ HANGUP [token]
*/
/*
Minimum network format requirements;
- your call session, packed integer
- my call state
- my sequence number
Pre-ring call setup;
- my call session
- my supported codec list
- your number
- my number
- my name
In call audio;
- codec
- elapsed time from call start
- audio duration
- audio data (remainder of payload)
Assuming minimum audio duration per packet is 20ms, 1 byte sequence should let us deal with ~2.5s of jitter.
If we have >2.5s of jitter, the network is obviously too crappy to support a voice call anyway.
If we can assume constant duration per codec, and I believe we can,
we can use the sequence number to derive the other audio timing information.
We need to resume a call even with large periods of zero traffic (eg >10s),
we should be able to use our own wall clock to estimate which 5s interval the audio belongs to.
*/
// ideally these id's should only be used on the network, with monitor events to inform clients of state changes
#define VOMP_STATE_NOCALL 1
#define VOMP_STATE_CALLPREP 2
@ -90,6 +121,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define VOMP_STATE_INCALL 5
#define VOMP_STATE_CALLENDED 6
#define VOMP_SESSION_MASK 0xffffff
#define VOMP_MAX_CALLS 16
struct vomp_call_half {
unsigned char sid[SID_SIZE];
char did[64];
@ -97,16 +133,6 @@ struct vomp_call_half {
unsigned char codec;
unsigned int session;
unsigned int sequence;
/* the following is from call creation, not start of audio flow */
unsigned long long milliseconds_since_call_start;
};
struct vomp_sample_block {
unsigned int codec;
int len;
time_ms_t starttime;
time_ms_t endtime;
unsigned char bytes[1024];
};
struct vomp_call_state {
@ -114,27 +140,27 @@ struct vomp_call_state {
struct vomp_call_half local;
struct vomp_call_half remote;
int initiated_call;
int fast_audio;
time_ms_t create_time;
time_ms_t last_activity;
time_ms_t audio_clock;
int audio_started;
// last local & remote status we sent to all interested parties
int last_sent_status;
unsigned char remote_codec_list[256];
int recent_sample_rotor;
struct vomp_sample_block recent_samples[VOMP_MAX_RECENT_SAMPLES];
// track when we first heard audio, so we can calculate timing from the current sequence number
int first_remote_audio_sequence;
// simple ring buffer of audio sample times, used to drop duplicate incoming frames
// stores end times, since this is an odd number we can initialise the buffer to zero's
int sample_pos;
unsigned int seen_samples[VOMP_MAX_RECENT_SAMPLES *4];
};
/* Although we only support one call at a time, we allow for multiple call states.
/* Some clients may only support one call at a time, even then we allow for multiple call states.
This is partly to deal with denial of service attacks that might occur by causing
the ejection of newly allocated session numbers before the caller has had a chance
to progress the call to a further state. */
int vomp_call_count=0;
int vomp_active_call=-1;
struct vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
struct profile_total vomp_stats;
@ -180,7 +206,6 @@ struct vomp_call_state *vomp_create_call(unsigned char *remote_sid,
unsigned int remote_session,
unsigned int local_session)
{
int i;
if (!local_session)
local_session=vomp_generate_session_id();
@ -199,10 +224,6 @@ struct vomp_call_state *vomp_create_call(unsigned char *remote_sid,
call->create_time=gettime_ms();
call->last_activity=call->create_time;
// fill sample cache with invalid times
for (i=0;i<VOMP_MAX_RECENT_SAMPLES *4;i++)
call->seen_samples[i]=0xFFFFFFFF;
call->alarm.alarm = call->create_time+VOMP_CALL_STATUS_INTERVAL;
call->alarm.function = vomp_process_tick;
vomp_stats.name="vomp_process_tick";
@ -279,6 +300,32 @@ struct vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
return NULL;
}
static void prepare_vomp_header(struct vomp_call_state *call, overlay_mdp_frame *mdp){
mdp->packetTypeAndFlags=MDP_TX;
bcopy(call->local.sid,mdp->out.src.sid,SID_SIZE);
mdp->out.src.port=MDP_PORT_VOMP;
bcopy(call->remote.sid,mdp->out.dst.sid,SID_SIZE);
mdp->out.dst.port=MDP_PORT_VOMP;
mdp->out.payload[0]=0x01; /* Normal VoMP frame */
mdp->out.payload[1]=(call->remote.state<<4)|call->local.state;
mdp->out.payload[2]=(call->remote.sequence>>8)&0xff;
mdp->out.payload[3]=(call->remote.sequence>>0)&0xff;
mdp->out.payload[4]=(call->local.sequence>>8)&0xff;
mdp->out.payload[5]=(call->local.sequence>>0)&0xff;
time_ms_t call_millis = gettime_ms() - call->create_time;
mdp->out.payload[6]=(call_millis>>8)&0xff;
mdp->out.payload[7]=(call_millis>>0)&0xff;
mdp->out.payload[8]=(call->remote.session>>16)&0xff;
mdp->out.payload[9]=(call->remote.session>>8)&0xff;
mdp->out.payload[10]=(call->remote.session>>0)&0xff;
mdp->out.payload[11]=(call->local.session>>16)&0xff;
mdp->out.payload[12]=(call->local.session>>8)&0xff;
mdp->out.payload[13]=(call->local.session>>0)&0xff;
mdp->out.payload_length=14;
}
/* send updated call status to end-point and to any interested listeners as
appropriate */
@ -288,29 +335,7 @@ int vomp_send_status_remote(struct vomp_call_state *call)
unsigned short *len=&mdp.out.payload_length;
bzero(&mdp,sizeof(mdp));
mdp.packetTypeAndFlags=MDP_TX;
bcopy(call->local.sid,mdp.out.src.sid,SID_SIZE);
mdp.out.src.port=MDP_PORT_VOMP;
bcopy(call->remote.sid,mdp.out.dst.sid,SID_SIZE);
mdp.out.dst.port=MDP_PORT_VOMP;
mdp.out.payload[0]=0x01; /* Normal VoMP frame */
mdp.out.payload[1]=(call->remote.state<<4)|call->local.state;
mdp.out.payload[2]=(call->remote.sequence>>8)&0xff;
mdp.out.payload[3]=(call->remote.sequence>>0)&0xff;
mdp.out.payload[4]=(call->local.sequence>>8)&0xff;
mdp.out.payload[5]=(call->local.sequence>>0)&0xff;
time_ms_t call_millis = gettime_ms() - call->create_time;
mdp.out.payload[6]=(call_millis>>8)&0xff;
mdp.out.payload[7]=(call_millis>>0)&0xff;
mdp.out.payload[8]=(call->remote.session>>16)&0xff;
mdp.out.payload[9]=(call->remote.session>>8)&0xff;
mdp.out.payload[10]=(call->remote.session>>0)&0xff;
mdp.out.payload[11]=(call->local.session>>16)&0xff;
mdp.out.payload[12]=(call->local.session>>8)&0xff;
mdp.out.payload[13]=(call->local.session>>0)&0xff;
*len=14;
prepare_vomp_header(call, &mdp);
if (call->local.state < VOMP_STATE_RINGINGOUT && call->remote.state < VOMP_STATE_RINGINGOUT) {
/* Include src and dst phone numbers */
@ -338,42 +363,6 @@ int vomp_send_status_remote(struct vomp_call_state *call)
DEBUGF("mdp frame with codec list is %d bytes", mdp.out.payload_length);
}
if (call->local.state==VOMP_STATE_INCALL) {
unsigned char *p=&mdp.out.payload[0];
struct vomp_sample_block *sb=call->recent_samples;
int rotor=call->recent_sample_rotor%VOMP_MAX_RECENT_SAMPLES;
if (sb[rotor].len==vomp_sample_size(sb[rotor].codec)){
/* write the sample end-time in milliseconds since call establishment */
p[(*len)++]=(call->audio_clock>>24)&0xff;
p[(*len)++]=(call->audio_clock>>16)&0xff;
p[(*len)++]=(call->audio_clock>>8)&0xff;
p[(*len)++]=(call->audio_clock>>0)&0xff;
/* stuff frame with most recent sample blocks as a form of preemptive
retransmission. But don't make the packets too large. */
while (((*len)+1+sb[rotor].len) <VOMP_STUFF_BYTES && sb[rotor].len==vomp_sample_size(sb[rotor].codec)) {
p[(*len)++]=sb[rotor].codec;
bcopy(&sb[rotor].bytes[0],&p[*len],sb[rotor].len);
(*len)+=sb[rotor].len;
rotor--; if (rotor<0) rotor+=VOMP_MAX_RECENT_SAMPLES;
rotor%=VOMP_MAX_RECENT_SAMPLES;
// stop if we've run out of samples before we ran out of bytes
if ((!sb[rotor].endtime)||(sb[rotor].endtime+1==call->audio_clock)) break;
}
call->recent_sample_rotor++;
call->recent_sample_rotor%=VOMP_MAX_RECENT_SAMPLES;
}
}
/* XXX Here we act as our own client. This used to be able to block.
We should really refactor overlay_mdp_poll() so that we can deliver
the frame directly.
Make sure that we don't want (just drop the message if there is
congestion) */
overlay_mdp_dispatch(&mdp,0,NULL,0);
call->local.sequence++;
@ -383,54 +372,47 @@ int vomp_send_status_remote(struct vomp_call_state *call)
// copy audio into the rotor buffers
int vomp_received_audio(struct vomp_call_state *call, int audio_codec, const unsigned char *audio, int audio_length)
{
if (call->local.state!=VOMP_STATE_INCALL)
return -1;
int codec_block_size=vomp_sample_size(audio_codec);
int offset=0;
struct vomp_sample_block *sb=call->recent_samples;
int codec_duration = vomp_codec_timespan(audio_codec);
while(offset<audio_length){
overlay_mdp_frame mdp;
unsigned short *len=&mdp.out.payload_length;
int rotor=call->recent_sample_rotor%VOMP_MAX_RECENT_SAMPLES;
bzero(&mdp,sizeof(mdp));
prepare_vomp_header(call, &mdp);
if (sb[rotor].len==0 || call->audio_clock!=sb[rotor].endtime+1){
/*
What timestamp to attach to the sample?
Two obvious choices:
1. The sample is for the most recent n milliseconds; or
2. The sample is for the next n milliseconds following the
last sample.
Option 1 introduces all sorts of problems with sample production
jitter, where as option 2 has no such problems, but simply requires the
producer of audio to ensure that they provide exactly the right amount
of audio, or risk the call getting out of sync. This is a fairly
reasonable expectation, or else things go to pot.
Note that in-call slew is the responsibility of the player, not the
recorder of audio. Basically if the audio queue starts to bank up,
then the player needs to drop samples.
*/
sb[rotor].codec=audio_codec;
sb[rotor].len=0;
sb[rotor].starttime=call->audio_clock;
sb[rotor].endtime=call->audio_clock+vomp_codec_timespan(sb[rotor].codec)-1;
call->audio_clock=sb[rotor].endtime+1;
}else if(sb[rotor].codec!=audio_codec){
WHY("Did not finish previous audio buffer!!");
}
mdp.out.payload[(*len)++]=(call->audio_clock>>24)&0xff;
mdp.out.payload[(*len)++]=(call->audio_clock>>16)&0xff;
mdp.out.payload[(*len)++]=(call->audio_clock>>8)&0xff;
mdp.out.payload[(*len)++]=(call->audio_clock>>0)&0xff;
mdp.out.payload[(*len)++]=audio_codec;
int copy_size = (audio_length - offset);
if (copy_size > codec_block_size - sb[rotor].len)
copy_size=codec_block_size - sb[rotor].len;
if (offset+codec_block_size>audio_length)
codec_block_size = audio_length - offset;
bcopy(audio + offset,&sb[rotor].bytes[sb[rotor].len],copy_size);
sb[rotor].len+=copy_size;
offset+=copy_size;
bcopy(audio+offset,&mdp.out.payload[(*len)],codec_block_size);
(*len)+=codec_block_size;
offset+=codec_block_size;
// send audio whenever we get the right number of bytes.
if (sb[rotor].len>=codec_block_size){
vomp_send_status_remote(call);
}
call->audio_clock += codec_duration;
// send the payload more than once to add resilience to dropped packets
// TODO remove once network links have built in retries
mdp.out.send_copies=VOMP_MAX_RECENT_SAMPLES;
overlay_mdp_dispatch(&mdp,0,NULL,0);
call->local.sequence++;
}
return 0;
@ -442,7 +424,7 @@ int monitor_call_status(struct vomp_call_state *call)
int n = snprintf(msg,1024,"\nCALLSTATUS:%06x:%06x:%d:%d:%d:%s:%s:%s:%s\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
call->fast_audio,
0,
alloca_tohex_sid(call->local.sid),
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
@ -546,12 +528,6 @@ int vomp_update(struct vomp_call_state *call)
return 0;
}
int vomp_call_start_audio(struct vomp_call_state *call)
{
call->audio_started=1;
return WHY("Not implemented");
}
// check a small circular buffer of recently seen audio
// we're not trying to be perfect here, we still expect all clients to reorder and filter duplicates
int vomp_audio_already_seen(struct vomp_call_state *call, unsigned int end_time)
@ -575,49 +551,40 @@ int vomp_process_audio(struct vomp_call_state *call,unsigned int sender_duration
/* Get end time marker for sample block collection */
unsigned int e=0, s=0;
e=mdp->in.payload[ofs++]<<24;
e|=mdp->in.payload[ofs++]<<16;
e|=mdp->in.payload[ofs++]<<8;
e|=mdp->in.payload[ofs++]<<0;
sender_duration = (e&0xFFFF0000)|sender_duration;
if (debug & DEBUG_VOMP)
DEBUGF("Jitter %d, %lld", sender_duration - e, (long long)((gettime_ms() - call->create_time) - e));
int sequence = call->remote.sequence;
while(ofs<mdp->in.payload_length)
if(ofs<mdp->in.payload_length)
{
int codec=mdp->in.payload[ofs];
// DEBUGF("Spotted a %s sample block",vomp_describe_codec(codec));
if (!codec||vomp_sample_size(codec)<0) break;
if ((ofs+1+vomp_sample_size(codec))>mdp->in.payload_length) break;
s=mdp->in.payload[ofs++]<<24;
s|=mdp->in.payload[ofs++]<<16;
s|=mdp->in.payload[ofs++]<<8;
s|=mdp->in.payload[ofs++]<<0;
/* work out start-time from end-time less duration of included sample(s).
XXX - Assumes only non-adaptive codecs. */
s = e-vomp_codec_timespan(codec)+1;
sender_duration = (s&0xFFFF0000)|sender_duration;
// simplistic jitter debug info
if (debug & DEBUG_VOMP)
DEBUGF("Jitter %d, %lld", sender_duration - s, (long long)((gettime_ms() - call->create_time) - s));
int codec=mdp->in.payload[ofs++];
int audio_len = mdp->in.payload_length - ofs;
if ((!codec)||vomp_sample_size(codec)<0) return -1;
e = s + vomp_codec_timespan(codec) - 1;
/* Pass audio frame to all registered listeners */
if (!vomp_audio_already_seen(call, e)){
if (monitor_socket_count)
monitor_send_audio(call, codec, s, e,
&mdp->in.payload[ofs+1],
vomp_sample_size(codec),
&mdp->in.payload[ofs],
audio_len,
sequence);
}
ofs+=1+vomp_sample_size(codec);
e=s-1;
sequence--;
}
return 0;
}
int vomp_call_stop_audio(struct vomp_call_state *call)
{
call->audio_started=0;
return WHY("Not implemented");
}
int vomp_ringing(struct vomp_call_state *call){
if (call){
if ((!call->initiated_call) && call->local.state<VOMP_STATE_RINGINGIN && call->remote.state==VOMP_STATE_RINGINGOUT){
@ -633,9 +600,6 @@ int vomp_ringing(struct vomp_call_state *call){
int vomp_call_destroy(struct vomp_call_state *call)
{
/* do some general clean ups */
if (call->audio_started) vomp_call_stop_audio(call);
if (debug & DEBUG_VOMP)
DEBUGF("Destroying call %s <--> %s", call->local.did,call->remote.did);
@ -715,7 +679,6 @@ int vomp_hangup(struct vomp_call_state *call)
if (call){
if (debug & DEBUG_VOMP)
DEBUG("Hanging up");
if (call->local.state==VOMP_STATE_INCALL) vomp_call_stop_audio(call);
vomp_update_local_state(call, VOMP_STATE_CALLENDED);
vomp_update(call);
}
@ -817,7 +780,6 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
/* For whatever reason, the far end has given up on the call,
so we must also move to CALLENDED no matter what state we were in */
if (call->audio_started) vomp_call_stop_audio(call);
recvr_state=VOMP_STATE_CALLENDED;
}
@ -897,9 +859,6 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
// Fall through
case (VOMP_STATE_INCALL<<3)|VOMP_STATE_INCALL:
/* play any audio that they have sent us. */
if (!call->audio_started) {
if (vomp_call_start_audio(call)) call->local.codec=VOMP_CODEC_ENGAGED;
}
vomp_process_audio(call,sender_duration,mdp);
break;