Add support for STOPping a connection before shutdown

This commit is contained in:
Jeremy Lakeman 2014-05-21 17:10:00 +09:30
parent 998a40938b
commit c0c5412931
6 changed files with 287 additions and 109 deletions

View File

@ -36,7 +36,7 @@ the other. In other words, an MSP message stream is a stream of bytes that
preserves write boundaries as read boundaries. Any application can easily use
an MSP connection as a simple ordered byte stream, like [TCP][], by ignoring
incoming message boundaries and buffering all input and output. (In future, an
MSP *buffered* mode may be provided to facilitate this.)
MSP *buffered* mode may be provided to facilitate and optimise this usage.)
[MSP][] is built on the [Mesh Datagram Protocol][MDP] which carries packets
unreliably between the two end points. MSP uses a combination of [sliding
@ -320,11 +320,11 @@ handler function one last time with the CLOSED flag set.
The `msp_processing()` function will never close a listening socket
automatically; that must be done explicitly by the application.
An application may close any open socket at any time by calling `msp_close()`.
This will cause `msp_processing()` to stop both directions of an open data
socket without alerting the remote end point, discard all queued messages
locally, and invoke the socket's handler function one last time with the CLOSED
flag set.
An application may close any open socket at any time by calling `msp_stop()`.
This will cause `msp_processing()` to stop the flow of data in both directions,
discard all queued messages and alert the remote end point to do the same.
The socket's handler function will be one last time with the CLOSED and STOPPED
flags set.
#### Finalisation
@ -632,7 +632,7 @@ size_t handler_function(MSP_SOCKET sock,
// Connection is no longer working and cannot be recovered. Do not
// release resources here; that will be done in the MSP_STATE_CLOSED case
// below.
msp_close(sock);
msp_stop(sock);
}
if (payload && len) {
// Process incoming message and return the number of bytes processed.
@ -841,21 +841,23 @@ When the remote party shuts down the socket at its end and all remaining data
has been transferred, including the *shutdown* packet from the remote end, the
socket will close automatically during `msp_processing()`.
#### `msp_close()` - Close a single MSP connection
#### `msp_stop()` - Close a single MSP connection
void msp_close(MSP_SOCKET sock);
void msp_stop(MSP_SOCKET sock);
Marks the given socket as closed. The socket is not actually cleaned up until
the next call to `msp_processing()`. If called from within a handler function,
the close takes effect as soon as the function returns.
The next call to `msp_processing()` will immediately terminate all i/o activity
for the socket without negotiating with or notifying the remote end, will
for the socket sending a notification to the remote end to do the same, will
discard all locally queued incoming and outgoing messages, and will make the
final invocation to the socket's handler function with the CLOSED flag set.
The remote end will have to rely on its MSP timeout logic to detect that the
MSP connection is finished. The effect is as though the local end point had
lost contact with the remote end with no warning.
final invocation to the socket's handler function with the CLOSED and STOPPED
flags set.
The remote end may miss the initial STOP notification due to packet loss,
for this reason, msp_recv will also send a STOP notification in response to
any connection it doesn't recognise.
#### `msp_close_all()` - Close all MSP connections on a given MDP socket
@ -866,6 +868,11 @@ socket. This function is intended to be used after an application's main loop
has terminated, and just before the application itself terminates, so it does
not require any subsequent call to `mdp_processing()`.
No notification will be sent to any remote parties about the state of any
connected MSP sockets. The remote end will have to rely on its MSP timeout
logic to detect that the MSP connection is finished. The effect is as
though the local end point had lost contact with the remote end with no warning.
Calling `msp_close_all()` from within a handler function will have *undefined
results*.

View File

@ -363,7 +363,7 @@ int fd_poll()
/* If file descriptors are ready, then call the appropriate functions */
if (r>0) {
for(i=fdcount -1;i>=0;i--){
if (fds[i].revents) {
if (fd_callbacks[i] && fd_callbacks[i]->poll.fd == fds[i].fd && fds[i].revents) {
// if any handles have POLLIN set, don't process any other handles
if (!(fds[i].revents&POLLIN || in_count==0))
continue;

View File

@ -30,6 +30,8 @@
#define FLAG_SHUTDOWN (1<<0)
#define FLAG_ACK (1<<1)
#define FLAG_FIRST (1<<2)
#define FLAG_STOP (1<<3)
#define RETRANSMIT_TIME 1500
struct msp_packet{
@ -59,6 +61,7 @@ struct msp_sock{
unsigned salt;
int mdp_sock;
msp_state_t state;
msp_state_t last_state;
struct msp_window tx;
struct msp_window rx;
uint16_t previous_ack;
@ -117,6 +120,7 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags)
sock->salt = salt_counter;
sock->mdp_sock = mdp_sock;
sock->state = MSP_STATE_UNINITIALISED;
sock->last_state = 0xFFFF;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF;
sock->tx.last_activity = TIME_MS_NEVER_HAS;
@ -220,6 +224,8 @@ static void free_all_packets(struct msp_window *window)
free((void *)free_me->payload);
free(free_me);
}
window->_head = NULL;
window->packet_count=0;
}
static void free_acked_packets(struct msp_window *window, uint16_t seq)
@ -267,24 +273,33 @@ static void msp_free(struct msp_sock *sock)
if (sock->_next)
sock->_next->_prev = sock->_prev;
// last chance to free other resources
if (sock->handler) {
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
assert(nconsumed == 0);
}
free_all_packets(&sock->tx);
free_all_packets(&sock->rx);
// one last chance for clients to free other resources
if (sock->handler && sock->last_state != sock->state) {
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
assert(nconsumed == 0);
}
sock->salt = SALT_INVALID; // invalidate all handles that point here
free(sock);
}
void msp_close(MSP_SOCKET handle)
void msp_stop(MSP_SOCKET handle)
{
struct msp_sock *sock = handle_to_sock(&handle);
// TODO if never sent / received, just free it
sock->state |= MSP_STATE_CLOSED;
if (sock->state & MSP_STATE_STOPPED)
return;
sock->state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED;
// if this a connectable socket, send a stop packet
if (sock->header.remote.port && !(sock->state & MSP_STATE_LISTENING)){
uint8_t response = FLAG_STOP;
// we don't have a matching socket, reply with STOP flag to force breaking the connection
// TODO global rate limit?
mdp_send(sock->mdp_sock, &sock->header, &response, 1);
if (config.debug.msp)
DEBUGF("Sending STOP packet");
}
}
void msp_close_all(int mdp_sock)
@ -424,6 +439,9 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
// only set the ack flag if we've received a sequenced packet
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
// never received anything? set the connect flag
if (!(sock->state & MSP_STATE_RECEIVED_PACKET))
msp_header[0]|=FLAG_FIRST;
write_uint16(&msp_header[1], sock->rx.next_seq);
write_uint16(&msp_header[3], packet->seq);
@ -459,7 +477,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
return -1;
}
if (config.debug.msp)
DEBUGF("Sent packet seq %02x len %zd (acked %02x)", packet->seq, packet->len, sock->rx.next_seq);
DEBUGF("Sent packet flags %02x seq %02x len %zd (acked %02x)", msp_header[0], packet->seq, packet->len, sock->rx.next_seq);
sock->tx.last_activity = packet->sent = gettime_ms();
sock->next_ack = packet->sent + RETRANSMIT_TIME;
return 0;
@ -480,6 +498,10 @@ static int send_ack(struct msp_sock *sock)
// (but we can indicate the existence of the connection)
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
// never received anything? set the connect flag
if (!(sock->state & MSP_STATE_RECEIVED_PACKET))
msp_header[0]|=FLAG_FIRST;
write_uint16(&msp_header[1], sock->rx.next_seq);
@ -504,7 +526,7 @@ static int send_ack(struct msp_sock *sock)
return -1;
}
if (config.debug.msp)
DEBUGF("Sent packet (acked %02x)", sock->rx.next_seq);
DEBUGF("Sent packet flags %02x (acked %02x)", msp_header[0], sock->rx.next_seq);
sock->previous_ack = sock->rx.next_seq;
sock->tx.last_activity = gettime_ms();
sock->next_ack = sock->tx.last_activity + RETRANSMIT_TIME;
@ -519,9 +541,8 @@ ssize_t msp_send(MSP_SOCKET handle, const uint8_t *payload, size_t len)
assert(sock->header.remote.port);
assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0);
if (sock->tx.packet_count > MAX_WINDOW_SIZE)
if ((sock->state & MSP_STATE_CLOSED) || sock->tx.packet_count > MAX_WINDOW_SIZE)
return -1;
if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1)
return -1;
@ -559,9 +580,8 @@ static int process_sock(struct msp_sock *sock)
time_ms_t now = gettime_ms();
if (sock->timeout < now){
WHY("MSP socket timed out");
sock->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR);
return -1;
return WHY("MSP socket timed out");
}
sock->next_action = sock->timeout;
@ -588,8 +608,10 @@ static int process_sock(struct msp_sock *sock)
else
assert(packet->offset == 0);
size_t len = packet->len - packet->offset;
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, packet->payload, len, sock->context);
msp_state_t state = sock->state;
size_t nconsumed = sock->handler(sock_to_handle(sock), state, packet->payload, len, sock->context);
assert(nconsumed <= len);
sock->last_state = state;
// stop calling the handler if nothing was consumed
if (nconsumed == 0 && len != 0)
break;
@ -599,12 +621,19 @@ static int process_sock(struct msp_sock *sock)
continue;
assert(packet->offset == packet->len);
}
// no handler? just discard any data
p=p->_next;
sock->rx.next_seq++;
}
free_acked_packets(&sock->rx, sock->rx.next_seq -1);
if (sock->handler && sock->last_state != sock->state){
msp_state_t state = sock->state;
size_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context);
assert(nconsumed == 0);
sock->last_state = state;
}
// transmit packets that can now be sent
p = sock->tx._head;
while(p){
@ -642,9 +671,11 @@ static int process_sock(struct msp_sock *sock)
if (sock->next_action > sock->next_ack)
sock->next_action = sock->next_ack;
// when we've delivered all local packets
// and all our data packets have been acked, close.
if ( (sock->state & MSP_STATE_SHUTDOWN_LOCAL)
&& (sock->state & MSP_STATE_SHUTDOWN_REMOTE)
&& sock->tx.packet_count == 0
&& sock->tx.packet_count == 0
&& sock->rx.packet_count == 0
&& sock->previous_ack == sock->rx.next_seq
){
@ -660,18 +691,17 @@ int msp_processing(time_ms_t *next_action)
time_ms_t next=TIME_MS_NEVER_WILL;
struct msp_sock *sock = root;
while(sock){
if (!(sock->state & MSP_STATE_CLOSED)) {
// this might cause the socket to be closed
// remember the time of the next thing we need to do.
if (process_sock(sock)==0 && sock->next_action < next)
next=sock->next_action;
}else if (sock->next_action < next)
next=sock->next_action;
// this might cause the socket to be closed
// remember the time of the next thing we need to do.
process_sock(sock);
if (sock->state & MSP_STATE_CLOSED){
struct msp_sock *s = sock->_next;
msp_free(sock);
sock=s;
}else{
if (sock->next_action < next)
next=sock->next_action;
sock = sock->_next;
}
}
@ -681,13 +711,15 @@ int msp_processing(time_ms_t *next_action)
static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len)
{
// any kind of error reported by the daemon, close all related msp connections
// any kind of error reported by the daemon, close all related msp connections on this mdp socket
if (header->flags & MDP_FLAG_ERROR){
WHY("Error returned from daemon");
msp_close_all(mdp_sock);
return -1;
}
uint8_t flags=0;
// find or create mdp_sock...
struct msp_sock *sock=NULL;
{
@ -702,9 +734,10 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
s->header.flags &= ~MDP_FLAG_BIND;
if (config.debug.msp)
DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
s->next_action = gettime_ms();
if (s->state & MSP_STATE_LISTENING)
s->timeout = TIME_MS_NEVER_WILL;
s->next_action = s->timeout = TIME_MS_NEVER_WILL;
else
s->next_action = gettime_ms();
return 0;
}
@ -723,9 +756,20 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
}
s = s->_next;
}
if (listen && !sock){
// create a new socket for the incoming connection
if (len<1)
return WHY("Expected at least 1 byte");
flags = payload[0];
// ignore any stop packet if we have no matching connection
if (!sock && flags & FLAG_STOP){
if (config.debug.msp)
DEBUGF("Ignoring STOP packet, no matching connection");
return 0;
}
if (listen && (flags&FLAG_FIRST) && !sock){
// create a new socket for incoming connections
MSP_SOCKET handle = msp_socket(listen->mdp_sock, 0);
sock = handle.ptr;
if (sock) {
@ -737,19 +781,29 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
}
if (!sock){
WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
// TODO reply with shutdown ack to forcefully break the connection?
uint8_t response = FLAG_STOP;
// we don't have a matching socket, reply with STOP flag to force breaking the connection
// TODO global rate limit?
mdp_send(mdp_sock, header, &response, 1);
if (config.debug.msp)
DEBUGF("Replying to unexpected packet with STOP packet");
return 0;
}
}
if (len<3)
return WHY("Expected at least 3 bytes");
sock->rx.last_activity = gettime_ms();
sock->timeout = sock->rx.last_activity + 10000;
sock->state |= MSP_STATE_RECEIVED_PACKET;
uint8_t flags = payload[0];
if (flags & FLAG_STOP){
if (config.debug.msp)
DEBUGF("Closing socket due to STOP packet");
msp_stop(sock_to_handle(sock));
return 0;
}
if (len<3)
return 0;
if (flags & FLAG_ACK){
uint16_t ack_seq = read_uint16(&payload[1]);
@ -759,18 +813,21 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
}
// we might have space for more data now
if (sock->tx.packet_count < MAX_WINDOW_SIZE
&& !(sock->state & MSP_STATE_DATAOUT)
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){
sock->state|=MSP_STATE_DATAOUT;
if (sock->handler)
sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
// TODO Why isn't the handler getting called in the next msp_processing???
if (sock->handler && sock->last_state != sock->state){
msp_state_t state = sock->state;
ssize_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context);
assert(nconsumed == 0);
sock->last_state = state;
}
}
// make sure we attempt to process packets from this sock soon
// TODO calculate based on congestion window
sock->state |= MSP_STATE_RECEIVED_PACKET;
sock->next_action = gettime_ms();
if (len<MSP_PAYLOAD_PREAMBLE_SIZE)

View File

@ -63,6 +63,7 @@ msp_state_t msp_get_state(MSP_SOCKET sock);
#define MSP_STATE_ERROR ((msp_state_t) (1<<6))
// is there space for sending more data?
#define MSP_STATE_DATAOUT ((msp_state_t) (1<<7))
#define MSP_STATE_STOPPED ((msp_state_t) (1<<8))
int msp_socket_is_initialising(MSP_SOCKET);
int msp_socket_is_open(MSP_SOCKET);
@ -87,7 +88,7 @@ int msp_listen(MSP_SOCKET sock);
// close socket(s)
int msp_shutdown(MSP_SOCKET sock);
void msp_close(MSP_SOCKET);
void msp_stop(MSP_SOCKET sock);
void msp_close_all(int mdp_sock);
void msp_debug(void);

View File

@ -34,6 +34,8 @@ struct buffer{
};
struct connection{
struct connection *_next;
struct connection *_prev;
struct sched_ent alarm_in;
struct sched_ent alarm_out;
MSP_SOCKET sock;
@ -43,11 +45,13 @@ struct connection{
int last_state;
};
struct connection *connections=NULL;
int saw_error=0;
int once =0;
MSP_SOCKET listener = MSP_SOCKET_NULL;
struct mdp_sockaddr remote_addr;
struct socket_address ip_addr;
char quit=0;
static int try_send(struct connection *conn);
static void msp_poll(struct sched_ent *alarm);
@ -136,6 +140,10 @@ static struct connection *alloc_connection(
conn->in->position = conn->out->position = 0;
conn->in->limit = conn->out->limit = 0;
conn->in->capacity = conn->out->capacity = 1024;
if (connections)
connections->_prev = conn;
conn->_next = connections;
connections = conn;
return conn;
}
@ -145,27 +153,37 @@ static void free_connection(struct connection *conn)
return;
if (!msp_socket_is_closed(conn->sock)){
msp_set_handler(conn->sock, msp_handler, NULL);
msp_close(conn->sock);
msp_stop(conn->sock);
}
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (is_watching(&conn->alarm_out))
unwatch(&conn->alarm_out);
if (conn->in)
free(conn->in);
if (conn->out)
free(conn->out);
conn->in=NULL;
conn->out=NULL;
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (is_watching(&conn->alarm_out))
unwatch(&conn->alarm_out);
if (conn->alarm_in.poll.fd!=-1)
close(conn->alarm_in.poll.fd);
if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd)
close(conn->alarm_out.poll.fd);
conn->in=NULL;
conn->out=NULL;
conn->alarm_in.poll.fd=-1;
conn->alarm_out.poll.fd=-1;
if (conn->_next)
conn->_next->_prev = conn->_prev;
if (conn->_prev)
conn->_prev->_next = conn->_next;
if (conn==connections)
connections = conn->_next;
free(conn);
if (msp_socket_count()==0 && is_watching(&mdp_sock))
if (!connections && !msp_socket_is_listening(listener))
unwatch(&mdp_sock);
}
@ -180,8 +198,10 @@ static void process_msp_asap()
static void remote_shutdown(struct connection *conn)
{
struct mdp_sockaddr remote;
if (shutdown(conn->alarm_out.poll.fd, SHUT_WR))
WARNF_perror("shutdown(%d)", conn->alarm_out.poll.fd);
if (conn->alarm_out.poll.fd != STDOUT_FILENO){
if (shutdown(conn->alarm_out.poll.fd, SHUT_WR))
WARNF_perror("shutdown(%d)", conn->alarm_out.poll.fd);
}
msp_get_remote(conn->sock, &remote);
INFOF(" - Connection with %s:%d remote shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
}
@ -230,13 +250,17 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
if (state & MSP_STATE_CLOSED){
struct mdp_sockaddr remote;
msp_get_remote(sock, &remote);
INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port);
INFOF(" - Connection with %s:%d closed %s",
alloca_tohex_sid_t(remote.sid), remote.port,
(state & MSP_STATE_STOPPED) ? "suddenly":"gracefully");
conn->sock = MSP_SOCKET_NULL;
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (!is_watching(&conn->alarm_out))
if (!is_watching(&conn->alarm_out)){
// gracefully close now if we have no pending data
free_connection(conn);
}
assert(len == 0);
return 0;
@ -262,6 +286,11 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa
return len;
}
if (once){
// stop listening after the first incoming connection
msp_stop(listener);
}
struct mdp_sockaddr remote;
msp_get_remote(sock, &remote);
INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port);
@ -271,11 +300,11 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa
if (ip_addr.addrlen){
int fd = esocket(PF_INET, SOCK_STREAM, 0);
if (fd==-1){
msp_close(sock);
msp_stop(sock);
return 0;
}
if (socket_connect(fd, &ip_addr)==-1){
msp_close(sock);
msp_stop(sock);
close(fd);
return 0;
}
@ -290,10 +319,6 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa
if (payload)
return msp_handler(sock, state, payload, len, conn);
if (once){
// stop listening after the first incoming connection
msp_close(listener);
}
assert(len == 0);
return 0;
}
@ -366,12 +391,12 @@ static void io_poll(struct sched_ent *alarm)
remaining);
if (r<0){
WARNF_perror("read(%d)", alarm->poll.fd);
alarm->poll.revents = POLLERR;
alarm->poll.revents |= POLLERR;
}
if (r==0){
// EOF
r=-1;
alarm->poll.revents = POLLHUP;
alarm->poll.revents |= POLLHUP;
}
if (r>0){
conn->in->limit+=r;
@ -397,8 +422,10 @@ static void io_poll(struct sched_ent *alarm)
ssize_t r = write(alarm->poll.fd,
conn->out->bytes+conn->out->position,
data);
if (r < 0)
if (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK){
WARNF_perror("write(%d)", alarm->poll.fd);
alarm->poll.revents |= POLLERR;
}
if (r > 0)
conn->out->position+=r;
}
@ -419,6 +446,7 @@ static void io_poll(struct sched_ent *alarm)
if (!msp_socket_is_null(conn->sock)){
process_msp_asap();
}else{
// gracefully close after flushing the last of the data
free_connection(conn);
}
}
@ -439,7 +467,19 @@ static void io_poll(struct sched_ent *alarm)
}
if (alarm->poll.revents & POLLERR) {
free_connection(conn);
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (is_watching(&conn->alarm_out))
unwatch(&conn->alarm_out);
if (conn->alarm_in.poll.fd!=-1)
close(conn->alarm_in.poll.fd);
if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd)
close(conn->alarm_out.poll.fd);
conn->alarm_in.poll.fd=-1;
conn->alarm_in.poll.events=0;
conn->alarm_out.poll.fd=-1;
conn->alarm_out.poll.events=0;
msp_stop(conn->sock);
process_msp_asap();
}
}
@ -462,7 +502,7 @@ static void listen_poll(struct sched_ent *alarm)
struct connection *connection = alloc_connection(sock, fd, io_poll, fd, io_poll);
if (!connection){
msp_close(sock);
msp_stop(sock);
return;
}
@ -478,6 +518,25 @@ static void listen_poll(struct sched_ent *alarm)
}
}
void sigQuit(int UNUSED(signal))
{
struct connection *c = connections;
while(c){
if (!msp_socket_is_closed(c->sock))
msp_stop(c->sock);
c->out->limit = c->out->position = 0;
c->in->limit = c->in->position = 0;
c->alarm_in.poll.events = 0;
c->alarm_out.poll.events = 0;
if (is_watching(&c->alarm_in))
unwatch(&c->alarm_in);
if (is_watching(&c->alarm_out))
unwatch(&c->alarm_out);
c=c->_next;
}
quit=1;
}
int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *sidhex, *port_string, *local_port_string;
@ -584,26 +643,26 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
}
process_msp_asap();
sigIntFlag = 0;
signal(SIGINT, sigIntHandler);
signal(SIGTERM, sigIntHandler);
while(sigIntFlag==0 && fd_poll()){
signal(SIGINT, sigQuit);
signal(SIGTERM, sigQuit);
quit=0;
while(!quit && fd_poll()){
;
}
time_ms_t dummy;
msp_processing(&dummy);
ret = saw_error;
signal(SIGINT, SIG_DFL);
sigIntFlag = 0;
end:
listener = MSP_SOCKET_NULL;
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
if (mdp_sock.poll.fd!=-1){
msp_close_all(mdp_sock.poll.fd);
mdp_close(mdp_sock.poll.fd);
mdp_sock.poll.fd=-1;
}
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
if (service_sock.poll.fd!=-1){
if (is_watching(&service_sock))
unwatch(&service_sock);

View File

@ -66,18 +66,19 @@ server_hello() {
Hello from the server
EOF
assertStdoutGrep --matches=1 "^Hello from the client$"
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
}
test_hello() {
set_instance +A
fork server_hello
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
fork %listen server_hello
set_instance +B
executeOk_servald --timeout=20 msp connect $SIDA 512 <<EOF
Hello from the client
EOF
assertStdoutGrep --matches=1 "^Hello from the server$"
fork_wait_all
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
fork_wait %listen
}
doc_client_no_data="Client connection with no data"
@ -89,18 +90,19 @@ setup_client_no_data() {
}
server_client_no_data() {
executeOk_servald --timeout=20 msp listen 512 <file1
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
}
test_client_no_data() {
set_instance +A
fork server_client_no_data
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
fork %listen server_client_no_data
set_instance +B
executeOk_servald --timeout=20 msp connect $SIDA 512 <<EOF
EOF
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
assert diff file1 "$TFWSTDOUT"
fork_wait_all
fork_wait %listen
}
doc_server_no_data="Server sends no data"
@ -113,17 +115,18 @@ setup_server_no_data() {
server_server_no_data() {
executeOk_servald --timeout=20 msp listen 512 <<EOF
EOF
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
assert diff file1 "$TFWSTDOUT"
}
test_server_no_data() {
set_instance +A
fork server_server_no_data
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
fork %listen server_server_no_data
set_instance +B
executeOk_servald --timeout=20 msp connect $SIDA 512 <file1
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
fork_wait_all
fork_wait %listen
}
doc_keep_alive="Keep the connection alive with no data"
@ -134,19 +137,20 @@ setup_keep_alive() {
}
listen_pipe() {
executeOk_servald msp listen 512 < <(echo "START" && sleep 20 && echo "END")
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stdout --stderr
}
connect_pipe() {
executeOk_servald msp connect $1 512 < <(echo "START" && sleep 20 && echo "END")
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stdout --stderr
}
test_keep_alive() {
set_instance +A
fork listen_pipe
wait_until --timeout=10 grep "Bind MDP $SIDA:512" "$instance_servald_log"
fork %listen listen_pipe
set_instance +B
fork connect_pipe $SIDA
fork_wait_all
fork_wait %listen
}
doc_forward="Forward TCP connections to a remote server"
@ -157,18 +161,18 @@ setup_forward() {
}
client_forward() {
executeOk --timeout=20 $servald msp connect --once --forward=$1 $2 512
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stdout --stderr
}
test_forward() {
set_instance +A
fork server_hello
wait_until --timeout=10 grep "Bind MDP $SIDA:512" "$instance_servald_log"
fork %listen server_hello
set_instance +B
fork client_forward 2048 $SIDA
sleep 1
executeOk nc -v 127.0.0.1 2048 < <(echo "Hello from the client")
assertStdoutGrep --matches=1 "^Hello from the server$"
fork_wait_all
fork_wait %listen
}
doc_tcp_tunnel="Tunnel a tcp connection"
@ -179,6 +183,7 @@ setup_tcp_tunnel() {
}
server_forward() {
executeOk_servald msp listen --once --forward=$1 512
assertStderrGrep --matches=1 " Connection with .* closed gracefully$"
tfw_cat --stderr
}
nc_listen() {
@ -187,15 +192,64 @@ nc_listen() {
}
test_tcp_tunnel() {
fork nc_listen 6000
fork %listen nc_listen 6000
set_instance +A
fork server_forward 6000
fork %server server_forward 6000
set_instance +B
fork client_forward 6001 $SIDA
fork %client client_forward 6001 $SIDA
sleep 1
executeOk nc -v 127.0.0.1 6001 < <(echo "Hello from the client")
assertStdoutGrep --matches=1 "^Hello from the server$"
fork_wait_all
fork_wait %listen %server %client
}
doc_refused="TCP connection refused on forwarded stream"
setup_refused(){
setup_servald
assert_no_servald_processes
start_servald_instances +A +B
}
server_refused(){
executeOk $servald msp listen --once --forward=4001 512
assertStderrGrep --matches=1 " Connection refused"
tfw_cat --stderr
}
test_refused(){
set_instance +A
fork %listen server_refused
set_instance +B
executeOk_servald msp connect $SIDA 512 < <(echo "Hello")
assertStderrGrep --matches=1 " Connection with .* closed suddenly$"
tfw_cat --stdout --stderr
fork_wait %listen
}
doc_terminate="Terminate a connection mid stream"
setup_terminate() {
setup_servald
assert_no_servald_processes
start_servald_instances +A +B
}
client_terminate() {
executeOk_servald msp connect --once --forward=3001 $SIDA 512
assertStderrGrep --matches=1 " Connection with .* closed suddenly$"
tfw_cat --stdout --stderr
}
server_terminate() {
executeOk_servald msp listen 512 < <(dd if=/dev/urandom bs=1k count=500 | hexdump -C)
assertStderrGrep --matches=1 " Connection with .* closed suddenly$"
tfw_cat --stdout --stderr
}
test_terminate() {
set_instance +A
fork %msplisten server_terminate
set_instance +B
fork %mspconnect client_terminate
sleep 1
# todo capture nc output nicely, while still being able to terminate it
fork %ncconnect nc -v 127.0.0.1 3001
fork_terminate %ncconnect
fork_wait %msplisten %mspconnect %ncconnect
}
runTests "$@"