Refactor towards changing the vomp monitor interface

This commit is contained in:
Jeremy Lakeman 2012-08-06 14:46:46 +09:30
parent 7705676a9e
commit 736a8f43c0
3 changed files with 154 additions and 152 deletions

View File

@ -369,17 +369,6 @@ int monitor_process_command(struct monitor_context *c)
c->flags|=MONITOR_PEERS;
else if (!strcasecmp(cmd,"ignore peers"))
c->flags&=~MONITOR_PEERS;
else if (sscanf(cmd,"FASTAUDIO:%x:%d",&callSessionToken,&flag)==2)
{
// TODO half implemented
int i;
for(i=0;i<vomp_call_count;i++)
if (vomp_call_states[i].local.session==callSessionToken
||callSessionToken==0) {
vomp_call_states[i].fast_audio=flag;
break;
}
}
else if (sscanf(cmd,"call %s %s %s",sid,localDid,remoteDid)==3) {
DEBUG("here");
int gotSid = 0;
@ -411,26 +400,18 @@ int monitor_process_command(struct monitor_context *c)
vomp_dial(keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key, (unsigned char *)sid, localDid, remoteDid);
}
}
}
else if (sscanf(cmd,"status %x",&callSessionToken)==1) {
int i;
for(i=0;i<vomp_call_count;i++)
if (vomp_call_states[i].local.session==callSessionToken
||callSessionToken==0) {
monitor_call_status(&vomp_call_states[i]);
}
} else if (sscanf(cmd,"ringing %x",&callSessionToken)==1) {
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
struct vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
vomp_ringing(call);
} else if (sscanf(cmd,"pickup %x",&callSessionToken)==1) {
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
struct vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
vomp_pickup(call);
}
else if (sscanf(cmd,"hangup %x",&callSessionToken)==1) {
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
struct vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
vomp_hangup(call);
} else if (sscanf(cmd,"dtmf %x %s",&callSessionToken,digits)==2) {
vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
struct vomp_call_state *call=vomp_find_call_by_session(callSessionToken);
if (call){
int i;
for(i=0;i<strlen(digits);i++) {
@ -466,7 +447,7 @@ int monitor_process_data(struct monitor_context *c)
RETURN(-1);
}
vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
struct vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
if (!call) {
write_str(c->alarm.poll.fd,"\nERROR:No such call\n");
RETURN(-1);
@ -507,33 +488,6 @@ int monitor_announce_bundle(rhizome_manifest *m)
return 0;
}
int monitor_call_status(vomp_call_state *call)
{
int i;
char msg[1024];
IN();
snprintf(msg,1024,"\nCALLSTATUS:%06x:%06x:%d:%d:%d:%s:%s:%s:%s\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
call->fast_audio,
alloca_tohex_sid(call->local.sid),
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
msg[1023]=0;
for(i=monitor_socket_count -1;i>=0;i--) {
if (!(monitor_sockets[i].flags&MONITOR_VOMP))
continue;
if ( set_nonblock(monitor_sockets[i].alarm.poll.fd) == -1
|| write_str_nonblock(monitor_sockets[i].alarm.poll.fd, msg) == -1
|| set_block(monitor_sockets[i].alarm.poll.fd) == -1
) {
INFOF("Tearing down monitor client #%d", i);
monitor_client_close(&monitor_sockets[i]);
}
}
RETURN(0);
}
int monitor_announce_peer(const unsigned char *sid)
{
char msg[1024];
@ -550,27 +504,13 @@ int monitor_announce_unreachable_peer(const unsigned char *sid)
return 0;
}
int monitor_send_audio(vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length)
{
if (0) DEBUGF("Tell call monitor about audio for call %06x:%06x",
call->local.session,call->remote.session);
int sample_bytes=vomp_sample_size(audio_codec);
char msg[1024 + MAX_AUDIO_BYTES];
/* All commands followed by binary data start with *len:, so that
they can be easily parsed at the far end, even if not supported.
Put newline at start of these so that receiving data in command
mode doesn't confuse the parser. */
int msglen = snprintf(msg, 1024,
"\n*%d:AUDIOPACKET:%06x:%06x:%d:%d:%d:%d:%d\n",
sample_bytes,
call->local.session,call->remote.session,
call->local.state,call->remote.state,
audio_codec, start_time, end_time);
bcopy(audio, &msg[msglen], sample_bytes);
msglen+=sample_bytes;
msg[msglen++]='\n';
monitor_tell_clients(msg, msglen, MONITOR_VOMP);
// test if any monitor clients are interested in a particular type of event
int monitor_client_interested(int mask){
int i;
for(i=monitor_socket_count -1;i>=0;i--) {
if (monitor_sockets[i].flags & mask)
return 1;
}
return 0;
}

View File

@ -1028,32 +1028,8 @@ typedef struct vomp_sample_block {
unsigned char bytes[1024];
} vomp_sample_block;
typedef struct vomp_call_state {
struct sched_ent alarm;
vomp_call_half local;
vomp_call_half remote;
int initiated_call;
int fast_audio;
time_ms_t create_time;
time_ms_t last_activity;
time_ms_t audio_clock;
int audio_started;
// last local & remote status we sent to all interested parties
int last_sent_status;
unsigned char remote_codec_list[256];
int recent_sample_rotor;
vomp_sample_block recent_samples[VOMP_MAX_RECENT_SAMPLES];
int sample_pos;
unsigned int seen_samples[VOMP_MAX_RECENT_SAMPLES *4];
} vomp_call_state;
extern int vomp_call_count;
extern int vomp_active_call;
extern vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
vomp_call_state *vomp_find_call_by_session(int session_token);
struct vomp_call_state;
struct vomp_call_state *vomp_find_call_by_session(int session_token);
int vomp_mdp_event(overlay_mdp_frame *mdp,
struct sockaddr_un *recvaddr,int recvaddrlen);
int vomp_mdp_received(overlay_mdp_frame *mdp);
@ -1062,10 +1038,10 @@ int vomp_sample_size(int c);
int vomp_codec_timespan(int c);
int vomp_parse_dtmf_digit(char c);
int vomp_dial(unsigned char *local_sid, unsigned char *remote_sid, char *local_did, char *remote_did);
int vomp_pickup(vomp_call_state *call);
int vomp_hangup(vomp_call_state *call);
int vomp_ringing(vomp_call_state *call);
int vomp_send_status_remote_audio(vomp_call_state *call, int audio_codec, const unsigned char *audio, int audio_length);
int vomp_pickup(struct vomp_call_state *call);
int vomp_hangup(struct vomp_call_state *call);
int vomp_ringing(struct vomp_call_state *call);
int vomp_send_status_remote_audio(struct vomp_call_state *call, int audio_codec, const unsigned char *audio, int audio_length);
typedef struct command_line_option {
@ -1113,11 +1089,10 @@ int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_setup_sockets();
int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax);
int monitor_call_status(vomp_call_state *call);
int monitor_send_audio(vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length);
int monitor_announce_peer(const unsigned char *sid);
int monitor_announce_unreachable_peer(const unsigned char *sid);
int monitor_tell_clients(char *msg, int msglen, int mask);
int monitor_client_interested(int mask);
extern int monitor_socket_count;

183
vomp.c
View File

@ -27,13 +27,76 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include "strbuf.h"
/*
Typical call state lifecycle between 2 parties.
Legend;
# incoming command from monitor client
$ outgoing monitor status
<> vomp packet with state change sent across the network
Monitor Init
# MONITOR VOMP
Dialing
# CALL [sid] [myDid] [TheirDid] [token]
> CALLPREP + codecs
$ CALLTO [token]
< NOCALL + codecs
> RINGOUT
$ CALLFROM [token] [sid] [myDid] [TheirDid]
# RING [token]
< RINGIN (they start ringing)
$ RINGING [token]
Answering
# PICKUP [token]
< INCALL
> INCALL
$ INCALL [token]
$ INCALL [token]
Tell any clients that the call hasn't timed out yet
(if servald is behaving this should be redundant, if it isn't behaving how do we hangup?)
$ KEEPALIVE [token]
Hanging up (may also be triggered on network or call establishment timeout)
# HANGUP [token]
> CALLENDED
$ HANGUP [token]
< CALLENDED
$ HANGUP [token]
*/
struct vomp_call_state {
struct sched_ent alarm;
vomp_call_half local;
vomp_call_half remote;
int initiated_call;
int fast_audio;
time_ms_t create_time;
time_ms_t last_activity;
time_ms_t audio_clock;
int audio_started;
// last local & remote status we sent to all interested parties
int last_sent_status;
unsigned char remote_codec_list[256];
int recent_sample_rotor;
vomp_sample_block recent_samples[VOMP_MAX_RECENT_SAMPLES];
int sample_pos;
unsigned int seen_samples[VOMP_MAX_RECENT_SAMPLES *4];
};
/* Although we only support one call at a time, we allow for multiple call states.
This is partly to deal with denial of service attacks that might occur by causing
the ejection of newly allocated session numbers before the caller has had a chance
to progress the call to a further state. */
int vomp_call_count=0;
int vomp_active_call=-1;
vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
struct vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
struct profile_total vomp_stats;
static void vomp_process_tick(struct sched_ent *alarm);
@ -51,7 +114,7 @@ struct sockaddr_un *vomp_interested_usocks[VOMP_MAX_INTERESTED];
int vomp_interested_usock_lengths[VOMP_MAX_INTERESTED];
time_ms_t vomp_interested_expiries[VOMP_MAX_INTERESTED];
vomp_call_state *vomp_find_call_by_session(int session_token)
struct vomp_call_state *vomp_find_call_by_session(int session_token)
{
int i;
for(i=0;i<vomp_call_count;i++)
@ -81,7 +144,7 @@ int vomp_generate_session_id()
return session_id;
}
vomp_call_state *vomp_create_call(unsigned char *remote_sid,
struct vomp_call_state *vomp_create_call(unsigned char *remote_sid,
unsigned char *local_sid,
unsigned int remote_session,
unsigned int local_session,
@ -92,11 +155,11 @@ vomp_call_state *vomp_create_call(unsigned char *remote_sid,
if (!local_session)
local_session=vomp_generate_session_id();
vomp_call_state *call = &vomp_call_states[vomp_call_count];
struct vomp_call_state *call = &vomp_call_states[vomp_call_count];
vomp_call_count++;
/* prepare slot */
bzero(call,sizeof(vomp_call_state));
bzero(call,sizeof(struct vomp_call_state));
bcopy(local_sid,call->local.sid,SID_SIZE);
bcopy(remote_sid,call->remote.sid,SID_SIZE);
call->local.session=local_session;
@ -121,7 +184,7 @@ vomp_call_state *vomp_create_call(unsigned char *remote_sid,
return call;
}
vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
struct vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
unsigned char *local_sid,
unsigned int sender_session,
unsigned int recvr_session,
@ -129,7 +192,7 @@ vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
int recvr_state)
{
int i;
vomp_call_state *call;
struct vomp_call_state *call;
if (debug & DEBUG_VOMP)
DEBUGF("%d calls already in progress.",vomp_call_count);
@ -190,7 +253,7 @@ vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
/* send updated call status to end-point and to any interested listeners as
appropriate */
int vomp_send_status_remote_audio(vomp_call_state *call, int audio_codec, const unsigned char *audio, int audio_length)
int vomp_send_status_remote_audio(struct vomp_call_state *call, int audio_codec, const unsigned char *audio, int audio_length)
{
overlay_mdp_frame mdp;
@ -300,12 +363,12 @@ int vomp_send_status_remote_audio(vomp_call_state *call, int audio_codec, const
return 0;
}
int vomp_send_status_remote(vomp_call_state *call)
int vomp_send_status_remote(struct vomp_call_state *call)
{
return vomp_send_status_remote_audio(call, 0, NULL, 0);
}
int vomp_send_mdp_status_audio(vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length)
int vomp_send_mdp_status_audio(struct vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length)
{
if (audio && audio_length && vomp_sample_size(audio_codec)!=audio_length)
return WHY("Audio frame is the wrong length");
@ -354,13 +417,52 @@ int vomp_send_mdp_status_audio(vomp_call_state *call, int audio_codec, unsigned
return 0;
}
int vomp_send_mdp_status(vomp_call_state *call)
int vomp_send_mdp_status(struct vomp_call_state *call)
{
return vomp_send_mdp_status_audio(call,0,0,0,NULL,0);
}
int monitor_call_status(struct vomp_call_state *call)
{
char msg[1024];
int n = snprintf(msg,1024,"\nCALLSTATUS:%06x:%06x:%d:%d:%d:%s:%s:%s:%s\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
call->fast_audio,
alloca_tohex_sid(call->local.sid),
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
monitor_tell_clients(msg, n, MONITOR_VOMP);
return 0;
}
int monitor_send_audio(struct vomp_call_state *call, int audio_codec, unsigned int start_time, unsigned int end_time, const unsigned char *audio, int audio_length)
{
if (0) DEBUGF("Tell call monitor about audio for call %06x:%06x",
call->local.session,call->remote.session);
int sample_bytes=vomp_sample_size(audio_codec);
char msg[1024 + MAX_AUDIO_BYTES];
/* All commands followed by binary data start with *len:, so that
they can be easily parsed at the far end, even if not supported.
Put newline at start of these so that receiving data in command
mode doesn't confuse the parser. */
int msglen = snprintf(msg, 1024,
"\n*%d:AUDIOPACKET:%06x:%06x:%d:%d:%d:%d:%d\n",
sample_bytes,
call->local.session,call->remote.session,
call->local.state,call->remote.state,
audio_codec, start_time, end_time);
bcopy(audio, &msg[msglen], sample_bytes);
msglen+=sample_bytes;
msg[msglen++]='\n';
monitor_tell_clients(msg, msglen, MONITOR_VOMP);
return 0;
}
// send call state updates if required.
int vomp_update(vomp_call_state *call)
int vomp_update(struct vomp_call_state *call)
{
int combined_status=(call->remote.state<<4)|call->local.state;
@ -376,7 +478,7 @@ int vomp_update(vomp_call_state *call)
vomp_send_status_remote(call);
// tell monitor clients
if (monitor_socket_count)
if (monitor_socket_count && monitor_client_interested(MONITOR_VOMP))
monitor_call_status(call);
// tell mdp clients
@ -386,7 +488,7 @@ int vomp_update(vomp_call_state *call)
return 0;
}
int vomp_call_start_audio(vomp_call_state *call)
int vomp_call_start_audio(struct vomp_call_state *call)
{
call->audio_started=1;
return WHY("Not implemented");
@ -394,7 +496,7 @@ int vomp_call_start_audio(vomp_call_state *call)
// check a small circular buffer of recently seen audio
// we're not trying to be perfect here, we still expect all clients to reorder and filter duplicates
int vomp_audio_already_seen(vomp_call_state *call, unsigned int end_time)
int vomp_audio_already_seen(struct vomp_call_state *call, unsigned int end_time)
{
int i;
for(i=0;i<VOMP_MAX_RECENT_SAMPLES *4;i++)
@ -407,7 +509,7 @@ int vomp_audio_already_seen(vomp_call_state *call, unsigned int end_time)
return 0;
}
int vomp_process_audio(vomp_call_state *call,unsigned int sender_duration,overlay_mdp_frame *mdp)
int vomp_process_audio(struct vomp_call_state *call,unsigned int sender_duration,overlay_mdp_frame *mdp)
{
int ofs=14;
// if (mdp->in.payload_length>14)
@ -455,13 +557,13 @@ int vomp_process_audio(vomp_call_state *call,unsigned int sender_duration,overla
return 0;
}
int vomp_call_stop_audio(vomp_call_state *call)
int vomp_call_stop_audio(struct vomp_call_state *call)
{
call->audio_started=0;
return WHY("Not implemented");
}
int vomp_ringing(vomp_call_state *call){
int vomp_ringing(struct vomp_call_state *call){
if (call){
if (debug & DEBUG_VOMP)
DEBUGF("RING RING!");
@ -473,7 +575,7 @@ int vomp_ringing(vomp_call_state *call){
return 0;
}
int vomp_call_destroy(vomp_call_state *call)
int vomp_call_destroy(struct vomp_call_state *call)
{
/* do some general clean ups */
if (call->audio_started) vomp_call_stop_audio(call);
@ -495,7 +597,7 @@ int vomp_call_destroy(vomp_call_state *call)
unschedule(&vomp_call_states[vomp_call_count].alarm);
bcopy(&vomp_call_states[vomp_call_count],
call,
sizeof(vomp_call_state));
sizeof(struct vomp_call_state));
schedule(&call->alarm);
}
return 0;
@ -515,7 +617,7 @@ int vomp_dial(unsigned char *local_sid, unsigned char *remote_sid, char *local_d
/* allocate unique call session token, which is how the client will
refer to this call during its life */
vomp_call_state *call=vomp_create_call(
struct vomp_call_state *call=vomp_create_call(
remote_sid,
local_sid,
0,
@ -534,7 +636,7 @@ int vomp_dial(unsigned char *local_sid, unsigned char *remote_sid, char *local_d
return 0;
}
int vomp_pickup(vomp_call_state *call)
int vomp_pickup(struct vomp_call_state *call)
{
if (call){
if (debug && DEBUG_VOMP)
@ -550,7 +652,7 @@ int vomp_pickup(vomp_call_state *call)
return 0;
}
int vomp_hangup(vomp_call_state *call)
int vomp_hangup(struct vomp_call_state *call)
{
if (call){
if (debug & DEBUG_VOMP)
@ -678,7 +780,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp, struct sockaddr_un *recvaddr,int recv
case VOMPEVENT_CALLINFO:
{
/* provide call endpoint info to user */
vomp_call_state *call
struct vomp_call_state *call
=vomp_find_call_by_session(mdp->vompevent.call_session_token);
/* collect call info and send to requestor */
@ -734,7 +836,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp, struct sockaddr_un *recvaddr,int recv
break;
case VOMPEVENT_CALLREJECT: /* hangup is the same */
{
vomp_call_state *call
struct vomp_call_state *call
=vomp_find_call_by_session(mdp->vompevent.call_session_token);
if (!call)
return overlay_mdp_reply_error(mdp_named.poll.fd,recvaddr, recvaddrlen, 4006, "No such call");
@ -744,7 +846,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp, struct sockaddr_un *recvaddr,int recv
break;
case VOMPEVENT_PICKUP:
{
vomp_call_state *call = vomp_find_call_by_session(mdp->vompevent.call_session_token);
struct vomp_call_state *call = vomp_find_call_by_session(mdp->vompevent.call_session_token);
if (!call)
return overlay_mdp_reply_error
(mdp_named.poll.fd,recvaddr,recvaddrlen,4006,
@ -762,7 +864,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp, struct sockaddr_un *recvaddr,int recv
case VOMPEVENT_AUDIOPACKET: /* user supplying audio */
{
// DEBUG("Audio packet arrived");
vomp_call_state *call = vomp_find_call_by_session(mdp->vompevent.call_session_token);
struct vomp_call_state *call = vomp_find_call_by_session(mdp->vompevent.call_session_token);
if (call) {
return vomp_send_status_remote_audio(call,
mdp->vompevent.audio_sample_codec,
@ -784,7 +886,7 @@ int vomp_mdp_event(overlay_mdp_frame *mdp, struct sockaddr_un *recvaddr,int recv
return WHY("Not implemented");
}
int vomp_extract_remote_codec_list(vomp_call_state *call,overlay_mdp_frame *mdp)
int vomp_extract_remote_codec_list(struct vomp_call_state *call,overlay_mdp_frame *mdp)
{
int i;
if (debug & DEBUG_VOMP)
@ -798,23 +900,6 @@ int vomp_extract_remote_codec_list(vomp_call_state *call,overlay_mdp_frame *mdp)
return 0;
}
/*
Simplified call state lifecycle;
(we dial)
> CALLPREP + codecs
< NOCALL + codecs
> RINGOUT
< RINGIN (they start ringing)
(we start "they are ringing" tone)
(they pickup)
< INCALL
> INCALL
(either hangup, or state error)
> CALLENDED
< CALLENDED
*/
/* At this point we know the MDP frame is addressed to the VoMP port, but
we have not inspected the contents. As these frames are wire-format, we
must pay attention to endianness. */
@ -828,7 +913,7 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
/* only auth-crypted frames make it this far */
vomp_call_state *call=NULL;
struct vomp_call_state *call=NULL;
switch(mdp->in.payload[0]) {
case 0x01: /* Ordinary VoMP state+optional audio frame */
@ -862,11 +947,13 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
if (!recvr_session && (debug & DEBUG_VOMP))
DEBUG("recvr_session==0, created call");
// TODO ignore state changes if sequence is stale?
// TODO ignore state changes that seem to go backwards?
if ((!vomp_interested_usock_count)
&&(!monitor_socket_count))
&&(!monitor_socket_count)
&&(!monitor_client_interested(MONITOR_VOMP)))
{
/* No registered listener, so we cannot answer the call, so just reject
it. */
@ -1413,7 +1500,7 @@ static void vomp_process_tick(struct sched_ent *alarm)
int len;
time_ms_t now = gettime_ms();
vomp_call_state *call = (vomp_call_state *)alarm;
struct vomp_call_state *call = (struct vomp_call_state *)alarm;
/* See if any calls need to begin expiring
(current timeout is set at 2 minutes) */