Setup scheduled alarm per vomp call and send keep alives

This commit is contained in:
Jeremy Lakeman 2012-07-03 15:13:39 +09:30
parent 495de9e0ec
commit 3d39e92628
5 changed files with 160 additions and 209 deletions

View File

@ -50,7 +50,6 @@ void list_alarms(){
// simply populate .alarm with the absolute time, and .function with the method to call.
// on calling .poll.revents will be zero.
int schedule(struct sched_ent *alarm){
long long now=overlay_gettime_ms();
struct sched_ent *node = next_alarm, *last = NULL;
while(node!=NULL){
if (node->alarm > alarm->alarm)

View File

@ -35,9 +35,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MONITOR_DATA_SIZE MAX_AUDIO_BYTES
struct monitor_context {
struct sched_ent alarm;
#define MONITOR_VOMP (1<<0)
#define MONITOR_RHIZOME (1<<1)
#define MONITOR_PEERS (1<<2)
int flags;
char line[MONITOR_LINE_LENGTH];
int line_length;
@ -54,7 +51,6 @@ struct monitor_context {
#define MAX_MONITOR_SOCKETS 8
int monitor_socket_count=0;
struct monitor_context monitor_sockets[MAX_MONITOR_SOCKETS];
long long monitor_last_update_time=0;
int monitor_process_command(struct monitor_context *c);
int monitor_process_data(struct monitor_context *c);
@ -134,34 +130,11 @@ int monitor_setup_sockets()
void monitor_poll(struct sched_ent *alarm)
{
int s,i,m;
int s;
unsigned char buffer[1024];
char msg[1024];
struct sockaddr *ignored_address=(struct sockaddr *)&buffer[0];
socklen_t ignored_length=sizeof(ignored_address);
/* tell all monitor clients about status of all calls periodically */
long long now = overlay_gettime_ms();
if (monitor_last_update_time > (now + 1000)) {
INFO("Fixed run away monitor_last_update_time");
monitor_last_update_time = now + 1000;
}
if (now > (monitor_last_update_time + 1000)) {
// DEBUG("Send keep alives");
monitor_last_update_time = now;
for(i = 0; i < vomp_call_count; i++) {
/* Push out any undelivered status changes */
monitor_call_status(&vomp_call_states[i]);
INFOF("Sending keepalives for call #%d",i);
/* And let far-end know that call is still alive */
snprintf(msg,sizeof(msg) -1,"\nKEEPALIVE:%06x\n", vomp_call_states[i].local.session);
for(m = 0;m < monitor_socket_count; m++)
WRITE_STR(monitor_sockets[m].alarm.poll.fd,msg);
}
}
/* Check for new connections */
/* We don't care about the peer's address */
ignored_length = 0;
@ -285,10 +258,10 @@ static void monitor_new_client(int s) {
#ifdef linux
struct ucred ucred;
socklen_t len;
int res;
#else
gid_t othergid;
#endif
int res;
uid_t otheruid;
struct monitor_context *c;

View File

@ -148,9 +148,6 @@ schedule(&_sched_##X);
/* Periodically update route table. */
SCHEDULE(overlay_route_tick, 100);
/* Keep an eye on VoMP calls so that we can expire stale ones etc */
SCHEDULE(vomp_tick, 1000);
/* Show CPU usage stats periodically */
SCHEDULE(fd_periodicstats, 3000);

View File

@ -1097,6 +1097,12 @@ int overlay_saw_mdp_containing_frame(overlay_frame *f,long long now);
#define DEBUG_PACKETCONSTRUCTION (1 << 23)
#define DEBUG_MANIFESTS (1 << 24)
/* bitmask values for monitor_tell_clients */
#define MONITOR_VOMP (1<<0)
#define MONITOR_RHIZOME (1<<1)
#define MONITOR_PEERS (1<<2)
int serval_packetvisualise(FILE *f,char *message,unsigned char *packet,int plen);
int overlay_broadcast_drop_check(unsigned char *a);
@ -1377,6 +1383,7 @@ typedef struct vomp_sample_block {
#define VOMP_MAX_RECENT_SAMPLES 2
typedef struct vomp_call_state {
struct sched_ent alarm;
vomp_call_half local;
vomp_call_half remote;
int ringing;
@ -1384,7 +1391,6 @@ typedef struct vomp_call_state {
unsigned long long create_time;
unsigned long long last_activity;
unsigned long long audio_clock;
long long next_status_time;
int audio_started;
int last_sent_status;
unsigned char remote_codec_list[256];
@ -1563,7 +1569,6 @@ void rhizome_enqueue_suggestions(struct sched_ent *alarm);
void server_shutdown_check(struct sched_ent *alarm);
void overlay_mdp_poll(struct sched_ent *alarm);
void fd_periodicstats(struct sched_ent *alarm);
void vomp_tick(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
void monitor_client_poll(struct sched_ent *alarm);

325
vomp.c
View File

@ -32,7 +32,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
int vomp_call_count=0;
int vomp_active_call=-1;
vomp_call_state vomp_call_states[VOMP_MAX_CALLS];
struct profile_total vomp_stats;
int dump_vomp_status();
void vomp_process_tick(struct sched_ent *alarm);
/* which codecs we support (set by registered listener) */
unsigned char vomp_local_codec_list[256];
@ -53,111 +56,119 @@ vomp_call_state *vomp_find_call_by_session(int session_token)
return NULL;
}
int vomp_generate_session_id(){
int session_id=0;
while (!session_id)
{
if (urandombytes((unsigned char *)&session_id,sizeof(int)))
return WHY("Insufficient entropy");
session_id&=VOMP_SESSION_MASK;
DEBUGF("session=0x%08x\n",session_id);
int i;
/* reject duplicate call session numbers */
for(i=0;i<vomp_call_count;i++)
if (session_id==vomp_call_states[i].local.session
||session_id==vomp_call_states[i].local.session){
session_id=0;
break;
}
}
return session_id;
}
vomp_call_state *vomp_create_call(unsigned char *remote_sid,
unsigned char *local_sid,
unsigned int remote_session,
unsigned int local_session,
int remote_state,
int local_state){
if (!local_session)
local_session=vomp_generate_session_id();
vomp_call_state *call = &vomp_call_states[vomp_call_count];
vomp_call_count++;
/* prepare slot */
bzero(call,sizeof(vomp_call_state));
bcopy(local_sid,call->local.sid,SID_SIZE);
bcopy(remote_sid,call->remote.sid,SID_SIZE);
call->local.session=local_session;
call->remote.session=remote_session;
call->local.state=local_state;
call->remote.state=remote_state;
call->create_time=overlay_gettime_ms();
call->last_activity=call->create_time;
call->alarm.alarm = call->create_time+VOMP_CALL_STATUS_INTERVAL;
call->alarm.function = vomp_process_tick;
vomp_stats.name="vomp_process_tick";
call->alarm.stats=&vomp_stats;
schedule(&call->alarm);
WHYF("Returning new call #%d",local_session);
monitor_call_status(call);
return call;
}
vomp_call_state *vomp_find_or_create_call(unsigned char *remote_sid,
unsigned char *local_sid,
unsigned int sender_session,
unsigned int recvr_session,
int sender_state,int recvr_state)
int sender_state,
int recvr_state)
{
int expired_slot=-1;
int i;
vomp_call_state *call;
if (0) printf("%d calls already in progress.\n",vomp_call_count);
for(i=0;i<vomp_call_count;i++)
{
call = &vomp_call_states[i];
/* do the fast comparison first, and only if that matches proceed to
the slower SID comparisons */
if (0)
fprintf(stderr,"asking for %06x:%06x, this call %06x:%06x\n",
sender_session,recvr_session,
vomp_call_states[i].remote.session,
vomp_call_states[i].local.session);
call->remote.session,
call->local.session);
int checked=0;
if (vomp_call_states[i].remote.session&&sender_session) {
if (call->remote.session&&sender_session) {
checked++;
if(sender_session!=vomp_call_states[i].remote.session)
if(sender_session!=call->remote.session)
continue;
}
if (vomp_call_states[i].local.session&&recvr_session) {
if (call->local.session&&recvr_session) {
checked++;
if(recvr_session!=vomp_call_states[i].local.session)
if(recvr_session!=call->local.session)
continue;
}
if (!checked) continue;
if (memcmp(remote_sid,vomp_call_states[i].remote.sid,SID_SIZE)) continue;
if (memcmp(local_sid,vomp_call_states[i].local.sid,SID_SIZE)) continue;
if (memcmp(remote_sid,call->remote.sid,SID_SIZE)) continue;
if (memcmp(local_sid,call->local.sid,SID_SIZE)) continue;
/* it matches. but has it expired (no activity in 120 seconds)?
NOTE: as these time calculations are unsigned, we must add to
the last activity time rather than subtract from the current time
when calculating the timeout.
*/
if (vomp_call_states[i].last_activity+VOMP_CALL_TIMEOUT<(overlay_gettime_ms()))
{
WHYF("slot %d has expired.",i);
WHYF(" last_activity=%lld, now=%lld",
vomp_call_states[i].last_activity,overlay_gettime_ms());
expired_slot=i;
continue;
}
/* it matches. */
/* Record session number if required */
if (!vomp_call_states[i].remote.session)
vomp_call_states[i].remote.session=sender_session;
if (!call->remote.session)
call->remote.session=sender_session;
if (0) {
WHYF("Returning existing call #%d",i);
fprintf(stderr,"%06x:%06x matches call #%d %06x:%06x\n",
sender_session,recvr_session,i,
vomp_call_states[i].remote.session,
vomp_call_states[i].local.session);
call->remote.session,
call->local.session);
}
return &vomp_call_states[i];
return call;
}
/* not in the list. So allocate a slot, but only if the call is in progress. */
if (recvr_state==VOMP_STATE_CALLENDED&&sender_state==VOMP_STATE_CALLENDED)
return NULL;
if (expired_slot>-1) i=expired_slot;
else if ((i<VOMP_MAX_CALLS)&&(i==vomp_call_count)) {
/* there is room to allocate another, so do that */
vomp_call_count++;
} else {
/* no room, either reallocate an existing slot, or fail.
We try to reuse slots that either mark ended calls, or
*/
int candidates[VOMP_MAX_CALLS];
int candidate_count=0;
for(i=0;i<VOMP_MAX_CALLS;i++)
if ((vomp_call_states[i].local.state==VOMP_STATE_NOCALL)||
(vomp_call_states[i].local.state==VOMP_STATE_CALLENDED))
candidates[candidate_count++]=i;
i=candidates[random()%candidate_count];
}
/* prepare slot */
bzero(&vomp_call_states[i],sizeof(vomp_call_state));
bcopy(local_sid,&vomp_call_states[i].local.sid,SID_SIZE);
bcopy(remote_sid,&vomp_call_states[i].remote.sid,SID_SIZE);
if (!recvr_session) {
urandombytes((unsigned char *)&recvr_session,sizeof(int));
recvr_session&=VOMP_SESSION_MASK;
}
if (!sender_session) {
urandombytes((unsigned char *)&sender_session,sizeof(int));
sender_session&=VOMP_SESSION_MASK;
}
vomp_call_states[i].local.session=recvr_session;
vomp_call_states[i].remote.session=sender_session;
vomp_call_states[i].local.state=VOMP_STATE_NOCALL;
vomp_call_states[i].remote.state=VOMP_STATE_NOCALL;
vomp_call_states[i].create_time=overlay_gettime_ms();
vomp_call_states[i].last_activity=vomp_call_states[i].create_time;
WHYF("Returning new call #%d",i);
monitor_call_status(&vomp_call_states[i]);
return &vomp_call_states[i];
return vomp_create_call(remote_sid, local_sid, sender_session, recvr_session, VOMP_STATE_NOCALL, VOMP_STATE_NOCALL);
}
/* send updated call status to end-point and to any interested listeners as
@ -419,20 +430,17 @@ int vomp_call_destroy(vomp_call_state *call)
vomp_send_status(call,VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
/* now release the call structure */
int i;
for(i=0;i<VOMP_MAX_CALLS;i++)
if (call==&vomp_call_states[i]) break;
if (i>=VOMP_MAX_CALLS) return WHY("supplied call handle looks invalid");
if (i==vomp_call_count-1)
vomp_call_count--;
else
{
bcopy(&vomp_call_states[vomp_call_count-1],
&vomp_call_states[i],
sizeof(vomp_call_state));
vomp_call_count--;
}
int i = (call - vomp_call_states);
unschedule(&call->alarm);
vomp_call_count--;
if (i!=vomp_call_count){
unschedule(&vomp_call_states[vomp_call_count].alarm);
bcopy(&vomp_call_states[vomp_call_count],
call,
sizeof(vomp_call_state));
schedule(&call->alarm);
}
return 0;
}
@ -576,14 +584,12 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
/* and provide a quick summary of all calls in progress */
int i;
for(i=0;i<VOMP_MAX_CALLS;i++)
for(i=0;i<vomp_call_count;i++)
{
if (i<vomp_call_count) {
mdpreply.vompevent.other_calls_sessions[i]
=vomp_call_states[i].local.session;
mdpreply.vompevent.other_calls_states[i]
=vomp_call_states[i].local.state;
}
mdpreply.vompevent.other_calls_sessions[i]
=vomp_call_states[i].local.session;
mdpreply.vompevent.other_calls_states[i]
=vomp_call_states[i].local.state;
}
return overlay_mdp_reply(mdp_named.poll.fd,recvaddr,recvaddrlen,&mdpreply);
@ -601,43 +607,21 @@ int vomp_mdp_event(overlay_mdp_frame *mdp,
return overlay_mdp_reply_error
(mdp_named.poll.fd,recvaddr,recvaddrlen,4004,
"All call slots in use");
int slot=vomp_call_count++;
vomp_call_state *call=&vomp_call_states[slot];
bzero(call,sizeof(vomp_call_state));
bcopy(mdp->vompevent.local_sid,call->local.sid,SID_SIZE);
bcopy(mdp->vompevent.remote_sid,call->remote.sid,SID_SIZE);
bcopy(mdp->vompevent.local_did,call->local.did,64);
bcopy(mdp->vompevent.remote_did,call->remote.did,64);
call->create_time=overlay_gettime_ms();
call->local.state=VOMP_STATE_CALLPREP;
call->remote.state=VOMP_STATE_NOCALL; /* far end has yet to agree that a call is happening */
monitor_call_status(call);
/* allocate unique call session token, which is how the client will
refer to this call during its life */
while (!call->local.session)
{
if (urandombytes((unsigned char *)&call->local.session,sizeof(int)))
return overlay_mdp_reply_error
(mdp_named.poll.fd,recvaddr,recvaddrlen,4005,
"Insufficient entropy");
call->local.session&=VOMP_SESSION_MASK;
DEBUGF("session=0x%08x\n",call->local.session);
int i;
for(i=0;i<vomp_call_count;i++)
if (i!=slot)
if (call->local.session==vomp_call_states[i].local.session) break;
/* reject duplicate call session numbers */
if (i<vomp_call_count) call->local.session=0;
}
call->local.session&=VOMP_SESSION_MASK;
call->last_activity=overlay_gettime_ms();
refer to this call during its life */
vomp_call_state *call=vomp_create_call(
mdp->vompevent.remote_sid,
mdp->vompevent.local_sid,
0,
0,
VOMP_STATE_NOCALL,
VOMP_STATE_CALLPREP
);
/* send status update to remote, thus causing call to be created
(hopefully) at far end. */
vomp_send_status(call,VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
WHY("sending MDP reply back");
dump("recvaddr",(unsigned char *)recvaddr,recvaddrlen);
int result= overlay_mdp_reply_error
(mdp_named.poll.fd,recvaddr,recvaddrlen,0, "Success");
if (result) WHY("Failed to send MDP reply");
@ -1458,59 +1442,52 @@ int app_vomp_monitor(int argc, const char *const *argv, struct command_line_opti
return overlay_mdp_client_done();
}
void vomp_tick(struct sched_ent *alarm)
{
/* Send any reminder packets for call state, and also process any audio. */
unsigned long long now=overlay_gettime_ms();
int i;
for(i=0;i<vomp_call_count;i++)
{
if (now>vomp_call_states[i].next_status_time)
{
vomp_send_status(&vomp_call_states[i],VOMP_FORCETELLREMOTE,NULL);
vomp_call_states[i].next_status_time=now+VOMP_CALL_STATUS_INTERVAL;
}
/* See if any calls need to begin expiring
(current timeout is set at 2 minutes) */
if (vomp_call_states[i].local.state<VOMP_STATE_INCALL
&&((vomp_call_states[i].create_time+VOMP_CALL_TIMEOUT)<now))
{
/* timeout calls that haven't reached INCALL status, e.g.,
ringing. As well as sensible UX, it also prevents our call
slots getting full of cruft. */
WHYF("Destroying stale call #%d (state %d.%d)",i,
vomp_call_states[i].local.state,vomp_call_states[i].remote.state);
vomp_call_destroy(&vomp_call_states[i]);
/* since this slot will get reclaimed, we need to wind back one in
the iteration of the list of slots */
i--;
} else if (vomp_call_states[i].last_activity+VOMP_CALL_TIMEOUT<now)
switch(vomp_call_states[i].local.state)
{
case VOMP_STATE_INCALL:
/* Timeout while call in progress, so end call.
Keep call structure hanging around for a bit so that we can
synchonrise with the far end if possible. */
vomp_call_states[i].local.state=VOMP_STATE_CALLENDED;
monitor_call_status(&vomp_call_states[i]);
vomp_send_status(&vomp_call_states[i],
VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
vomp_call_states[i].last_activity=now;
vomp_call_stop_audio(&vomp_call_states[i]);
break;
default:
/* Call timed out while not actually in progress, so just immmediately
tear the call down */
vomp_call_destroy(&vomp_call_states[i]);
/* since this slot will get reclaimed, we need to wind back one in
the iteration of the list of slots */
i--;
break;
}
}
alarm->alarm = overlay_gettime_ms()+1000;
void vomp_process_tick(struct sched_ent *alarm){
char msg[32];
int len;
unsigned long long now = overlay_gettime_ms();
vomp_call_state *call = (vomp_call_state *)alarm;
/* Push out any undelivered status changes */
monitor_call_status(call);
/* tell the other party we are still here */
vomp_send_status(call,VOMP_FORCETELLREMOTE,NULL);
/* tell local monitor clients the call is still alive */
len = snprintf(msg,sizeof(msg) -1,"\nKEEPALIVE:%06x\n", call->local.session);
monitor_tell_clients(msg, len, MONITOR_VOMP);
/* See if any calls need to begin expiring
(current timeout is set at 2 minutes) */
if (call->local.state<VOMP_STATE_INCALL
&&((call->create_time+VOMP_CALL_TIMEOUT)<now))
{
/* timeout calls that haven't reached INCALL status, e.g.,
ringing. As well as sensible UX, it also prevents our call
slots getting full of cruft. */
vomp_call_destroy(call);
return;
} else if (call->last_activity+VOMP_CALL_TIMEOUT<now)
switch(call->local.state)
{
case VOMP_STATE_INCALL:
/* Timeout while call in progress, so end call.
Keep call structure hanging around for a bit so that we can
synchonrise with the far end if possible. */
call->local.state=VOMP_STATE_CALLENDED;
monitor_call_status(call);
vomp_send_status(call, VOMP_TELLREMOTE|VOMP_TELLINTERESTED,NULL);
call->last_activity=now;
vomp_call_stop_audio(call);
break;
default:
/* Call timed out while not actually in progress, so just immmediately
tear the call down */
vomp_call_destroy(call);
return;
}
alarm->alarm = overlay_gettime_ms() + VOMP_CALL_STATUS_INTERVAL;
schedule(alarm);
return;
}