fixed bug with cancelling calls due to lack of listeners.

worked around poll() with accept() unix domain socket linux bug.
generally hooked in the unix domain stream socket interface for
monitoring calls (and later rhizome).
This commit is contained in:
gardners 2012-05-03 03:37:03 +09:30
parent eb8ffeb71f
commit 25f81aeaaf
4 changed files with 96 additions and 11 deletions

View File

@ -87,9 +87,6 @@ int monitor_setup_sockets()
int res = setsockopt(monitor_named_socket, SOL_SOCKET, SO_RCVBUF,
&send_buffer_size, sizeof(send_buffer_size));
if (res) WHYF("setsockopt() failed: errno=%d",errno);
res = fcntl(monitor_named_socket,F_SETFL, O_NONBLOCK);
if (res) WHYF("fcntl(NONBLOCK) on monitor accept socket failed: errno=%d",
errno);
}
}
@ -102,10 +99,15 @@ int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
/* Make sure sockets are open */
monitor_setup_sockets();
#ifdef NOTDEFINED
/* This block should work, but in reality it doesn't.
poll() on linux is ALWAYS claiming that accept() can be
run. So we just have to check it whenever some other fd triggers
poll to break, which fortunately is fairly often. */
if ((*fdcount)>=fdmax) return -1;
if (monitor_named_socket>-1)
{
if (debug&DEBUG_IO) {
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) {
fprintf(stderr,
"Monitor named unix domain socket is poll() slot #%d (fd %d)\n",
*fdcount,monitor_named_socket);
@ -114,11 +116,12 @@ int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax)
fds[*fdcount].events=POLLIN;
(*fdcount)++;
}
#endif
int i;
for(i=0;i<monitor_socket_count;i++) {
if ((*fdcount)>=fdmax) return -1;
if (debug&DEBUG_IO) {
if (debug&(DEBUG_IO|DEBUG_VERBOSE_IO)) {
fprintf(stderr,
"Monitor named unix domain client socket is poll() slot #%d (fd %d)\n",
*fdcount,monitor_sockets[i].socket);
@ -147,6 +150,8 @@ int monitor_poll()
}
/* Check for new connections */
fcntl(monitor_named_socket,F_SETFL,
fcntl(monitor_named_socket, F_GETFL, NULL)|O_NONBLOCK);
while((s=accept(monitor_named_socket,&ignored_address,&ignored_length))>-1) {
int res = fcntl(s,F_SETFL, O_NONBLOCK);
if (res) close(s);
@ -161,6 +166,8 @@ int monitor_poll()
write(s,"MONITOR:You are talking to servald\n",
strlen("MONITOR:You are talking to servald\n"));
}
fcntl(monitor_named_socket,F_SETFL,
fcntl(monitor_named_socket, F_GETFL, NULL)&(~O_NONBLOCK));
ignored_length=sizeof(ignored_address);
}
@ -291,12 +298,12 @@ int monitor_process_command(int index,char *cmd)
stowSid(&mdp.vompevent.remote_sid[0],0,sid);
vomp_mdp_event(&mdp,NULL,0);
}
else if (sscanf(cmd,"PICKUP:%x",callSessionToken)==1) {
else if (sscanf(cmd,"PICKUP:%x",&callSessionToken)==1) {
mdp.vompevent.flags=VOMPEVENT_PICKUP;
mdp.vompevent.call_session_token=callSessionToken;
vomp_mdp_event(&mdp,NULL,0);
}
else if (sscanf(cmd,"HANGUP:%x",callSessionToken)==1) {
else if (sscanf(cmd,"HANGUP:%x",&callSessionToken)==1) {
mdp.vompevent.flags=VOMPEVENT_HANGUP;
mdp.vompevent.call_session_token=callSessionToken;
vomp_mdp_event(&mdp,NULL,0);
@ -339,8 +346,33 @@ int monitor_process_data(int index)
int monitor_call_status(vomp_call_state *call)
{
int i;
WHYF("Tell call monitor about call %06x:%06x",
call->local.session,call->remote.session);
char msg[1024];
snprintf(msg,1024,"CALLSTATUS:%06x:%06x:%d:%d\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state);
for(i=0;i<monitor_socket_count;i++)
{
nextInSameSlot:
errno=0;
write(monitor_sockets[i].socket,msg,strlen(msg));
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
close(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
}
}
return 0;
}
@ -348,5 +380,44 @@ int monitor_send_audio(vomp_call_state *call,overlay_mdp_frame *audio)
{
WHYF("Tell call monitor about audio for call %06x:%06x",
call->local.session,call->remote.session);
int sample_bytes=vomp_sample_size(audio->vompevent.audio_sample_codec);
unsigned char msg[1024+MAX_AUDIO_BYTES];
snprintf((char *)msg,1024,
"AUDIOPACKET:%06x:%06x:%d:%d:%d:%d:%lld:%lld\n",
call->local.session,call->remote.session,
call->local.state,call->remote.state,
audio->vompevent.audio_sample_codec,
sample_bytes,
audio->vompevent.audio_sample_starttime,
audio->vompevent.audio_sample_endtime);
int msglen=strlen((char *)msg);
bcopy(&audio->vompevent.audio_bytes[0],
&msg[msglen],sample_bytes);
msglen+=sample_bytes;
int i;
for(i=0;i<monitor_socket_count;i++)
{
nextInSameSlot:
errno=0;
write(monitor_sockets[i].socket,msg,msglen);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
close(monitor_sockets[i].socket);
if (i==monitor_socket_count-1) {
monitor_socket_count--;
continue;
} else {
bcopy(&monitor_sockets[monitor_socket_count-1],
&monitor_sockets[i],
sizeof(struct monitor_context));
monitor_socket_count--;
goto nextInSameSlot;
}
}
}
return 0;
}

View File

@ -170,6 +170,14 @@ int overlayServerMode()
if (debug&DEBUG_VERBOSE_IO)
fprintf(stderr,"Waiting via poll() for up to %lldms\n",ms);
int r=poll(fds,fdcount,ms);
if (debug&DEBUG_VERBOSE_IO) {
fprintf(stderr,"poll() says %d file descriptors are ready\n",r);
int i;
for(i=0;i<fdcount;i++)
if (fds[i].revents)
fprintf(stderr,"fd #%d is ready (0x%x)\n",
fds[i].fd,fds[i].revents);
}
/* Do high-priority audio handling first */
vomp_tick();

View File

@ -1352,6 +1352,7 @@ int vomp_tick();
int vomp_tick_interval();
int vomp_sample_size(int c);
int vomp_codec_timespan(int c);
int vomp_send_status(vomp_call_state *call,int flags,overlay_mdp_frame *arg);
typedef struct command_line_option {
int (*function)(int argc, const char *const *argv, struct command_line_option *o);

9
vomp.c
View File

@ -745,7 +745,7 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
/* could not allocate a call slot, so do nothing */
return WHY("No free call slots");
}
if (0) {
if (1) {
WHYF("Far end is in state %s",vomp_describe_state(sender_state));
WHYF("I am in state %s",vomp_describe_state(call->local.state));
}
@ -770,8 +770,13 @@ int vomp_mdp_received(overlay_mdp_frame *mdp)
{
/* No registered listener, so we cannot answer the call, so just reject
it. */
WHYF("Rejecting call due to lack of a listener: states=%d,%d",
call->local.state,call->remote.state);
call->local.state=VOMP_STATE_CALLENDED;
return vomp_send_status(call,VOMP_TELLREMOTE,NULL);
if (call->remote.state<VOMP_STATE_CALLENDED)
vomp_send_status(call,VOMP_TELLREMOTE,NULL);
/* now let the state machine progress to destroy the call */
}
/* Consider states: our actual state, sender state, what the sender thinks