Fix an intermittent server test failure

The quit-on-monitor-client-disconnect test was non-deterministic
depending on load (eg, other concurrently running tests).  Under load,
it was likely that the server did not process the "monitor quit" command
before checking for disconnect, so the server did not terminate.

The fix was to make the monitor interface read and process all queued
input from the client before checking for HUP or ERR condition on the
socket.  With this fix, the "sleep 1" kludges before and after the echo
"monitor quit" to the console command are no longer needed.

In the process the monitor interface code was modernised: eg, now it
calls read_nonblock() instead of read(2).
This commit is contained in:
Andrew Bettison 2016-09-06 12:24:00 +09:30
parent 64286bd5e6
commit 7f0fef2209
4 changed files with 67 additions and 74 deletions

@ -92,7 +92,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MONITOR_PEERS (1<<2)
#define MONITOR_DNAHELPER (1<<3)
#define MONITOR_LINKS (1<<4)
#define MONITOR_QUIT (1<<5)
#define MONITOR_QUIT_ON_DISCONNECT (1<<5)
#define MONITOR_INTERFACE (1<<6)
#define MAX_SIGNATURES 16

@ -143,22 +143,12 @@ int monitor_client_read(int fd, struct monitor_state *res, struct monitor_comman
if (res->bufferBytes==0)
res->cmd = (char *)res->buffer;
ssize_t bytesRead = read(fd, res->buffer + oldOffset, MONITOR_CLIENT_BUFFER_SIZE - oldOffset);
if (bytesRead == -1){
switch(errno) {
case ENOTRECOVERABLE:
/* transient errors */
break;
case EINTR:
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return 0;
}
WHYF_perror("read(%d, %p, %zd)", fd, res->buffer + oldOffset, MONITOR_CLIENT_BUFFER_SIZE - oldOffset);
ssize_t bytesRead = read_nonblock(fd, res->buffer + oldOffset, MONITOR_CLIENT_BUFFER_SIZE - oldOffset);
if (bytesRead == -2)
return 0;
if (bytesRead == -1)
return -1;
} else if (bytesRead == 0) {
if (bytesRead == 0) {
WARNF("read(%d, %p, %zd) returned %zd", fd, res->buffer + oldOffset, MONITOR_CLIENT_BUFFER_SIZE - oldOffset, (size_t)bytesRead);
return -1;
}

@ -90,10 +90,11 @@ struct monitor_context {
char line[MONITOR_LINE_LENGTH];
int line_length;
#define MONITOR_STATE_UNUSED 0
#define MONITOR_STATE_COMMAND 1
#define MONITOR_STATE_DATA 2
int state;
enum {
MONITOR_STATE_UNUSED,
MONITOR_STATE_COMMAND,
MONITOR_STATE_DATA
} state;
unsigned char buffer[MONITOR_DATA_SIZE];
int data_expected;
int data_offset;
@ -171,8 +172,8 @@ void monitor_poll(struct sched_ent *alarm)
static void monitor_close(struct monitor_context *c){
INFOF("Tearing down monitor client fd=%d", c->alarm.poll.fd);
if (serverMode && c->flags & MONITOR_QUIT){
INFOF("Quitting due to client disconnecting");
if (serverMode && (c->flags & MONITOR_QUIT_ON_DISCONNECT)){
INFOF("Stopping server due to client disconnecting");
serverMode=SERVER_CLOSING;
}
unwatch(&c->alarm);
@ -188,38 +189,30 @@ void monitor_client_poll(struct sched_ent *alarm)
struct monitor_context *c=(struct monitor_context *)alarm;
errno=0;
int bytes;
if (alarm->poll.revents & POLLIN) {
switch(c->state) {
case MONITOR_STATE_UNUSED:
FATAL("should not poll unused client");
case MONITOR_STATE_COMMAND:
bytes = 1;
while(bytes == 1) {
if (c->line_length >= MONITOR_LINE_LENGTH) {
c->line_length=0;
monitor_write_error(c,"Command too long");
DEBUG(monitor, "close monitor because command too long");
monitor_close(c);
return;
}
bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1);
if (bytes<0){
switch(errno) {
case EINTR:
case ENOTRECOVERABLE:
/* transient errors */
WHY_perror("read");
break;
case EAGAIN:
break;
default:
WHY_perror("read");
/* all other errors; close socket */
monitor_close(c);
}
bytes = read_nonblock(c->alarm.poll.fd, &c->line[c->line_length], 1);
if (bytes == -1) {
DEBUG(monitor, "close monitor due to read error");
monitor_close(c);
return;
}
if (bytes<1)
break;
if (bytes == -2 || bytes == 0)
continue; // no bytes available to read
// silently skip all \r characters
if (c->line[c->line_length] == '\r')
@ -235,7 +228,7 @@ void monitor_client_poll(struct sched_ent *alarm)
}
if (c->line[c->line_length] == '\n') {
/* got whole command line, start reading data if required */
// got whole command line, start reading data if required
c->line[c->line_length]=0;
c->state=MONITOR_STATE_DATA;
c->data_offset=0;
@ -245,53 +238,49 @@ void monitor_client_poll(struct sched_ent *alarm)
c->line_length += bytes;
}
if (c->state!=MONITOR_STATE_DATA)
// if run out of characters to read before reaching the end of a command, then check for HUP
// now in case the client terminated abnormally
if (c->state != MONITOR_STATE_DATA)
break;
// else fall through
// else fall through...
case MONITOR_STATE_DATA:
if (c->data_expected - c->data_offset >0){
bytes = read(c->alarm.poll.fd,
&c->buffer[c->data_offset],
c->data_expected - c->data_offset);
if (bytes < 1) {
switch(errno) {
case EAGAIN: case EINTR:
/* transient errors */
break;
default:
/* all other errors; close socket */
WHYF("Tearing down monitor client due to errno=%d",
errno);
monitor_close(c);
return;
}
bytes = read_nonblock(c->alarm.poll.fd, &c->buffer[c->data_offset], c->data_expected - c->data_offset);
if (bytes == -2 || bytes == 0)
break; // no bytes available to read
if (bytes == -1) {
DEBUG(monitor, "close monitor due to read error");
monitor_close(c);
return;
}
c->data_offset += bytes;
}
// if run out of characters to read before reaching the expected number, then check for HUP
// now in case the client terminated abnormally
if (c->data_offset < c->data_expected)
break;
/* we have the next command and all of the binary data we were expecting. Now we can process it */
// we have received all of the binary data we were expecting
monitor_process_command(c);
// fall through
default:
// reset parsing state
c->state = MONITOR_STATE_COMMAND;
c->data_expected = 0;
c->data_offset = 0;
c->line_length = 0;
// poll again to finish processing all queued commands before checking for HUP, so that any
// queued "quit" command (quit on HUP) is processed before the HUP is handled
return;
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
DEBUGF(monitor, "client disconnection (%s)", alloca_poll_events(alarm->poll.revents));
monitor_close(c);
}
return;
}
static void monitor_new_client(int s) {
@ -369,7 +358,7 @@ static void monitor_new_client(int s) {
client_stats.name = "monitor_client_poll";
c->alarm.stats=&client_stats;
c->alarm.poll.fd = s;
c->alarm.poll.events=POLLIN;
c->alarm.poll.events = POLLIN | POLLHUP;
c->line_length = 0;
c->state = MONITOR_STATE_COMMAND;
write_str(s,"\nINFO:You are talking to servald\n");
@ -440,7 +429,7 @@ static int monitor_set(const struct cli_parsed *parsed, struct cli_context *cont
c->flags|=MONITOR_LINKS;
enum_subscribers(NULL, monitor_announce_all_peers, NULL);
}else if (strcase_startswith(parsed->args[1],"quit", NULL)){
c->flags|=MONITOR_QUIT;
c->flags|=MONITOR_QUIT_ON_DISCONNECT;
}else if (strcase_startswith(parsed->args[1],"interface", NULL)){
c->flags|=MONITOR_INTERFACE;
overlay_interface_monitor_up();
@ -469,7 +458,7 @@ static int monitor_clear(const struct cli_parsed *parsed, struct cli_context *co
else if (strcase_startswith(parsed->args[1],"links", NULL))
c->flags&=~MONITOR_LINKS;
else if (strcase_startswith(parsed->args[1],"quit", NULL))
c->flags&=~MONITOR_QUIT;
c->flags&=~MONITOR_QUIT_ON_DISCONNECT;
else if (strcase_startswith(parsed->args[1],"interface", NULL))
c->flags&=~MONITOR_INTERFACE;
else
@ -664,7 +653,7 @@ int monitor_tell_clients(char *msg, int msglen, int mask)
if (monitor_sockets[i].flags & mask) {
// DEBUG("Writing AUDIOPACKET to client");
if ( write_all_nonblock(monitor_sockets[i].alarm.poll.fd, msg, msglen) == -1) {
INFOF("Tearing down monitor client #%d", i);
INFOF("Tear down monitor client #%d due to write error", i);
monitor_close(&monitor_sockets[i]);
}
}

@ -33,13 +33,19 @@ teardown() {
report_all_servald_servers
}
# make sure servald config is blank
configure_servald_server() {
:
executeOk_servald config \
set log.console.level debug \
set log.console.show_time true \
set log.console.show_pid true
}
doc_StartCreateInstanceDir="Starting server creates instance directory"
setup_StartCreateInstanceDir() {
# Ensure there is initially no instance directory.
configure_servald_server() {
:
}
setup
assert [ ! -d "$SERVALINSTANCE_PATH" ]
}
@ -145,11 +151,18 @@ test_StartTwice() {
doc_MonitorQuit="Server stops due to monitor client disconnection"
setup_MonitorQuit() {
configure_servald_server() {
executeOk_servald config \
set log.console.level debug \
set log.console.show_time true \
set log.console.show_pid true \
set debug.monitor on
}
setup
start_servald_server
}
test_MonitorQuit() {
executeOk_servald console < <(sleep 1 && echo "monitor quit" && sleep 1)
executeOk_servald console < <(echo "monitor quit")
tfw_cat --stdout --stderr
wait_until ! kill -0 $servald_pid 2>/dev/null
}
@ -191,6 +204,7 @@ EOF
setup
executeOk_servald config \
set log.console.level debug \
set log.console.show_time true \
set log.console.show_pid true \
set debug.watchdog on \
set server.watchdog.executable "$PWD/watchdog" \