From c0c541293178b95af69034b604415e3e1ccd27e1 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 21 May 2014 17:10:00 +0930 Subject: [PATCH] Add support for STOPping a connection before shutdown --- doc/Mesh-Stream-Protocol.md | 35 +++++---- fdqueue.c | 2 +- msp_client.c | 143 +++++++++++++++++++++++++----------- msp_client.h | 3 +- msp_proxy.c | 121 ++++++++++++++++++++++-------- tests/msp | 92 ++++++++++++++++++----- 6 files changed, 287 insertions(+), 109 deletions(-) diff --git a/doc/Mesh-Stream-Protocol.md b/doc/Mesh-Stream-Protocol.md index 24a17b4c..78f48ebc 100644 --- a/doc/Mesh-Stream-Protocol.md +++ b/doc/Mesh-Stream-Protocol.md @@ -36,7 +36,7 @@ the other. In other words, an MSP message stream is a stream of bytes that preserves write boundaries as read boundaries. Any application can easily use an MSP connection as a simple ordered byte stream, like [TCP][], by ignoring incoming message boundaries and buffering all input and output. (In future, an -MSP *buffered* mode may be provided to facilitate this.) +MSP *buffered* mode may be provided to facilitate and optimise this usage.) [MSP][] is built on the [Mesh Datagram Protocol][MDP] which carries packets unreliably between the two end points. MSP uses a combination of [sliding @@ -320,11 +320,11 @@ handler function one last time with the CLOSED flag set. The `msp_processing()` function will never close a listening socket automatically; that must be done explicitly by the application. -An application may close any open socket at any time by calling `msp_close()`. -This will cause `msp_processing()` to stop both directions of an open data -socket without alerting the remote end point, discard all queued messages -locally, and invoke the socket's handler function one last time with the CLOSED -flag set. +An application may close any open socket at any time by calling `msp_stop()`. +This will cause `msp_processing()` to stop the flow of data in both directions, +discard all queued messages and alert the remote end point to do the same. +The socket's handler function will be one last time with the CLOSED and STOPPED +flags set. #### Finalisation @@ -632,7 +632,7 @@ size_t handler_function(MSP_SOCKET sock, // Connection is no longer working and cannot be recovered. Do not // release resources here; that will be done in the MSP_STATE_CLOSED case // below. - msp_close(sock); + msp_stop(sock); } if (payload && len) { // Process incoming message and return the number of bytes processed. @@ -841,21 +841,23 @@ When the remote party shuts down the socket at its end and all remaining data has been transferred, including the *shutdown* packet from the remote end, the socket will close automatically during `msp_processing()`. -#### `msp_close()` - Close a single MSP connection +#### `msp_stop()` - Close a single MSP connection - void msp_close(MSP_SOCKET sock); + void msp_stop(MSP_SOCKET sock); Marks the given socket as closed. The socket is not actually cleaned up until the next call to `msp_processing()`. If called from within a handler function, the close takes effect as soon as the function returns. The next call to `msp_processing()` will immediately terminate all i/o activity -for the socket without negotiating with or notifying the remote end, will +for the socket sending a notification to the remote end to do the same, will discard all locally queued incoming and outgoing messages, and will make the -final invocation to the socket's handler function with the CLOSED flag set. -The remote end will have to rely on its MSP timeout logic to detect that the -MSP connection is finished. The effect is as though the local end point had -lost contact with the remote end with no warning. +final invocation to the socket's handler function with the CLOSED and STOPPED +flags set. + +The remote end may miss the initial STOP notification due to packet loss, +for this reason, msp_recv will also send a STOP notification in response to +any connection it doesn't recognise. #### `msp_close_all()` - Close all MSP connections on a given MDP socket @@ -866,6 +868,11 @@ socket. This function is intended to be used after an application's main loop has terminated, and just before the application itself terminates, so it does not require any subsequent call to `mdp_processing()`. +No notification will be sent to any remote parties about the state of any +connected MSP sockets. The remote end will have to rely on its MSP timeout +logic to detect that the MSP connection is finished. The effect is as +though the local end point had lost contact with the remote end with no warning. + Calling `msp_close_all()` from within a handler function will have *undefined results*. diff --git a/fdqueue.c b/fdqueue.c index 45bb239e..d7481a6a 100644 --- a/fdqueue.c +++ b/fdqueue.c @@ -363,7 +363,7 @@ int fd_poll() /* If file descriptors are ready, then call the appropriate functions */ if (r>0) { for(i=fdcount -1;i>=0;i--){ - if (fds[i].revents) { + if (fd_callbacks[i] && fd_callbacks[i]->poll.fd == fds[i].fd && fds[i].revents) { // if any handles have POLLIN set, don't process any other handles if (!(fds[i].revents&POLLIN || in_count==0)) continue; diff --git a/msp_client.c b/msp_client.c index dcb6708d..bdc89762 100644 --- a/msp_client.c +++ b/msp_client.c @@ -30,6 +30,8 @@ #define FLAG_SHUTDOWN (1<<0) #define FLAG_ACK (1<<1) +#define FLAG_FIRST (1<<2) +#define FLAG_STOP (1<<3) #define RETRANSMIT_TIME 1500 struct msp_packet{ @@ -59,6 +61,7 @@ struct msp_sock{ unsigned salt; int mdp_sock; msp_state_t state; + msp_state_t last_state; struct msp_window tx; struct msp_window rx; uint16_t previous_ack; @@ -117,6 +120,7 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags) sock->salt = salt_counter; sock->mdp_sock = mdp_sock; sock->state = MSP_STATE_UNINITIALISED; + sock->last_state = 0xFFFF; // TODO set base rtt to ensure that we send the first packet a few times before giving up sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF; sock->tx.last_activity = TIME_MS_NEVER_HAS; @@ -220,6 +224,8 @@ static void free_all_packets(struct msp_window *window) free((void *)free_me->payload); free(free_me); } + window->_head = NULL; + window->packet_count=0; } static void free_acked_packets(struct msp_window *window, uint16_t seq) @@ -267,24 +273,33 @@ static void msp_free(struct msp_sock *sock) if (sock->_next) sock->_next->_prev = sock->_prev; - // last chance to free other resources - if (sock->handler) { - size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context); - assert(nconsumed == 0); - } - free_all_packets(&sock->tx); free_all_packets(&sock->rx); + // one last chance for clients to free other resources + if (sock->handler && sock->last_state != sock->state) { + size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context); + assert(nconsumed == 0); + } sock->salt = SALT_INVALID; // invalidate all handles that point here free(sock); } -void msp_close(MSP_SOCKET handle) +void msp_stop(MSP_SOCKET handle) { struct msp_sock *sock = handle_to_sock(&handle); - // TODO if never sent / received, just free it - sock->state |= MSP_STATE_CLOSED; + if (sock->state & MSP_STATE_STOPPED) + return; + sock->state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED; + // if this a connectable socket, send a stop packet + if (sock->header.remote.port && !(sock->state & MSP_STATE_LISTENING)){ + uint8_t response = FLAG_STOP; + // we don't have a matching socket, reply with STOP flag to force breaking the connection + // TODO global rate limit? + mdp_send(sock->mdp_sock, &sock->header, &response, 1); + if (config.debug.msp) + DEBUGF("Sending STOP packet"); + } } void msp_close_all(int mdp_sock) @@ -424,6 +439,9 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) // only set the ack flag if we've received a sequenced packet if (sock->state & MSP_STATE_RECEIVED_DATA) msp_header[0]|=FLAG_ACK; + // never received anything? set the connect flag + if (!(sock->state & MSP_STATE_RECEIVED_PACKET)) + msp_header[0]|=FLAG_FIRST; write_uint16(&msp_header[1], sock->rx.next_seq); write_uint16(&msp_header[3], packet->seq); @@ -459,7 +477,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) return -1; } if (config.debug.msp) - DEBUGF("Sent packet seq %02x len %zd (acked %02x)", packet->seq, packet->len, sock->rx.next_seq); + DEBUGF("Sent packet flags %02x seq %02x len %zd (acked %02x)", msp_header[0], packet->seq, packet->len, sock->rx.next_seq); sock->tx.last_activity = packet->sent = gettime_ms(); sock->next_ack = packet->sent + RETRANSMIT_TIME; return 0; @@ -480,6 +498,10 @@ static int send_ack(struct msp_sock *sock) // (but we can indicate the existence of the connection) if (sock->state & MSP_STATE_RECEIVED_DATA) msp_header[0]|=FLAG_ACK; + + // never received anything? set the connect flag + if (!(sock->state & MSP_STATE_RECEIVED_PACKET)) + msp_header[0]|=FLAG_FIRST; write_uint16(&msp_header[1], sock->rx.next_seq); @@ -504,7 +526,7 @@ static int send_ack(struct msp_sock *sock) return -1; } if (config.debug.msp) - DEBUGF("Sent packet (acked %02x)", sock->rx.next_seq); + DEBUGF("Sent packet flags %02x (acked %02x)", msp_header[0], sock->rx.next_seq); sock->previous_ack = sock->rx.next_seq; sock->tx.last_activity = gettime_ms(); sock->next_ack = sock->tx.last_activity + RETRANSMIT_TIME; @@ -519,9 +541,8 @@ ssize_t msp_send(MSP_SOCKET handle, const uint8_t *payload, size_t len) assert(sock->header.remote.port); assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0); - if (sock->tx.packet_count > MAX_WINDOW_SIZE) + if ((sock->state & MSP_STATE_CLOSED) || sock->tx.packet_count > MAX_WINDOW_SIZE) return -1; - if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1) return -1; @@ -559,9 +580,8 @@ static int process_sock(struct msp_sock *sock) time_ms_t now = gettime_ms(); if (sock->timeout < now){ - WHY("MSP socket timed out"); sock->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR); - return -1; + return WHY("MSP socket timed out"); } sock->next_action = sock->timeout; @@ -588,8 +608,10 @@ static int process_sock(struct msp_sock *sock) else assert(packet->offset == 0); size_t len = packet->len - packet->offset; - size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, packet->payload, len, sock->context); + msp_state_t state = sock->state; + size_t nconsumed = sock->handler(sock_to_handle(sock), state, packet->payload, len, sock->context); assert(nconsumed <= len); + sock->last_state = state; // stop calling the handler if nothing was consumed if (nconsumed == 0 && len != 0) break; @@ -599,12 +621,19 @@ static int process_sock(struct msp_sock *sock) continue; assert(packet->offset == packet->len); } - + // no handler? just discard any data p=p->_next; sock->rx.next_seq++; } free_acked_packets(&sock->rx, sock->rx.next_seq -1); + if (sock->handler && sock->last_state != sock->state){ + msp_state_t state = sock->state; + size_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context); + assert(nconsumed == 0); + sock->last_state = state; + } + // transmit packets that can now be sent p = sock->tx._head; while(p){ @@ -642,9 +671,11 @@ static int process_sock(struct msp_sock *sock) if (sock->next_action > sock->next_ack) sock->next_action = sock->next_ack; + // when we've delivered all local packets + // and all our data packets have been acked, close. if ( (sock->state & MSP_STATE_SHUTDOWN_LOCAL) && (sock->state & MSP_STATE_SHUTDOWN_REMOTE) - && sock->tx.packet_count == 0 + && sock->tx.packet_count == 0 && sock->rx.packet_count == 0 && sock->previous_ack == sock->rx.next_seq ){ @@ -660,18 +691,17 @@ int msp_processing(time_ms_t *next_action) time_ms_t next=TIME_MS_NEVER_WILL; struct msp_sock *sock = root; while(sock){ - if (!(sock->state & MSP_STATE_CLOSED)) { - // this might cause the socket to be closed - // remember the time of the next thing we need to do. - if (process_sock(sock)==0 && sock->next_action < next) - next=sock->next_action; - }else if (sock->next_action < next) - next=sock->next_action; + // this might cause the socket to be closed + // remember the time of the next thing we need to do. + process_sock(sock); + if (sock->state & MSP_STATE_CLOSED){ struct msp_sock *s = sock->_next; msp_free(sock); sock=s; }else{ + if (sock->next_action < next) + next=sock->next_action; sock = sock->_next; } } @@ -681,13 +711,15 @@ int msp_processing(time_ms_t *next_action) static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len) { - // any kind of error reported by the daemon, close all related msp connections + // any kind of error reported by the daemon, close all related msp connections on this mdp socket if (header->flags & MDP_FLAG_ERROR){ WHY("Error returned from daemon"); msp_close_all(mdp_sock); return -1; } - + + uint8_t flags=0; + // find or create mdp_sock... struct msp_sock *sock=NULL; { @@ -702,9 +734,10 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t s->header.flags &= ~MDP_FLAG_BIND; if (config.debug.msp) DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port); - s->next_action = gettime_ms(); if (s->state & MSP_STATE_LISTENING) - s->timeout = TIME_MS_NEVER_WILL; + s->next_action = s->timeout = TIME_MS_NEVER_WILL; + else + s->next_action = gettime_ms(); return 0; } @@ -723,9 +756,20 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t } s = s->_next; } - - if (listen && !sock){ - // create a new socket for the incoming connection + + if (len<1) + return WHY("Expected at least 1 byte"); + flags = payload[0]; + + // ignore any stop packet if we have no matching connection + if (!sock && flags & FLAG_STOP){ + if (config.debug.msp) + DEBUGF("Ignoring STOP packet, no matching connection"); + return 0; + } + + if (listen && (flags&FLAG_FIRST) && !sock){ + // create a new socket for incoming connections MSP_SOCKET handle = msp_socket(listen->mdp_sock, 0); sock = handle.ptr; if (sock) { @@ -737,19 +781,29 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t } if (!sock){ - WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port); - // TODO reply with shutdown ack to forcefully break the connection? + uint8_t response = FLAG_STOP; + // we don't have a matching socket, reply with STOP flag to force breaking the connection + // TODO global rate limit? + mdp_send(mdp_sock, header, &response, 1); + if (config.debug.msp) + DEBUGF("Replying to unexpected packet with STOP packet"); return 0; } } - if (len<3) - return WHY("Expected at least 3 bytes"); - sock->rx.last_activity = gettime_ms(); sock->timeout = sock->rx.last_activity + 10000; + sock->state |= MSP_STATE_RECEIVED_PACKET; - uint8_t flags = payload[0]; + if (flags & FLAG_STOP){ + if (config.debug.msp) + DEBUGF("Closing socket due to STOP packet"); + msp_stop(sock_to_handle(sock)); + return 0; + } + + if (len<3) + return 0; if (flags & FLAG_ACK){ uint16_t ack_seq = read_uint16(&payload[1]); @@ -759,18 +813,21 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t // TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet } + // we might have space for more data now if (sock->tx.packet_count < MAX_WINDOW_SIZE - && !(sock->state & MSP_STATE_DATAOUT) && !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){ sock->state|=MSP_STATE_DATAOUT; - if (sock->handler) - sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context); + // TODO Why isn't the handler getting called in the next msp_processing??? + if (sock->handler && sock->last_state != sock->state){ + msp_state_t state = sock->state; + ssize_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context); + assert(nconsumed == 0); + sock->last_state = state; + } } // make sure we attempt to process packets from this sock soon // TODO calculate based on congestion window - sock->state |= MSP_STATE_RECEIVED_PACKET; - sock->next_action = gettime_ms(); if (lenin->position = conn->out->position = 0; conn->in->limit = conn->out->limit = 0; conn->in->capacity = conn->out->capacity = 1024; + if (connections) + connections->_prev = conn; + conn->_next = connections; + connections = conn; return conn; } @@ -145,27 +153,37 @@ static void free_connection(struct connection *conn) return; if (!msp_socket_is_closed(conn->sock)){ msp_set_handler(conn->sock, msp_handler, NULL); - msp_close(conn->sock); + msp_stop(conn->sock); } - if (is_watching(&conn->alarm_in)) - unwatch(&conn->alarm_in); - if (is_watching(&conn->alarm_out)) - unwatch(&conn->alarm_out); + if (conn->in) free(conn->in); if (conn->out) free(conn->out); + conn->in=NULL; + conn->out=NULL; + + if (is_watching(&conn->alarm_in)) + unwatch(&conn->alarm_in); + if (is_watching(&conn->alarm_out)) + unwatch(&conn->alarm_out); + if (conn->alarm_in.poll.fd!=-1) close(conn->alarm_in.poll.fd); if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd) close(conn->alarm_out.poll.fd); - conn->in=NULL; - conn->out=NULL; conn->alarm_in.poll.fd=-1; conn->alarm_out.poll.fd=-1; + + if (conn->_next) + conn->_next->_prev = conn->_prev; + if (conn->_prev) + conn->_prev->_next = conn->_next; + if (conn==connections) + connections = conn->_next; free(conn); - if (msp_socket_count()==0 && is_watching(&mdp_sock)) + if (!connections && !msp_socket_is_listening(listener)) unwatch(&mdp_sock); } @@ -180,8 +198,10 @@ static void process_msp_asap() static void remote_shutdown(struct connection *conn) { struct mdp_sockaddr remote; - if (shutdown(conn->alarm_out.poll.fd, SHUT_WR)) - WARNF_perror("shutdown(%d)", conn->alarm_out.poll.fd); + if (conn->alarm_out.poll.fd != STDOUT_FILENO){ + if (shutdown(conn->alarm_out.poll.fd, SHUT_WR)) + WARNF_perror("shutdown(%d)", conn->alarm_out.poll.fd); + } msp_get_remote(conn->sock, &remote); INFOF(" - Connection with %s:%d remote shutdown", alloca_tohex_sid_t(remote.sid), remote.port); } @@ -230,13 +250,17 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay if (state & MSP_STATE_CLOSED){ struct mdp_sockaddr remote; msp_get_remote(sock, &remote); - INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port); + INFOF(" - Connection with %s:%d closed %s", + alloca_tohex_sid_t(remote.sid), remote.port, + (state & MSP_STATE_STOPPED) ? "suddenly":"gracefully"); conn->sock = MSP_SOCKET_NULL; if (is_watching(&conn->alarm_in)) unwatch(&conn->alarm_in); - if (!is_watching(&conn->alarm_out)) + if (!is_watching(&conn->alarm_out)){ + // gracefully close now if we have no pending data free_connection(conn); + } assert(len == 0); return 0; @@ -262,6 +286,11 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa return len; } + if (once){ + // stop listening after the first incoming connection + msp_stop(listener); + } + struct mdp_sockaddr remote; msp_get_remote(sock, &remote); INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port); @@ -271,11 +300,11 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa if (ip_addr.addrlen){ int fd = esocket(PF_INET, SOCK_STREAM, 0); if (fd==-1){ - msp_close(sock); + msp_stop(sock); return 0; } if (socket_connect(fd, &ip_addr)==-1){ - msp_close(sock); + msp_stop(sock); close(fd); return 0; } @@ -290,10 +319,6 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa if (payload) return msp_handler(sock, state, payload, len, conn); - if (once){ - // stop listening after the first incoming connection - msp_close(listener); - } assert(len == 0); return 0; } @@ -366,12 +391,12 @@ static void io_poll(struct sched_ent *alarm) remaining); if (r<0){ WARNF_perror("read(%d)", alarm->poll.fd); - alarm->poll.revents = POLLERR; + alarm->poll.revents |= POLLERR; } if (r==0){ // EOF r=-1; - alarm->poll.revents = POLLHUP; + alarm->poll.revents |= POLLHUP; } if (r>0){ conn->in->limit+=r; @@ -397,8 +422,10 @@ static void io_poll(struct sched_ent *alarm) ssize_t r = write(alarm->poll.fd, conn->out->bytes+conn->out->position, data); - if (r < 0) + if (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK){ WARNF_perror("write(%d)", alarm->poll.fd); + alarm->poll.revents |= POLLERR; + } if (r > 0) conn->out->position+=r; } @@ -419,6 +446,7 @@ static void io_poll(struct sched_ent *alarm) if (!msp_socket_is_null(conn->sock)){ process_msp_asap(); }else{ + // gracefully close after flushing the last of the data free_connection(conn); } } @@ -439,7 +467,19 @@ static void io_poll(struct sched_ent *alarm) } if (alarm->poll.revents & POLLERR) { - free_connection(conn); + if (is_watching(&conn->alarm_in)) + unwatch(&conn->alarm_in); + if (is_watching(&conn->alarm_out)) + unwatch(&conn->alarm_out); + if (conn->alarm_in.poll.fd!=-1) + close(conn->alarm_in.poll.fd); + if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd) + close(conn->alarm_out.poll.fd); + conn->alarm_in.poll.fd=-1; + conn->alarm_in.poll.events=0; + conn->alarm_out.poll.fd=-1; + conn->alarm_out.poll.events=0; + msp_stop(conn->sock); process_msp_asap(); } } @@ -462,7 +502,7 @@ static void listen_poll(struct sched_ent *alarm) struct connection *connection = alloc_connection(sock, fd, io_poll, fd, io_poll); if (!connection){ - msp_close(sock); + msp_stop(sock); return; } @@ -478,6 +518,25 @@ static void listen_poll(struct sched_ent *alarm) } } +void sigQuit(int UNUSED(signal)) +{ + struct connection *c = connections; + while(c){ + if (!msp_socket_is_closed(c->sock)) + msp_stop(c->sock); + c->out->limit = c->out->position = 0; + c->in->limit = c->in->position = 0; + c->alarm_in.poll.events = 0; + c->alarm_out.poll.events = 0; + if (is_watching(&c->alarm_in)) + unwatch(&c->alarm_in); + if (is_watching(&c->alarm_out)) + unwatch(&c->alarm_out); + c=c->_next; + } + quit=1; +} + int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context)) { const char *sidhex, *port_string, *local_port_string; @@ -584,26 +643,26 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS } process_msp_asap(); - sigIntFlag = 0; - signal(SIGINT, sigIntHandler); - signal(SIGTERM, sigIntHandler); - - while(sigIntFlag==0 && fd_poll()){ + signal(SIGINT, sigQuit); + signal(SIGTERM, sigQuit); + quit=0; + while(!quit && fd_poll()){ ; } + time_ms_t dummy; + msp_processing(&dummy); ret = saw_error; signal(SIGINT, SIG_DFL); - sigIntFlag = 0; end: listener = MSP_SOCKET_NULL; - if (is_watching(&mdp_sock)) - unwatch(&mdp_sock); if (mdp_sock.poll.fd!=-1){ msp_close_all(mdp_sock.poll.fd); mdp_close(mdp_sock.poll.fd); mdp_sock.poll.fd=-1; } + if (is_watching(&mdp_sock)) + unwatch(&mdp_sock); if (service_sock.poll.fd!=-1){ if (is_watching(&service_sock)) unwatch(&service_sock); diff --git a/tests/msp b/tests/msp index 482c860a..4ad3fc42 100755 --- a/tests/msp +++ b/tests/msp @@ -66,18 +66,19 @@ server_hello() { Hello from the server EOF assertStdoutGrep --matches=1 "^Hello from the client$" + assertStderrGrep --matches=1 " Connection with .* closed gracefully$" tfw_cat --stderr } test_hello() { set_instance +A - fork server_hello - wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log" + fork %listen server_hello set_instance +B executeOk_servald --timeout=20 msp connect $SIDA 512 <