serval-dna/monitor-client.c
Andrew Bettison 7f0fef2209 Fix an intermittent server test failure
The quit-on-monitor-client-disconnect test was non-deterministic
depending on load (eg, other concurrently running tests).  Under load,
it was likely that the server did not process the "monitor quit" command
before checking for disconnect, so the server did not terminate.

The fix was to make the monitor interface read and process all queued
input from the client before checking for HUP or ERR condition on the
socket.  With this fix, the "sleep 1" kludges before and after the echo
"monitor quit" to the console command are no longer needed.

In the process the monitor interface code was modernised: eg, now it
calls read_nonblock() instead of read(2).
2016-09-06 12:33:29 +09:30

257 lines
6.4 KiB
C

/*
Copyright (C) 2012 Paul Gardner-Stephen
Copyright (C) 2012 Serval Project Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <stdarg.h>
#include <signal.h>
#include <sys/types.h>
#ifdef WIN32
#include "win32/win32.h"
#endif
#include <unistd.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#include <sys/un.h>
#include <fcntl.h>
#include <ctype.h>
#include "constants.h"
#include "conf.h"
#include "log.h"
#include "debug.h"
#include "str.h"
#include "strbuf_helpers.h"
#include "socket.h"
#include "monitor-client.h"
/* Parser states stored in monitor_state.state by monitor_client_read(). */
/* Still looking for the newline that ends the current command line. */
#define STATE_INIT 0
/* Command line parsed; waiting for its declared data payload to arrive in full. */
#define STATE_DATA 1
/* Command line (and payload, if any) complete; handlers can be invoked. */
#define STATE_READY 2
/* Size in bytes of the receive buffer embedded in struct monitor_state. */
#define MONITOR_CLIENT_BUFFER_SIZE 8192
/* Capacity of the argv[] array in struct monitor_state. */
#define MAX_ARGS 32
/* Per-connection receive and parse state used by monitor_client_read().
 * All pointers below point into `buffer`; none is separately allocated.
 */
struct monitor_state {
/* Start of the current command text within buffer (advanced past leading newlines and any "*N:" prefix). */
char *cmd;
/* Number of entries filled in argv[] for the current command. */
int argc;
/* Start of each ':'-separated argument of the current command; each is NUL terminated in place. */
char *argv[MAX_ARGS];
/* Start of the data payload that follows the command line, or NULL if the command declared none. */
unsigned char *data;
/* Payload length declared by the command's "*N:" prefix; zero if there was no prefix. */
size_t dataBytes;
/* Offset in buffer just past the newline that ended the current command line. */
size_t cmdBytes;
/* One of STATE_INIT, STATE_DATA or STATE_READY. */
int state;
/* Raw bytes read from the monitor socket that have not been consumed yet. */
unsigned char buffer[MONITOR_CLIENT_BUFFER_SIZE];
/* Number of valid bytes currently held in buffer. */
size_t bufferBytes;
};
/* Open a connection to the monitor interface's local socket ("monitor.socket").
 *
 * On success, stores a newly allocated, zero-filled struct monitor_state in
 * *res and returns the connected AF_UNIX stream socket descriptor.  Both are
 * released by monitor_client_close().
 *
 * On failure, returns a negative value (the result of WHYF_perror()/WHY(), or
 * -1), leaves no file descriptor open and does not modify *res.
 */
int monitor_client_open(struct monitor_state **res)
{
  int fd;
  if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
    return WHYF_perror("socket(AF_UNIX, SOCK_STREAM, 0)");
  struct socket_address addr;
  if (make_local_sockaddr(&addr, "monitor.socket") == -1) {
    // The socket is already open at this point, so it must not be leaked.
    close(fd);
    return -1;
  }
  DEBUGF(monitor, "Attempting to connect to %s", alloca_socket_address(&addr));
  if (socket_connect(fd, &addr) == -1) {
    close(fd);
    return -1;
  }
  // calloc() gives the zero-filled state that the parser expects (STATE_INIT == 0,
  // bufferBytes == 0) and lets allocation failure be detected before first use.
  struct monitor_state *state = calloc(1, sizeof *state);
  if (state == NULL) {
    close(fd);
    return WHY("Out of memory allocating monitor client state");
  }
  *res = state;
  return fd;
}
/* Tear down a monitor client connection: release the parser state that was
 * allocated by monitor_client_open() and close the socket descriptor.
 * Always returns 0.
 */
int monitor_client_close(int fd, struct monitor_state *res)
{
  free(res);
  (void) close(fd);
  DEBUGF(monitor, "Closed fd %d", fd);
  return 0;
}
/* Format a printf-style message and write it to the monitor socket.
 *
 * The formatted message must fit in 511 bytes.  A longer message is rejected
 * rather than sent truncated, because a partial line would run into the next
 * command on the stream.
 *
 * Returns the result of write(2) on success, or a negative value if fd is
 * invalid, formatting fails, or the message is too long.
 */
int monitor_client_writeline(int fd,char *fmt, ...)
{
  if (fd < 0)
    return -1;

  char msg[512];
  va_list ap;
  va_start(ap, fmt);
  int n = vsnprintf(msg, sizeof msg, fmt, ap);
  va_end(ap);

  if (n < 0)
    return WHY("Failed to format monitor message");
  // vsnprintf() returns the length the full output would have had, so a value
  // >= sizeof msg means the buffer holds only a truncated prefix; writing n
  // bytes from it would read past the end of msg.
  if ((size_t)n >= sizeof msg)
    return WHYF("Monitor message too long (%d bytes, limit %zu)", n, sizeof msg - 1);

  if (IF_DEBUG(monitor))
    dump("{monitor} Writing to monitor", msg, n);
  return write(fd, msg, n);
}
/* Write a command line followed by a binary payload to the monitor socket.
 *
 * The bytes sent are "*<bytes>:" + the printf-style formatted text + the raw
 * payload, in a single write(2).  The prefix and formatted text together must
 * fit in 511 bytes.
 *
 * Returns the result of write(2) on success, or a negative value if fd is
 * invalid, the payload arguments are invalid, formatting fails or overflows
 * the header limit, or memory cannot be allocated.
 */
int monitor_client_writeline_and_data(int fd,unsigned char *data,int bytes,char *fmt,...)
{
  if (fd < 0)
    return -1;
  if (bytes < 0 || (bytes > 0 && data == NULL))
    return WHYF("Invalid monitor data length %d", bytes);

  // Build the textual header in a fixed-size buffer first, so that truncation
  // can be detected before anything is copied after it.
  char header[512];
  int n = snprintf(header, sizeof header, "*%d:", bytes);
  if (n < 0 || (size_t)n >= sizeof header)
    return WHY("Failed to format monitor data prefix");

  va_list ap;
  va_start(ap, fmt);
  int m = vsnprintf(header + n, sizeof header - (size_t)n, fmt, ap);
  va_end(ap);
  if (m < 0)
    return WHY("Failed to format monitor message");
  // As with snprintf(), a return >= the space offered means the text was cut short.
  if ((size_t)m >= sizeof header - (size_t)n)
    return WHYF("Monitor message too long (%d bytes, limit %zu)", n + m, sizeof header - 1);
  n += m;

  // Assemble header + payload on the heap: the payload size is caller-controlled,
  // so a stack buffer of that size could overflow the stack.
  size_t total = (size_t)n + (size_t)bytes;
  char *out = malloc(total);
  if (out == NULL)
    return WHY("Out of memory assembling monitor message");
  bcopy(header, out, (size_t)n);
  if (bytes > 0)
    bcopy(data, out + n, (size_t)bytes);

  if (IF_DEBUG(monitor))
    dump("{monitor} Writing to monitor", out, total);
  ssize_t written = write(fd, out, total);

  // Keep write()'s errno visible to the caller in case free() disturbs it.
  int saved_errno = errno;
  free(out);
  errno = saved_errno;
  return (int)written;
}
/* Read whatever is available from the monitor socket and dispatch every
 * complete command now held in the buffer.
 *
 * Wire format, as parsed below: each command is a line ended by '\n'.  A line
 * of the form "*N:rest" announces that N bytes of raw payload follow the
 * newline.  The command text is split in place at every ':' into argv[]
 * (at most MAX_ARGS entries).
 *
 * Every handler in handlers[0..handler_count) that has a non-NULL handler
 * function and either a NULL command or a command that is a case-insensitive
 * prefix of the received command is called; a handler signals that it handled
 * the event by returning > 0.
 *
 * Returns 0 if the read yielded nothing to process yet or all complete
 * commands were dispatched.  Returns a negative value on read error, end of
 * file, an invalid payload length, or when the buffer fills without holding a
 * complete command.
 */
int monitor_client_read(int fd, struct monitor_state *res, struct monitor_command_handler *handlers, int handler_count)
{
  /* Read any available bytes */
  size_t oldOffset = res->bufferBytes;
  if (oldOffset + 1 >= MONITOR_CLIENT_BUFFER_SIZE)
    return WHY("Buffer full without finding command");

  if (res->bufferBytes == 0)
    res->cmd = (char *)res->buffer;

  ssize_t bytesRead = read_nonblock(fd, res->buffer + oldOffset, MONITOR_CLIENT_BUFFER_SIZE - oldOffset);
  // NOTE(review): -2 presumably means "no data available right now" -- confirm against read_nonblock()
  if (bytesRead == -2)
    return 0;
  if (bytesRead == -1)
    return -1;
  if (bytesRead == 0) {
    WARNF("read(%d, %p, %zu) returned %zd", fd, (void *)(res->buffer + oldOffset), (size_t)(MONITOR_CLIENT_BUFFER_SIZE - oldOffset), bytesRead);
    return -1;
  }

  if (IF_DEBUG(monitor))
    dump("{monitor} Read from monitor", res->buffer + oldOffset, bytesRead);

  res->bufferBytes += bytesRead;

again:
  // wait until we have the whole command line
  if (res->state == STATE_INIT) {
    size_t i;
    // Bytes before oldOffset were already scanned by an earlier call without finding a newline.
    for (i = oldOffset; i < res->bufferBytes; i++) {
      if (res->buffer[i] != '\n')
        continue;

      // skip any leading \n's
      if ((char *)(res->buffer + i) == res->cmd) {
        res->cmd++;
        continue;
      }

      res->buffer[i] = 0;
      res->dataBytes = 0;
      res->cmdBytes = i + 1;

      if (*res->cmd == '*') {
        res->cmd++;
        // The cast keeps isdigit() defined for bytes >= 0x80 where char is signed.
        for (; isdigit((unsigned char)*res->cmd); ++res->cmd) {
          // Stop accumulating once the value is already out of range, so an
          // arbitrarily long digit string cannot wrap around size_t.
          if (res->dataBytes <= MONITOR_CLIENT_BUFFER_SIZE)
            res->dataBytes = res->dataBytes * 10 + (size_t)(*res->cmd - '0');
        }
        // The payload shares the buffer with its command line, so anything larger
        // than the space left after the line could never arrive in full.
        if (res->dataBytes > MONITOR_CLIENT_BUFFER_SIZE - res->cmdBytes)
          return WHYF("Invalid data length %zu", res->dataBytes);
        if (*res->cmd == ':')
          res->cmd++;
      }

      // find all arguments, initialise argc / argv && null terminate strings
      {
        char *p = res->cmd;
        res->argc = 0;
        while (*p && res->argc < MAX_ARGS) {
          if (*p == ':') {
            *p = 0;
            res->argv[res->argc] = p + 1;
            res->argc++;
          }
          p++;
        }
      }

      if (res->dataBytes) {
        res->data = (unsigned char *)&res->buffer[i + 1];
        res->state = STATE_DATA;
      } else {
        res->data = NULL;
        res->state = STATE_READY;
      }
      break;
    }
  }

  // make sure all the data has arrived
  if (res->state == STATE_DATA) {
    if (res->bufferBytes >= res->dataBytes + res->cmdBytes)
      res->state = STATE_READY;
  }

  // ok, now we can try to process the command
  if (res->state == STATE_READY) {
    int handled = 0;
    int h;
    // call all handlers that match (yes there might be more than one)
    for (h = 0; h < handler_count; h++) {
      /* since we know res->cmd is terminated with a '\0',
         and there shouldn't be a '\n' in h->command,
         this shouldn't run past the end of the buffer */
      if (handlers[h].handler && (!handlers[h].command || strcase_startswith(res->cmd, handlers[h].command, NULL))) {
        if (handlers[h].handler(res->cmd, res->argc, res->argv, res->data, res->dataBytes, handlers[h].context) > 0)
          handled = 1;
      }
    }

    if (!handled) {
      INFOF("Event \"%s\" was not handled", res->cmd);
    }

    // shuffle any unprocessed bytes to the front of the buffer
    // (STATE_READY is only reached once bufferBytes >= consumed, so this cannot underflow)
    size_t consumed = res->dataBytes + res->cmdBytes;
    size_t remaining = res->bufferBytes - consumed;
    if (remaining > 0)
      bcopy(res->buffer + consumed, res->buffer, remaining);

    res->bufferBytes = remaining;
    res->cmdBytes = 0;
    res->dataBytes = 0;
    res->state = STATE_INIT;
    res->cmd = (char *)res->buffer;
    // The leftover bytes have not been scanned yet, so rescan from the start.
    oldOffset = 0;
    goto again;
  }

  if (res->bufferBytes >= MONITOR_CLIENT_BUFFER_SIZE)
    return WHY("Buffer full");
  return 0;
}