Refactor vomp call processing to reduce latency

This commit is contained in:
Jeremy Lakeman 2012-07-05 15:52:21 +09:30
parent ef8c1dd981
commit 7b8e82baea
5 changed files with 607 additions and 665 deletions

150
monitor.c
View File

@ -359,6 +359,7 @@ int monitor_process_command(struct monitor_context *c)
}
}
else if (!strcasecmp(cmd,"monitor vomp"))
// TODO add supported codec list argument
c->flags|=MONITOR_VOMP;
else if (!strcasecmp(cmd,"ignore vomp"))
c->flags&=~MONITOR_VOMP;
@ -372,13 +373,13 @@ int monitor_process_command(struct monitor_context *c)
c->flags&=~MONITOR_PEERS;
else if (sscanf(cmd,"FASTAUDIO:%x:%d",&callSessionToken,&flag)==2)
{
// TODO half implemented
int i;
for(i=0;i<vomp_call_count;i++)
if (vomp_call_states[i].local.session==callSessionToken
||callSessionToken==0) {
vomp_call_states[i].fast_audio=flag;
vomp_call_states[i].local.last_state=-1;
monitor_call_status(&vomp_call_states[i]);
break;
}
}
else if (sscanf(cmd,"call %s %s %s",sid,localDid,remoteDid)==3) {
@ -394,60 +395,58 @@ int monitor_process_command(struct monitor_context *c)
tohex(sid, overlay_nodes[bin][slot].sid, SID_SIZE);
break;
}
}else{
// pack the binary representation of the sid into the same buffer.
stowSid(sid,0,sid);
}
mdp.vompevent.flags=VOMPEVENT_DIAL;
int cn=0,in=0,kp=0;
if(!keyring_next_identity(keyring,&cn,&in,&kp))
{
WRITE_STR(c->alarm.poll.fd,"\nERROR:no local identity, so cannot place call\n");
}
else {
bcopy(keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key,
&mdp.vompevent.local_sid[0], SID_SIZE);
stowSid(&mdp.vompevent.remote_sid[0],0,sid);
vomp_mdp_event(&mdp,NULL,0);
vomp_dial(keyring->contexts[cn]->identities[in]
->keypairs[kp]->public_key, sid, localDid, remoteDid);
}
DEBUG("here");
}
else if (sscanf(cmd,"status %x",&callSessionToken)==1) {
int i;
for(i=0;i<vomp_call_count;i++)
if (vomp_call_states[i].local.session==callSessionToken
||callSessionToken==0) {
vomp_call_states[i].local.last_state=0;
monitor_call_status(&vomp_call_states[i]);
}
} else if (sscanf(cmd,"pickup %x",&callSessionToken)==1) {
mdp.vompevent.flags=VOMPEVENT_PICKUP;
mdp.vompevent.call_session_token=callSessionToken;
vomp_mdp_event(&mdp,NULL,0);
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
vomp_pickup(call);
}
else if (sscanf(cmd,"hangup %x",&callSessionToken)==1) {
mdp.vompevent.flags=VOMPEVENT_HANGUP;
mdp.vompevent.call_session_token=callSessionToken;
vomp_mdp_event(&mdp,NULL,0);
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
vomp_hangup(call);
} else if (sscanf(cmd,"dtmf %x %s",&callSessionToken,digits)==2) {
mdp.vompevent.flags=VOMPEVENT_AUDIOPACKET;
mdp.vompevent.call_session_token=callSessionToken;
/* One digit per sample block. */
mdp.vompevent.audio_sample_codec=VOMP_CODEC_DTMF;
mdp.vompevent.audio_sample_bytes=1;
int i;
for(i=0;i<strlen(digits);i++) {
int digit=vomp_parse_dtmf_digit(digits[i]);
if (digit<0) {
snprintf(msg,1024,"\nERROR: invalid DTMF digit 0x%02x\n",digit);
WRITE_STR(c->alarm.poll.fd,msg);
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
if (call){
/* One digit per sample block. */
mdp.vompevent.audio_sample_codec=VOMP_CODEC_DTMF;
mdp.vompevent.audio_sample_bytes=1;
int i;
for(i=0;i<strlen(digits);i++) {
int digit=vomp_parse_dtmf_digit(digits[i]);
if (digit<0) {
snprintf(msg,1024,"\nERROR: invalid DTMF digit 0x%02x\n",digit);
WRITE_STR(c->alarm.poll.fd,msg);
}
/* 80ms standard tone duration, so that it is a multiple
of the majority of codec time units (70ms is the nominal
DTMF tone length for most systems). */
char code = digit <<4;
vomp_send_status_remote_audio(call, VOMP_CODEC_DTMF, &code, 1);
}
mdp.vompevent.audio_bytes[mdp.vompevent.audio_sample_bytes]
=(digit<<4); /* 80ms standard tone duration, so that it is a multiple
of the majority of codec time units (70ms is the nominal
DTMF tone length for most systems). */
if (overlay_mdp_send(&mdp,0,0)) WHY("Send DTMF failed.");
}
}
snprintf(msg,1024,"\nMONITORSTATUS:%d\n",c->flags);
@ -458,33 +457,25 @@ int monitor_process_command(struct monitor_context *c)
int monitor_process_data(struct monitor_context *c)
{
IN();
/* Called when we have received an entire data sample */
c->state=MONITOR_STATE_COMMAND;
if (vomp_sample_size(c->sample_codec)!=c->data_offset) {
WARNF("Ignoring sample block of incorrect size (expected %d, got %d bytes for codec %d)",
vomp_sample_size(c->sample_codec), c->data_offset, c->sample_codec);
return -1;
RETURN(-1);
}
vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
if (!call) {
WRITE_STR(c->alarm.poll.fd,"\nERROR:No such call\n");
return -1;
RETURN(-1);
}
overlay_mdp_frame mdp;
mdp.packetTypeAndFlags=MDP_VOMPEVENT;
mdp.vompevent.flags=VOMPEVENT_AUDIOPACKET;
mdp.vompevent.call_session_token=c->sample_call_session_token;
mdp.vompevent.audio_sample_codec=c->sample_codec;
bcopy(&c->buffer[0],&mdp.vompevent.audio_bytes[0],
vomp_sample_size(c->sample_codec));
mdp.vompevent.audio_sample_bytes=vomp_sample_size(c->sample_codec);
vomp_send_status_remote_audio(call, c->sample_codec, &c->buffer[0], vomp_sample_size(c->sample_codec));
if (overlay_mdp_send(&mdp,0,0)) WARN("Send audio failed.");
return 0;
RETURN(0);
}
int monitor_announce_bundle(rhizome_manifest *m)
@ -528,38 +519,30 @@ int monitor_call_status(vomp_call_state *call)
{
int i;
char msg[1024];
int show=0;
IN();
if (call->local.state>call->local.last_state) show=1;
if (call->remote.state>call->remote.last_state) show=1;
call->local.last_state=call->local.state;
call->remote.last_state=call->remote.state;
if (show) {
if (0) DEBUG("sending call status to monitor");
snprintf(msg,1024,"\nCALLSTATUS:%06x:%06x:%d:%d:%d:%s:%s:%s:%s\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
call->fast_audio,
alloca_tohex_sid(call->local.sid),
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
msg[1023]=0;
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&MONITOR_VOMP))
continue;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHY_perror("write");
INFOF("Tearing down monitor client #%d", i);
monitor_client_close(&monitor_sockets[i]);
}
snprintf(msg,1024,"\nCALLSTATUS:%06x:%06x:%d:%d:%d:%s:%s:%s:%s\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
call->fast_audio,
alloca_tohex_sid(call->local.sid),
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
msg[1023]=0;
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&MONITOR_VOMP))
continue;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHY_perror("write");
INFOF("Tearing down monitor client #%d", i);
monitor_client_close(&monitor_sockets[i]);
}
}
}
RETURN(0);
}
@ -571,25 +554,24 @@ int monitor_announce_peer(const unsigned char *sid)
return 0;
}
int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio)
int monitor_send_audio(vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length)
{
if (0) DEBUGF("Tell call monitor about audio for call %06x:%06x",
call->local.session,call->remote.session);
int sample_bytes=vomp_sample_size(audio->vompevent.audio_sample_codec);
int sample_bytes=vomp_sample_size(audio_codec);
char msg[1024 + MAX_AUDIO_BYTES];
/* All commands followed by binary data start with *len:, so that
they can be easily parsed at the far end, even if not supported.
Put newline at start of these so that receiving data in command
mode doesn't confuse the parser. */
int msglen = snprintf(msg, 1024,
"\n*%d:AUDIOPACKET:%06x:%06x:%d:%d:%d:%lld:%lld\n",
"\n*%d:AUDIOPACKET:%06x:%06x:%d:%d:%d:%d:%d\n",
sample_bytes,
call->local.session,call->remote.session,
call->local.state,call->remote.state,
audio->vompevent.audio_sample_codec,
audio->vompevent.audio_sample_starttime,
audio->vompevent.audio_sample_endtime);
bcopy(&audio->vompevent.audio_bytes[0], &msg[msglen], sample_bytes);
audio_codec, start_time, end_time);
bcopy(audio, &msg[msglen], sample_bytes);
msglen+=sample_bytes;
msg[msglen++]='\n';
monitor_tell_clients(msg, msglen, MONITOR_VOMP);

View File

@ -266,10 +266,8 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
} else {
if (overlay_get_nexthop(p->destination,p->nexthop,&nexthoplen,
&interface)) {
return WHY("Failed to resolve nexthop for voice packet");
}
if (interface==-1&&(!broadcast)) {
return WHY("Failed to determine interface for sending voice packet");
// (we don't need another log message here)
return -1;
}
}

View File

@ -393,10 +393,10 @@ int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
if (neh->scores[i]>neh->scores[*interface]) *interface=i;
}
if (neh->scores[*interface]<1) {
if (debug&DEBUG_OVERLAYROUTING) {
*interface=-1;
DEBUGF("No open path to %s",alloca_tohex_sid(neh->node->sid));
}
if (1||debug&DEBUG_OVERLAYROUTING)
DEBUGF("No open path to %s, neighbour score <=0",alloca_tohex_sid(neh->node->sid));
return -1;
}
if (0) DEBUGF("nexthop is %s",alloca_tohex_sid(nexthop));
@ -428,10 +428,14 @@ int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
if (best_o>-1) {
return 0;
} else {
return -1; // WHYF("No open path to %s",alloca_tohex_sid(d));
if (1||debug&DEBUG_OVERLAYROUTING)
DEBUGF("No open path to %s, no good neighbour observations",alloca_tohex_sid(d));
return -1;
}
} else {
return -1; // WHYF("No open path to %s",alloca_tohex_sid(d));
if (1||debug&DEBUG_OVERLAYROUTING)
DEBUGF("No open path to %s, unknown peer",alloca_tohex_sid(d));
return -1;
}
}
}

View File

@ -1373,7 +1373,6 @@ typedef struct vomp_call_half {
unsigned char sid[SID_SIZE];
unsigned char did[64];
unsigned char state;
unsigned char last_state; // last state communicated to monitoring parties
unsigned char codec;
unsigned int session;
#define VOMP_SESSION_MASK 0xffffff
@ -1394,16 +1393,21 @@ typedef struct vomp_call_state {
struct sched_ent alarm;
vomp_call_half local;
vomp_call_half remote;
int initiated_call;
int ringing;
int fast_audio;
unsigned long long create_time;
unsigned long long last_activity;
unsigned long long audio_clock;
int audio_started;
// last local & remote status we sent to all interested parties
int last_sent_status;
unsigned char remote_codec_list[256];
int recent_sample_rotor;
vomp_sample_block recent_samples[VOMP_MAX_RECENT_SAMPLES];
int sample_pos;
unsigned int seen_samples[VOMP_MAX_RECENT_SAMPLES *4];
} vomp_call_state;
extern int vomp_call_count;
@ -1438,13 +1442,6 @@ extern vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
#define VOMP_CALL_TIMEOUT 120000
#define VOMP_CALL_STATUS_INTERVAL 1000
#define VOMP_TELLINTERESTED (1<<0)
#define VOMP_TELLREMOTE (1<<1)
#define VOMP_NEWCALL (1<<2)
#define VOMP_FORCETELLREMOTE ((1<<3)|VOMP_TELLREMOTE)
#define VOMP_TELLCODECS (1<<4)
#define VOMP_SENDAUDIO (1<<5)
vomp_call_state *vomp_find_call_by_session(int session_token);
int vomp_mdp_event(overlay_mdp_frame *mdp,
struct sockaddr_un *recvaddr,int recvaddrlen);
@ -1454,8 +1451,11 @@ char *vomp_describe_codec(int c);
int vomp_tick_interval();
int vomp_sample_size(int c);
int vomp_codec_timespan(int c);
int vomp_send_status(vomp_call_state *call,int flags,overlay_mdp_frame *arg);
int vomp_parse_dtmf_digit(char c);
int vomp_dial(unsigned char *local_sid, unsigned char *remote_sid, char *local_did, char *remote_did);
int vomp_pickup(vomp_call_state *call);
int vomp_hangup(vomp_call_state *call);
typedef struct command_line_option {
int (*function)(int argc, const char *const *argv, struct command_line_option *o);
@ -1503,7 +1503,7 @@ int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_setup_sockets();
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_call_status(vomp_call_state *call);
int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio);
int monitor_send_audio(vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length);
int monitor_announce_peer(const unsigned char *sid);
int monitor_tell_clients(char *msg, int msglen, int mask);
extern int monitor_socket_count;

1080
vomp.c

File diff suppressed because it is too large Load Diff