Improve msp failure handling

This commit is contained in:
Jeremy Lakeman 2013-12-05 11:25:47 +10:30
parent db7f68afd7
commit 3a533187b7
4 changed files with 135 additions and 89 deletions

View File

@ -9,6 +9,7 @@
#include "log.h"
#define FLAG_SHUTDOWN (1<<0)
#define FLAG_ACK (1<<1)
struct msp_packet{
struct msp_packet *_next;
@ -26,6 +27,7 @@ struct msp_window{
uint32_t base_rtt;
uint32_t rtt;
uint16_t next_seq; // seq of next expected TX or RX packet.
time_ms_t last_activity;
struct msp_packet *_head, *_tail;
};
@ -40,7 +42,6 @@ struct msp_sock{
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context);
void *context;
struct mdp_header header;
time_ms_t last_rx;
time_ms_t timeout;
time_ms_t next_action;
};
@ -55,6 +56,9 @@ struct msp_sock * msp_socket(int mdp_sock)
ret->_next = root;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
ret->tx.base_rtt = ret->tx.rtt = 0xFFFFFFFF;
ret->tx.last_activity = TIME_NEVER_HAS;
ret->rx.last_activity = TIME_NEVER_HAS;
ret->timeout = gettime_ms() + 10000;
if (root)
root->_prev=ret;
root = ret;
@ -118,6 +122,18 @@ void msp_close(struct msp_sock *sock)
free(sock);
}
void msp_close_all(int mdp_sock)
{
struct msp_sock **p = &root;
while(*p){
if ((*p)->mdp_sock == mdp_sock){
msp_close(*p);
}else{
p=&(*p)->_next;
}
}
}
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),
void *context)
@ -161,8 +177,14 @@ int msp_listen(struct msp_sock *sock)
sock->state |= MSP_STATE_LISTENING;
sock->header.flags |= MDP_FLAG_BIND;
mdp_send(sock->mdp_sock, &sock->header, NULL, 0);
if (mdp_send(sock->mdp_sock, &sock->header, NULL, 0)==-1){
sock->state|=MSP_STATE_ERROR;
msp_close(sock);
return -1;
}
sock->timeout = TIME_NEVER_WILL;
return 0;
}
@ -241,6 +263,11 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
uint8_t msp_header[5];
msp_header[0]=packet->flags;
// only set the ack flag if we've received a sequenced packet
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
write_uint16(&msp_header[1], sock->rx.next_seq);
write_uint16(&msp_header[3], packet->seq);
sock->previous_ack = sock->rx.next_seq;
@ -269,17 +296,16 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
data.fragment_count --;
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
if (r==-1)
if (r==-1){
msp_close_all(sock->mdp_sock);
return -1;
packet->sent = gettime_ms();
if (!sock->timeout)
sock->timeout = packet->sent + 10000;
}
sock->tx.last_activity = packet->sent = gettime_ms();
return 0;
}
static int send_ack(struct msp_sock *sock)
{
D;
if (daemon_addr.addrlen == 0){
if (make_local_sockaddr(&daemon_addr, "mdp.2.socket") == -1)
return -1;
@ -288,6 +314,11 @@ static int send_ack(struct msp_sock *sock)
uint8_t msp_header[3];
msp_header[0]=0;
// if we haven't heard a sequence number, we can't ack data
// (but we can indicate the existence of the connection)
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
write_uint16(&msp_header[1], sock->rx.next_seq);
sock->previous_ack = sock->rx.next_seq;
@ -306,7 +337,12 @@ static int send_ack(struct msp_sock *sock)
};
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
return r<0?-1:0;
if (r==-1){
msp_close_all(sock->mdp_sock);
return -1;
}
sock->tx.last_activity = gettime_ms();
return 0;
}
// add a packet to the transmit buffer
@ -347,7 +383,7 @@ static int process_sock(struct msp_sock *sock)
{
time_ms_t now = gettime_ms();
if (sock->timeout && sock->timeout < now){
if (sock->timeout < now){
msp_close(sock);
return -1;
}
@ -383,14 +419,11 @@ static int process_sock(struct msp_sock *sock)
p = sock->tx._head;
while(p){
if (p->sent==0 || p->sent + sock->tx.rtt*2 < now){
if (!(sock->state & (MSP_STATE_CONNECTING|MSP_STATE_CONNECTED))){
sock->state |= MSP_STATE_CONNECTING;
if (!sock->header.local.port){
if (sock->header.flags & MDP_FLAG_BIND)
// wait until we have heard back from the daemon with our port number before sending another packet.
break;
sock->header.flags |= MDP_FLAG_BIND;
}else if (!sock->header.local.port){
// wait until we have heard back from the daemon with our port number before sending another packet.
break;
}
if (msp_send_packet(sock, p)==-1)
return -1;
@ -437,82 +470,89 @@ int msp_processing(time_ms_t *next_action)
return 0;
}
static struct msp_sock * find_connection(int mdp_sock, struct mdp_header *header)
{
struct msp_sock *s=root;
struct msp_sock *listen=NULL;
while(s){
if (s->mdp_sock == mdp_sock ){
if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){
// bind response from the daemon
s->header.local = header->local;
s->header.flags &= ~MDP_FLAG_BIND;
DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
return NULL;
}
if (s->state & MSP_STATE_LISTENING){
// remember any matching listen socket so we can create a connection on first use
if (s->header.local.port == header->local.port
&& (is_sid_t_any(s->header.local.sid)
|| memcmp(&s->header.local.sid, &header->local.sid, SID_SIZE)==0))
listen=s;
}else if (memcmp(&s->header.remote, &header->remote, sizeof header->remote)==0
&& memcmp(&s->header.local, &header->local, sizeof header->local)==0){
// if the addresses match, we found it.
return s;
}
}
s = s->_next;
}
if (listen){
// create socket for incoming connection
s = msp_socket(listen->mdp_sock);
s->header = *header;
// use the same handler initially
s->handler = listen->handler;
s->context = listen->context;
return s;
}
WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
return NULL;
}
static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len)
{
DEBUGF("packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
// find or create mdp_sock...
struct msp_sock *sock=find_connection(mdp_sock, header);
if (!sock)
// any kind of error reported by the daemon, close all related msp connections
if (header->flags & MDP_FLAG_ERROR){
msp_close_all(mdp_sock);
return 0;
}
// find or create mdp_sock...
struct msp_sock *sock=NULL;
{
struct msp_sock *s=root;
struct msp_sock *listen=NULL;
while(s){
if (s->mdp_sock == mdp_sock ){
if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){
// process bind response from the daemon
s->header.local = header->local;
s->header.flags &= ~MDP_FLAG_BIND;
DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
return 0;
}
if (s->state & MSP_STATE_LISTENING){
// remember any matching listen socket so we can create a connection on first use
if (s->header.local.port == header->local.port
&& (is_sid_t_any(s->header.local.sid)
|| memcmp(&s->header.local.sid, &header->local.sid, SID_SIZE)==0))
listen=s;
}else if (memcmp(&s->header.remote, &header->remote, sizeof header->remote)==0
&& memcmp(&s->header.local, &header->local, sizeof header->local)==0){
// if the addresses match, we found it.
sock=s;
break;
}
}
s = s->_next;
}
if (listen && !sock){
// create a new socket for the incoming connection
sock = msp_socket(listen->mdp_sock);
sock->header = *header;
// use the same handler initially
sock->handler = listen->handler;
sock->context = listen->context;
}
if (!sock){
WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
// TODO reply with shutdown ack to forcefully break the connection?
return 0;
}
}
if (len<3)
return WHY("Expected at least 3 bytes");
sock->state |= MSP_STATE_CONNECTED;
sock->last_rx = gettime_ms();
sock->timeout = sock->last_rx + 10000;
sock->rx.last_activity = gettime_ms();
sock->timeout = sock->rx.last_activity + 10000;
uint8_t flags = payload[0];
uint16_t ack_seq = read_uint16(&payload[1]);
// release acknowledged packets
free_acked_packets(&sock->tx, ack_seq);
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
if (flags & FLAG_ACK){
uint16_t ack_seq = read_uint16(&payload[1]);
// release acknowledged packets
free_acked_packets(&sock->tx, ack_seq);
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
}
// make sure we attempt to process packets from this sock soon
// TODO calculate based on congestion window
sock->state |= MSP_STATE_RECEIVED_PACKET;
sock->next_action = gettime_ms();
if (len<5)
return 0;
sock->state |= MSP_STATE_RECEIVED_DATA;
uint16_t seq = read_uint16(&payload[3]);
add_packet(&sock->rx, seq, flags, &payload[5], len - 5);

View File

@ -4,8 +4,8 @@
#define MSP_STATE_UNINITIALISED 0
#define MSP_STATE_LISTENING (1<<0)
#define MSP_STATE_CONNECTING (1<<1)
#define MSP_STATE_CONNECTED (1<<2)
#define MSP_STATE_RECEIVED_DATA (1<<1)
#define MSP_STATE_RECEIVED_PACKET (1<<2)
#define MSP_STATE_SHUTDOWN_LOCAL (1<<3)
#define MSP_STATE_SHUTDOWN_REMOTE (1<<4)
@ -13,7 +13,7 @@
// this connection is about to be free'd, release any other resources or references to the state
#define MSP_STATE_CLOSED (1<<5)
// something has gone wrong somewhere and the connection will be closed
// something has gone wrong somewhere
#define MSP_STATE_ERROR (1<<6)
// does the client want to be interupted to reading data?
@ -33,6 +33,7 @@ typedef uint16_t msp_state_t;
// allocate a new socket
struct msp_sock * msp_socket(int mdp_sock);
void msp_close(struct msp_sock *sock);
void msp_close_all(int mdp_sock);
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),

View File

@ -152,7 +152,7 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
if (payload)
return msp_handler(sock, state, payload, len, conn);
// stop listening after an incoming connection
// stop listening after the first incoming connection
msp_close(listener);
return 0;
@ -315,7 +315,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
if (sidhex && *sidhex){
stdio_connection = alloc_connection();
if (!stdio_connection)
return -1;
goto end;
stdio_connection->sock = sock;
msp_set_handler(sock, msp_handler, stdio_connection);
msp_set_remote(sock, addr);
@ -327,11 +327,14 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
}else{
msp_set_handler(sock, msp_listener, NULL);
msp_set_local(sock, addr);
msp_listen(sock);
// sock will be closed if listen fails
if (msp_listen(sock)==-1)
goto end;
listener=sock;
INFOF(" - Listening on port %d", addr.port);
}
sock=NULL;
while(fd_poll()){
;
@ -339,9 +342,9 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
ret=0;
end:
if (sock)
msp_close(sock);
listener=NULL;
if (mdp_sock.poll.fd>=0){
msp_close_all(mdp_sock.poll.fd);
if (mdp_sock.poll.events){
unwatch(&mdp_sock);
INFOF("Unwatching mdp socket");

View File

@ -40,21 +40,23 @@ setup_hello() {
set log.console.level DEBUG \
set log.console.show_time on
start_servald_instances +A +B
set_instance +A
}
test_hello() {
fork executeOk_servald --timeout=5 msp listen 512 <<EOF
server_hello() {
executeOk_servald --timeout=5 msp listen 512 <<EOF
Hello from the server
EOF
sleep 1
assertStdoutGrep --matches=1 "^Hello from the client$"
}
test_hello() {
set_instance +A
fork server_hello
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
set_instance +B
executeOk_servald --timeout=7 msp connect $SIDA 512 <<EOF
Hello from the client
EOF
tfw_cat --stdout --stderr
assertStdoutGrep --matches=1 "^Hello from the server$"
fork_wait_all
}
runTests "$@"