Call msp handler on a timer to prevent stalling

This commit is contained in:
Jeremy Lakeman 2014-05-22 14:21:09 +09:30
parent 885cf56c95
commit 506b6ed57c
2 changed files with 75 additions and 56 deletions

View File

@ -33,6 +33,7 @@
#define FLAG_FIRST (1<<2)
#define FLAG_STOP (1<<3)
#define RETRANSMIT_TIME 1500
#define HANDLER_KEEPALIVE 1000
struct msp_packet{
struct msp_packet *_next;
@ -47,7 +48,7 @@ struct msp_packet{
#define MAX_WINDOW_SIZE 4
struct msp_window{
int packet_count;
unsigned packet_count;
uint32_t base_rtt;
uint32_t rtt;
uint16_t next_seq; // seq of next expected TX or RX packet.
@ -62,6 +63,7 @@ struct msp_sock{
int mdp_sock;
msp_state_t state;
msp_state_t last_state;
time_ms_t last_handler;
struct msp_window tx;
struct msp_window rx;
uint16_t previous_ack;
@ -121,6 +123,7 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags)
sock->mdp_sock = mdp_sock;
sock->state = MSP_STATE_UNINITIALISED;
sock->last_state = 0xFFFF;
sock->last_handler = TIME_MS_NEVER_HAS;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF;
sock->tx.last_activity = TIME_MS_NEVER_HAS;
@ -232,15 +235,18 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
{
if (!window->_head)
return;
struct msp_packet *p = window->_head;
uint32_t rtt=0;
uint32_t rtt=0xFFFFFFFF, rtt_max=0;
time_ms_t now = gettime_ms();
while(p && compare_wrapped_uint16(p->seq, seq)<=0){
if (p->sent)
rtt = now - p->sent;
if (p->sent!=TIME_MS_NEVER_HAS){
uint32_t this_rtt=now - p->sent;
if (rtt > this_rtt)
rtt = this_rtt;
if (rtt_max < this_rtt)
rtt_max = this_rtt;
}
struct msp_packet *free_me=p;
p=p->_next;
if (free_me->payload)
@ -249,19 +255,34 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
window->packet_count--;
}
window->_head = p;
if (rtt){
if (rtt!=0xFFFFFFFF){
if (rtt < 10)
rtt=10;
window->rtt = rtt;
if (window->base_rtt > rtt)
window->base_rtt = rtt;
if (config.debug.msp)
DEBUGF("RTT %u, base %u", rtt, window->base_rtt);
DEBUGF("ACK %x, RTT %u-%u, base %u", seq, rtt, rtt_max, window->base_rtt);
}
if (!p)
window->_tail = NULL;
}
// call the handler if we need to
static size_t call_handler(struct msp_sock *sock, const uint8_t *payload, size_t len)
{
// no handler? just consume everything
size_t nconsumed = len;
time_ms_t now = gettime_ms();
if (sock->handler && (len || sock->last_state != sock->state || now - sock->last_handler > HANDLER_KEEPALIVE)) {
sock->last_state = sock->state;
sock->last_handler = now;
nconsumed = sock->handler(sock_to_handle(sock), sock->state, payload, len, sock->context);
assert(nconsumed <= len);
}
return nconsumed;
}
static void msp_free(struct msp_sock *sock)
{
sock->state |= MSP_STATE_CLOSED;
@ -277,10 +298,7 @@ static void msp_free(struct msp_sock *sock)
free_all_packets(&sock->rx);
// one last chance for clients to free other resources
if (sock->handler && sock->last_state != sock->state) {
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
assert(nconsumed == 0);
}
call_handler(sock, NULL, 0);
sock->salt = SALT_INVALID; // invalidate all handles that point here
free(sock);
}
@ -290,7 +308,10 @@ void msp_stop(MSP_SOCKET handle)
struct msp_sock *sock = handle_to_sock(&handle);
if (sock->state & MSP_STATE_STOPPED)
return;
sock->state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED;
sock->state &= ~MSP_STATE_DATAOUT;
// if this a connectable socket, send a stop packet
if (sock->header.remote.port && !(sock->state & MSP_STATE_LISTENING)){
uint8_t response = FLAG_STOP;
@ -408,6 +429,7 @@ static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, co
packet->flags = flags;
packet->len = len;
packet->offset = 0;
packet->sent = TIME_MS_NEVER_HAS;
if (payload && len){
uint8_t *p = emalloc(len);
@ -561,7 +583,7 @@ int msp_shutdown(MSP_SOCKET handle)
struct msp_sock *sock = handle_to_sock(&handle);
assert(!(sock->state&MSP_STATE_LISTENING));
assert(!(sock->state&MSP_STATE_SHUTDOWN_LOCAL));
if (sock->tx._tail && sock->tx._tail->sent==0){
if (sock->tx._tail && sock->tx._tail->sent==TIME_MS_NEVER_HAS){
sock->tx._tail->flags |= FLAG_SHUTDOWN;
}else{
if (add_packet(&sock->tx, sock->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1)
@ -602,42 +624,45 @@ static int process_sock(struct msp_sock *sock)
if (packet->flags & FLAG_SHUTDOWN)
sock->state|=MSP_STATE_SHUTDOWN_REMOTE;
if (sock->handler){
if (packet->len)
assert(packet->offset < packet->len);
else
assert(packet->offset == 0);
size_t len = packet->len - packet->offset;
msp_state_t state = sock->state;
size_t nconsumed = sock->handler(sock_to_handle(sock), state, packet->payload, len, sock->context);
assert(nconsumed <= len);
sock->last_state = state;
// stop calling the handler if nothing was consumed
if (nconsumed == 0 && len != 0)
break;
packet->offset += nconsumed;
// keep the packet if the handler has not consumed it all
if (packet->offset < packet->len)
continue;
assert(packet->offset == packet->len);
}
// no handler? just discard any data
assert(packet->offset <= packet->len);
size_t nconsumed = call_handler(sock, packet->payload + packet->offset, packet->len - packet->offset);
// stop calling the handler if nothing was consumed
// TODO wait for the library to call back deliberately?
if (nconsumed == 0 && packet->len > packet->offset)
break;
packet->offset += nconsumed;
// keep the packet if the handler has not consumed it all, let the handler try again
if (packet->offset < packet->len)
continue;
assert(packet->offset == packet->len);
p=p->_next;
sock->rx.next_seq++;
}
free_acked_packets(&sock->rx, sock->rx.next_seq -1);
if (sock->handler && sock->last_state != sock->state){
msp_state_t state = sock->state;
size_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context);
assert(nconsumed == 0);
sock->last_state = state;
call_handler(sock, NULL, 0);
if (sock->handler && sock->next_action > sock->last_handler + HANDLER_KEEPALIVE)
sock->next_action = sock->last_handler + HANDLER_KEEPALIVE;
unsigned count=0;
p = sock->tx._head;
while(p){
count++;
p=p->_next;
}
assert(count == sock->tx.packet_count);
if (count >= MAX_WINDOW_SIZE || (sock->state & (MSP_STATE_CLOSED|MSP_STATE_SHUTDOWN_LOCAL)))
assert(!(sock->state & MSP_STATE_DATAOUT));
else
assert(sock->state & MSP_STATE_DATAOUT);
// transmit packets that can now be sent
p = sock->tx._head;
while(p){
if (p->sent==0 || p->sent + RETRANSMIT_TIME < now){
if (p->sent + RETRANSMIT_TIME < now){
if (!sock->header.local.port){
if (sock->header.flags & MDP_FLAG_BIND)
// wait until we have heard back from the daemon with our port number before sending another packet.
@ -815,15 +840,9 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
// we might have space for more data now
if (sock->tx.packet_count < MAX_WINDOW_SIZE
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)
&& !(sock->state & MSP_STATE_CLOSED)){
sock->state|=MSP_STATE_DATAOUT;
// TODO Why isn't the handler getting called in the next msp_processing???
if (sock->handler && sock->last_state != sock->state){
msp_state_t state = sock->state;
ssize_t nconsumed = sock->handler(sock_to_handle(sock), state, NULL, 0, sock->context);
assert(nconsumed == 0);
sock->last_state = state;
}
}
// make sure we attempt to process packets from this sock soon

View File

@ -229,11 +229,13 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
conn->alarm_out.poll.revents=POLLOUT;
conn->alarm_out.function(&conn->alarm_out);
}
if (conn->out->capacity < len + conn->out->limit)
return 0;
if (len > conn->out->capacity - conn->out->limit)
len = conn->out->capacity - conn->out->limit;
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
conn->out->limit+=len;
if (len){
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
conn->out->limit+=len;
}
conn->alarm_out.poll.events|=POLLOUT;
watch(&conn->alarm_out);
@ -247,6 +249,9 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
conn->last_state=state;
if (state&MSP_STATE_DATAOUT)
try_send(conn);
if (state & MSP_STATE_CLOSED){
struct mdp_sockaddr remote;
msp_get_remote(sock, &remote);
@ -261,13 +266,8 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
// gracefully close now if we have no pending data
free_connection(conn);
}
assert(len == 0);
return 0;
}
if (state&MSP_STATE_DATAOUT)
try_send(conn);
return len;
}