From 0563879707efe7c4e3d010a15cfdd144c31057b5 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Tue, 15 Mar 2016 10:48:10 +1030 Subject: [PATCH] Split common msp code into a reusable header --- headerfiles.mk | 1 + msp_client.c | 454 +++++++++++-------------------------------------- msp_client.h | 3 + msp_common.h | 320 ++++++++++++++++++++++++++++++++++ 4 files changed, 425 insertions(+), 353 deletions(-) create mode 100644 msp_common.h diff --git a/headerfiles.mk b/headerfiles.mk index a6722ba4..8d851ce1 100644 --- a/headerfiles.mk +++ b/headerfiles.mk @@ -42,5 +42,6 @@ HDRS= fifo.h \ monitor-client.h \ mdp_client.h \ msp_client.h \ + msp_common.h \ radio_link.h \ sqlite-amalgamation-3100200/sqlite3.h diff --git a/msp_client.c b/msp_client.c index 58fc8ac4..2ab1df11 100644 --- a/msp_client.c +++ b/msp_client.c @@ -31,55 +31,28 @@ #include "log.h" #include "debug.h" -#define FLAG_SHUTDOWN (1<<0) -#define FLAG_ACK (1<<1) -#define FLAG_FIRST (1<<2) -#define FLAG_STOP (1<<3) -#define RETRANSMIT_TIME 1500 -#define HANDLER_KEEPALIVE 1000 -struct msp_packet{ - struct msp_packet *_next; - uint16_t seq; - uint8_t flags; - time_ms_t added; - time_ms_t sent; - const uint8_t *payload; - size_t len; - size_t offset; -}; +#include "msp_common.h" -#define MAX_WINDOW_SIZE 4 -struct msp_window{ - unsigned packet_count; - uint32_t base_rtt; - uint32_t rtt; - uint16_t next_seq; // seq of next expected TX or RX packet. - time_ms_t last_activity; - struct msp_packet *_head, *_tail; -}; struct msp_sock{ struct msp_sock *_next; struct msp_sock *_prev; + struct msp_stream stream; + msp_state_t last_state; unsigned salt; int mdp_sock; - msp_state_t state; - msp_state_t last_state; time_ms_t last_handler; - struct msp_window tx; - struct msp_window rx; - uint16_t previous_ack; - time_ms_t next_ack; MSP_HANDLER *handler; void *context; struct mdp_header header; - time_ms_t timeout; - time_ms_t next_action; }; #define SALT_INVALID 0xdeadbeef + + + int msp_socket_is_valid(MSP_SOCKET handle) { // TODO Set up temporary SIGSEGV and SIGBUS handlers in case handle.ptr points to unmapped memory @@ -124,16 +97,9 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags) ++salt_counter; sock->salt = salt_counter; sock->mdp_sock = mdp_sock; - sock->state = MSP_STATE_UNINITIALISED; + msp_stream_init(&sock->stream); sock->last_state = 0xFFFF; sock->last_handler = TIME_MS_NEVER_HAS; - // TODO set base rtt to ensure that we send the first packet a few times before giving up - sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF; - sock->tx.last_activity = TIME_MS_NEVER_HAS; - sock->rx.last_activity = TIME_MS_NEVER_HAS; - sock->next_action = TIME_MS_NEVER_WILL; - sock->timeout = gettime_ms() + 10000; - sock->previous_ack = 0x7FFF; sock->_next = root; if (root) root->_prev = sock; @@ -143,7 +109,7 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags) msp_state_t msp_get_state(MSP_SOCKET handle) { - return handle_to_sock(&handle)->state; + return handle_to_sock(&handle)->stream.state; } int msp_socket_is_initialising(MSP_SOCKET handle) @@ -156,7 +122,7 @@ int msp_socket_is_open(MSP_SOCKET handle) if (!msp_socket_is_valid(handle)) return 0; msp_state_t state = msp_get_state(handle); - return (state != MSP_STATE_UNINITIALISED || handle_to_sock(&handle)->tx.packet_count != 0) + return (state != MSP_STATE_UNINITIALISED || handle_to_sock(&handle)->stream.tx.packet_count != 0) && !(state & MSP_STATE_CLOSED); } @@ -174,7 +140,7 @@ int msp_socket_is_listening(MSP_SOCKET handle) int msp_socket_is_data(MSP_SOCKET handle) { return msp_socket_is_valid(handle) - && ((msp_get_state(handle) & MSP_STATE_DATAOUT) || handle_to_sock(&handle)->tx.packet_count != 0); + && ((msp_get_state(handle) & MSP_STATE_DATAOUT) || handle_to_sock(&handle)->stream.tx.packet_count != 0); } int msp_socket_is_connected(MSP_SOCKET handle) @@ -212,79 +178,29 @@ void msp_debug() DEBUGF(msp, "Msp sockets;"); while(p){ DEBUGF(msp, "State %d, from %s:%d to %s:%d, next %"PRId64"ms, ack %"PRId64"ms timeout %"PRId64"ms", - p->state, + p->stream.state, alloca_tohex_sid_t(p->header.local.sid), p->header.local.port, alloca_tohex_sid_t(p->header.remote.sid), p->header.remote.port, - (p->next_action - now), - (p->next_ack - now), - (p->timeout - now)); + (p->stream.next_action - now), + (p->stream.next_ack - now), + (p->stream.timeout - now)); p=p->_next; } } } -static void free_all_packets(struct msp_window *window) -{ - struct msp_packet *p = window->_head; - while(p){ - struct msp_packet *free_me=p; - p=p->_next; - if (free_me->payload) - free((void *)free_me->payload); - free(free_me); - } - window->_head = NULL; - window->packet_count=0; -} - -static void free_acked_packets(struct msp_window *window, uint16_t seq) -{ - if (!window->_head) - return; - struct msp_packet *p = window->_head; - uint32_t rtt=0xFFFFFFFF, rtt_max=0; - time_ms_t now = gettime_ms(); - - while(p && compare_wrapped_uint16(p->seq, seq)<=0){ - if (p->sent!=TIME_MS_NEVER_HAS){ - uint32_t this_rtt=now - p->sent; - if (rtt > this_rtt) - rtt = this_rtt; - if (rtt_max < this_rtt) - rtt_max = this_rtt; - } - struct msp_packet *free_me=p; - p=p->_next; - if (free_me->payload) - free((void *)free_me->payload); - free(free_me); - window->packet_count--; - } - window->_head = p; - if (rtt!=0xFFFFFFFF){ - if (rtt < 10) - rtt=10; - window->rtt = rtt; - if (window->base_rtt > rtt) - window->base_rtt = rtt; - DEBUGF(msp, "ACK %x, RTT %u-%u, base %u", seq, rtt, rtt_max, window->base_rtt); - } - if (!p) - window->_tail = NULL; -} - // call the handler if we need to static size_t call_handler(struct msp_sock *sock, const uint8_t *payload, size_t len) { // no handler? just consume everything size_t nconsumed = len; time_ms_t now = gettime_ms(); - if (sock->handler && (len || sock->last_state != sock->state || now - sock->last_handler > HANDLER_KEEPALIVE)) { + if (sock->handler && (len || sock->last_state != sock->stream.state || now - sock->last_handler > HANDLER_KEEPALIVE)) { // remember what we are about to call, rather than what we just called // we don't want to miss a state change due to re-entrancy. - sock->last_state = sock->state; + sock->last_state = sock->stream.state; sock->last_handler = now; - nconsumed = sock->handler(sock_to_handle(sock), sock->state, payload, len, sock->context); + nconsumed = sock->handler(sock_to_handle(sock), sock->stream.state, payload, len, sock->context); assert(nconsumed <= len); } return nconsumed; @@ -292,7 +208,7 @@ static size_t call_handler(struct msp_sock *sock, const uint8_t *payload, size_t static void msp_free(struct msp_sock *sock) { - sock->state |= MSP_STATE_CLOSED; + sock->stream.state |= MSP_STATE_CLOSED; // remove from the list first if (sock->_prev) sock->_prev->_next = sock->_next; @@ -301,8 +217,8 @@ static void msp_free(struct msp_sock *sock) if (sock->_next) sock->_next->_prev = sock->_prev; - free_all_packets(&sock->tx); - free_all_packets(&sock->rx); + free_all_packets(&sock->stream.tx); + free_all_packets(&sock->stream.rx); // one last chance for clients to free other resources call_handler(sock, NULL, 0); @@ -313,14 +229,15 @@ static void msp_free(struct msp_sock *sock) void msp_stop(MSP_SOCKET handle) { struct msp_sock *sock = handle_to_sock(&handle); - if (sock->state & MSP_STATE_STOPPED) + if (sock->stream.state & MSP_STATE_STOPPED) return; - sock->state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED; - sock->state &= ~MSP_STATE_DATAOUT; + sock->stream.state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED; + sock->stream.state &= ~MSP_STATE_DATAOUT; + free_all_packets(&sock->stream.tx); // if this a connectable socket, send a stop packet - if (sock->header.remote.port && !(sock->state & MSP_STATE_LISTENING)){ + if (sock->header.remote.port && !(sock->stream.state & MSP_STATE_LISTENING)){ uint8_t response = FLAG_STOP; // we don't have a matching socket, reply with STOP flag to force breaking the connection // TODO global rate limit? @@ -350,37 +267,37 @@ void msp_set_handler(MSP_SOCKET handle, MSP_HANDLER *handler, void *context) void msp_set_local(MSP_SOCKET handle, const struct mdp_sockaddr *local) { struct msp_sock *sock = handle_to_sock(&handle); - assert(sock->state == MSP_STATE_UNINITIALISED); + assert(sock->stream.state == MSP_STATE_UNINITIALISED); sock->header.local = *local; } void msp_connect(MSP_SOCKET handle, const struct mdp_sockaddr *remote) { struct msp_sock *sock = handle_to_sock(&handle); - assert(sock->state == MSP_STATE_UNINITIALISED); + assert(sock->stream.state == MSP_STATE_UNINITIALISED); sock->header.remote = *remote; - sock->state|=MSP_STATE_DATAOUT; + sock->stream.state|=MSP_STATE_DATAOUT; // make sure we send a packet soon - sock->next_ack = gettime_ms()+10; - sock->next_action = sock->next_ack; + sock->stream.next_ack = gettime_ms()+10; + sock->stream.next_action = sock->stream.next_ack; } int msp_listen(MSP_SOCKET handle) { struct msp_sock *sock = handle_to_sock(&handle); - assert(sock->state == MSP_STATE_UNINITIALISED); + assert(sock->stream.state == MSP_STATE_UNINITIALISED); assert(sock->header.local.port); - sock->state |= MSP_STATE_LISTENING; + sock->stream.state |= MSP_STATE_LISTENING; sock->header.flags |= MDP_FLAG_BIND; if (mdp_send(sock->mdp_sock, &sock->header, NULL, 0)==-1){ - sock->state|=MSP_STATE_ERROR|MSP_STATE_CLOSED; + sock->stream.state|=MSP_STATE_ERROR|MSP_STATE_CLOSED; return -1; } - - sock->timeout = gettime_ms()+1000; - sock->next_action = sock->timeout; + // allow 1s for the local serval daemon to respond + sock->stream.timeout = gettime_ms()+1000; + sock->stream.next_action = sock->stream.timeout; return 0; } @@ -394,65 +311,6 @@ void msp_get_remote(MSP_SOCKET handle, struct mdp_sockaddr *remote) *remote = handle_to_sock(&handle)->header.remote; } -static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, const uint8_t *payload, size_t len) -{ - - struct msp_packet **insert_pos=NULL; - - if (!window->_head){ - insert_pos = &window->_head; - }else{ - if (window->_tail->seq == seq){ - // ignore duplicate packets - DEBUGF(msp, "Ignore duplicate packet %02x", seq); - return 0; - }else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){ - if (compare_wrapped_uint16(window->_head->seq, seq)>0){ - // this is ambiguous - return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq); - } - insert_pos = &window->_tail->_next; - }else{ - insert_pos = &window->_head; - while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0) - insert_pos = &(*insert_pos)->_next; - if ((*insert_pos)->seq == seq){ - // ignore duplicate packets - DEBUGF(msp, "Ignore duplicate packet %02x", seq); - return 0; - } - } - } - - struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet)); - if (!packet) - return -1; - - packet->_next = (*insert_pos); - *insert_pos = packet; - if (!packet->_next) - window->_tail = packet; - packet->added = gettime_ms(); - packet->seq = seq; - packet->flags = flags; - packet->len = len; - packet->offset = 0; - packet->sent = TIME_MS_NEVER_HAS; - - if (payload && len){ - uint8_t *p = emalloc(len); - if (!p){ - free(packet); - return -1; - } - packet->payload = p; - bcopy(payload, p, len); - } - window->packet_count++; - DEBUGF(msp, "Add packet %02x", seq); - return 1; -} - struct socket_address daemon_addr={.addrlen=0,}; static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) @@ -465,18 +323,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) uint8_t msp_header[MSP_PAYLOAD_PREAMBLE_SIZE]; - msp_header[0]=packet->flags; - - // only set the ack flag if we've received a sequenced packet - if (sock->state & MSP_STATE_RECEIVED_DATA) - msp_header[0]|=FLAG_ACK; - // never received anything? set the connect flag - if (!(sock->state & MSP_STATE_RECEIVED_PACKET)) - msp_header[0]|=FLAG_FIRST; - - write_uint16(&msp_header[1], sock->rx.next_seq -1); - write_uint16(&msp_header[3], packet->seq); - sock->previous_ack = sock->rx.next_seq -1; + msp_write_preamble(msp_header, &sock->stream, packet); struct fragmented_data data={ .fragment_count=3, @@ -497,7 +344,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) }; // allow for sending an empty payload body - if (!(packet->payload && packet->len)) + if (!packet->len) data.fragment_count --; ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data); @@ -507,9 +354,6 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) msp_close_all(sock->mdp_sock); return -1; } - DEBUGF(msp, "Sent packet flags %02x seq %02x len %zd (acked %02x)", msp_header[0], packet->seq, packet->len, sock->rx.next_seq -1); - sock->tx.last_activity = packet->sent = gettime_ms(); - sock->next_ack = packet->sent + RETRANSMIT_TIME; return 0; } @@ -522,18 +366,7 @@ static int send_ack(struct msp_sock *sock) } uint8_t msp_header[3]; - - msp_header[0]=0; - // if we haven't heard a sequence number, we can't ack data - // (but we can indicate the existence of the connection) - if (sock->state & MSP_STATE_RECEIVED_DATA) - msp_header[0]|=FLAG_ACK; - - // never received anything? set the connect flag - if (!(sock->state & MSP_STATE_RECEIVED_PACKET)) - msp_header[0]|=FLAG_FIRST; - - write_uint16(&msp_header[1], sock->rx.next_seq -1); + msp_write_ack_header(msp_header, &sock->stream); struct fragmented_data data={ .fragment_count=2, @@ -555,10 +388,6 @@ static int send_ack(struct msp_sock *sock) msp_close_all(sock->mdp_sock); return -1; } - DEBUGF(msp, "Sent packet flags %02x (acked %02x)", msp_header[0], sock->rx.next_seq -1); - sock->previous_ack = sock->rx.next_seq -1; - sock->tx.last_activity = gettime_ms(); - sock->next_ack = sock->tx.last_activity + RETRANSMIT_TIME; return 0; } @@ -566,41 +395,15 @@ static int send_ack(struct msp_sock *sock) ssize_t msp_send(MSP_SOCKET handle, const uint8_t *payload, size_t len) { struct msp_sock *sock = handle_to_sock(&handle); - assert(!(sock->state&MSP_STATE_LISTENING)); assert(sock->header.remote.port); - assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0); - if ((sock->state & MSP_STATE_CLOSED) || sock->tx.packet_count > MAX_WINDOW_SIZE) - return -1; - if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1) - return -1; - - sock->tx.next_seq++; - if (sock->tx.packet_count>=MAX_WINDOW_SIZE) - sock->state&=~MSP_STATE_DATAOUT; - // make sure we attempt to process packets from this sock soon - // TODO calculate based on congestion window - sock->next_action = gettime_ms(); - - return len; + return msp_stream_send(&sock->stream, payload, len); } int msp_shutdown(MSP_SOCKET handle) { struct msp_sock *sock = handle_to_sock(&handle); - assert(!(sock->state&MSP_STATE_LISTENING)); - assert(!(sock->state&MSP_STATE_SHUTDOWN_LOCAL)); - if (sock->tx._tail && sock->tx._tail->sent==TIME_MS_NEVER_HAS){ - sock->tx._tail->flags |= FLAG_SHUTDOWN; - }else{ - if (add_packet(&sock->tx, sock->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1) - return -1; - sock->tx.next_seq++; - } - sock->state|=MSP_STATE_SHUTDOWN_LOCAL; - sock->state&=~MSP_STATE_DATAOUT; - // make sure we send a packet soon - sock->next_action = gettime_ms(); + msp_stream_shutdown(&sock->stream); return 0; } @@ -618,32 +421,16 @@ static int pending_bind(int fd) static int process_sock(struct msp_sock *sock) { - time_ms_t now = gettime_ms(); - - if (sock->timeout < now){ - sock->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR); - return WHY("MSP socket timed out"); - } - - sock->next_action = sock->timeout; - - if (sock->state & MSP_STATE_LISTENING) - return 0; - - struct msp_packet *p; + int r = msp_stream_process(&sock->stream); + if (r!=0) + return r; // deliver packets that have now arrived in order - p = sock->rx._head; - - // TODO ... ? (sock->state & MSP_STATE_POLLIN) - while(p && p->seq == sock->rx.next_seq){ - struct msp_packet *packet=p; - - // process packet flags when we are about to deliver the last packet - if (packet->flags & FLAG_SHUTDOWN) - sock->state|=MSP_STATE_SHUTDOWN_REMOTE; - - assert(packet->offset <= packet->len); + while(1){ + struct msp_packet *packet = msp_stream_next(&sock->stream); + if (!packet) + break; + size_t nconsumed = call_handler(sock, packet->payload + packet->offset, packet->len - packet->offset); // stop calling the handler if nothing was consumed @@ -651,35 +438,32 @@ static int process_sock(struct msp_sock *sock) if (nconsumed == 0 && packet->len > packet->offset) break; - packet->offset += nconsumed; - // keep the packet if the handler has not consumed it all, let the handler try again - if (packet->offset < packet->len) - continue; - assert(packet->offset == packet->len); - - p=p->_next; - sock->rx.next_seq++; + msp_consume_packet(&sock->stream, packet, nconsumed); } - free_acked_packets(&sock->rx, sock->rx.next_seq -1); call_handler(sock, NULL, 0); - if (sock->handler && sock->next_action > sock->last_handler + HANDLER_KEEPALIVE) - sock->next_action = sock->last_handler + HANDLER_KEEPALIVE; - unsigned count=0; - p = sock->tx._head; + if (sock->handler && sock->stream.next_action > sock->last_handler + HANDLER_KEEPALIVE) + sock->stream.next_action = sock->last_handler + HANDLER_KEEPALIVE; + + unsigned count = 0; + + // do we need this? + struct msp_packet *p = sock->stream.tx._head; while(p){ count++; p=p->_next; } - assert(count == sock->tx.packet_count); + assert(count == sock->stream.tx.packet_count); - if (count >= MAX_WINDOW_SIZE || (sock->state & (MSP_STATE_CLOSED|MSP_STATE_SHUTDOWN_LOCAL))) - assert(!(sock->state & MSP_STATE_DATAOUT)); + if (count >= MAX_WINDOW_SIZE || (sock->stream.state & (MSP_STATE_CLOSED|MSP_STATE_SHUTDOWN_LOCAL))) + assert(!(sock->stream.state & MSP_STATE_DATAOUT)); else - assert(sock->state & MSP_STATE_DATAOUT); - + assert(sock->stream.state & MSP_STATE_DATAOUT); + + // transmit packets that can now be sent - p = sock->tx._head; + time_ms_t now = gettime_ms(); + p = sock->stream.tx._head; while(p){ if (p->sent + RETRANSMIT_TIME < now){ if (!sock->header.local.port){ @@ -694,13 +478,13 @@ static int process_sock(struct msp_sock *sock) if (r) break; } - if (sock->next_action > p->sent + RETRANSMIT_TIME) - sock->next_action = p->sent + RETRANSMIT_TIME; + if (sock->stream.next_action > p->sent + RETRANSMIT_TIME) + sock->stream.next_action = p->sent + RETRANSMIT_TIME; p=p->_next; } // should we send an ack now without sending a payload? - if (now > sock->next_ack){ + if (now > sock->stream.next_ack){ if (!sock->header.local.port){ if (sock->header.flags & MDP_FLAG_BIND) // wait until we have heard back from the daemon with our port number before sending another packet. @@ -712,18 +496,18 @@ static int process_sock(struct msp_sock *sock) return -1; } - if (sock->next_action > sock->next_ack) - sock->next_action = sock->next_ack; + if (sock->stream.next_action > sock->stream.next_ack) + sock->stream.next_action = sock->stream.next_ack; // when we've delivered all local packets // and all our data packets have been acked, close. - if ( (sock->state & MSP_STATE_SHUTDOWN_LOCAL) - && (sock->state & MSP_STATE_SHUTDOWN_REMOTE) - && sock->tx.packet_count == 0 - && sock->rx.packet_count == 0 - && sock->previous_ack == sock->rx.next_seq -1 + if ( (sock->stream.state & MSP_STATE_SHUTDOWN_LOCAL) + && (sock->stream.state & MSP_STATE_SHUTDOWN_REMOTE) + && sock->stream.tx.packet_count == 0 + && sock->stream.rx.packet_count == 0 + && sock->stream.previous_ack == sock->stream.rx.next_seq -1 ){ - sock->state |= MSP_STATE_CLOSED; + sock->stream.state |= MSP_STATE_CLOSED; return -1; } @@ -769,14 +553,14 @@ int msp_processing(time_ms_t *next_action) time_ms_t next=TIME_MS_NEVER_WILL; sock = root; while(sock){ - if (sock->state & MSP_STATE_CLOSED){ + if (sock->stream.state & MSP_STATE_CLOSED){ struct msp_sock *s = sock->_next; msp_release(sock); msp_free(sock); sock=s; }else{ - if (sock->next_action < next) - next=sock->next_action; + if (sock->stream.next_action < next) + next=sock->stream.next_action; sock = sock->_next; } } @@ -792,7 +576,24 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t return -1; } - uint8_t flags=0; + if (header->flags & MDP_FLAG_BIND){ + // process bind response from the daemon + struct msp_sock *s=root; + while(s){ + if (s->mdp_sock == mdp_sock && (s->header.flags & MDP_FLAG_BIND)){ + s->header.local = header->local; + s->header.flags &= ~MDP_FLAG_BIND; + DEBUGF(msp, "Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port); + if (s->stream.state & MSP_STATE_LISTENING) + s->stream.next_action = s->stream.timeout = TIME_MS_NEVER_WILL; + else + s->stream.next_action = gettime_ms(); + return 0; + } + s = s->_next; + } + return -1; + } // find or create mdp_sock... struct msp_sock *sock=NULL; @@ -801,20 +602,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t struct msp_sock *listen=NULL; while(s){ if (s->mdp_sock == mdp_sock ){ - - if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){ - // process bind response from the daemon - s->header.local = header->local; - s->header.flags &= ~MDP_FLAG_BIND; - DEBUGF(msp, "Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port); - if (s->state & MSP_STATE_LISTENING) - s->next_action = s->timeout = TIME_MS_NEVER_WILL; - else - s->next_action = gettime_ms(); - return 0; - } - - if (s->state & MSP_STATE_LISTENING){ + if (s->stream.state & MSP_STATE_LISTENING){ // remember any matching listen socket so we can create a connection on first use if (s->header.local.port == header->local.port && (is_sid_t_any(s->header.local.sid) @@ -832,10 +620,10 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t if (len<1) return WHY("Expected at least 1 byte"); - flags = payload[0]; + uint8_t flags = payload[0]; // ignore any stop packet if we have no matching connection - if (!sock && flags & FLAG_STOP){ + if (!sock && (flags & FLAG_STOP)){ DEBUGF(msp, "Ignoring STOP packet, no matching connection"); return 0; } @@ -863,47 +651,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t } } - sock->rx.last_activity = gettime_ms(); - sock->timeout = sock->rx.last_activity + 10000; - sock->state |= MSP_STATE_RECEIVED_PACKET; - - if (flags & FLAG_STOP){ - DEBUGF(msp, "Closing socket due to STOP packet"); - msp_stop(sock_to_handle(sock)); - return 0; - } - - if (len<3) - return 0; - - if (flags & FLAG_ACK){ - uint16_t ack_seq = read_uint16(&payload[1]); - // release acknowledged packets - free_acked_packets(&sock->tx, ack_seq); - - // TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet - } - - // we might have space for more data now - if (sock->tx.packet_count < MAX_WINDOW_SIZE - && !(sock->state & MSP_STATE_SHUTDOWN_LOCAL) - && !(sock->state & MSP_STATE_CLOSED)){ - sock->state|=MSP_STATE_DATAOUT; - } - - // make sure we attempt to process packets from this sock soon - // TODO calculate based on congestion window - sock->next_action = gettime_ms(); - - if (lenstate |= MSP_STATE_RECEIVED_DATA; - uint16_t seq = read_uint16(&payload[3]); - - if (add_packet(&sock->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1) - sock->next_ack = gettime_ms(); - return 0; + return msp_process_packet(&sock->stream, payload, len); } int msp_recv(int mdp_sock) diff --git a/msp_client.h b/msp_client.h index f160c046..3f2c5b4d 100644 --- a/msp_client.h +++ b/msp_client.h @@ -65,6 +65,9 @@ msp_state_t msp_get_state(MSP_SOCKET sock); #define MSP_STATE_DATAOUT ((msp_state_t) (1<<7)) #define MSP_STATE_STOPPED ((msp_state_t) (1<<8)) +// stream timeout +#define MSP_TIMEOUT 10000 + int msp_socket_is_initialising(MSP_SOCKET); int msp_socket_is_open(MSP_SOCKET); int msp_socket_is_closed(MSP_SOCKET); diff --git a/msp_common.h b/msp_common.h new file mode 100644 index 00000000..19a4b94d --- /dev/null +++ b/msp_common.h @@ -0,0 +1,320 @@ +#ifndef __SERVAL_DNA__MSP_COMMON_H +#define __SERVAL_DNA__MSP_COMMON_H + + +#define FLAG_SHUTDOWN (1<<0) +#define FLAG_ACK (1<<1) +#define FLAG_FIRST (1<<2) +#define FLAG_STOP (1<<3) +#define RETRANSMIT_TIME 1500 +#define HANDLER_KEEPALIVE 1000 + +struct msp_packet{ + struct msp_packet *_next; + uint16_t seq; + uint8_t flags; + time_ms_t added; + time_ms_t sent; + size_t len; + size_t offset; + uint8_t payload[]; +}; + +#define MAX_WINDOW_SIZE 4 +struct msp_window{ + unsigned packet_count; + uint32_t base_rtt; + uint32_t rtt; + uint16_t next_seq; // seq of next expected TX or RX packet. + time_ms_t last_activity; + struct msp_packet *_head, *_tail; +}; + +struct msp_stream{ + msp_state_t state; + struct msp_window tx; + struct msp_window rx; + uint16_t previous_ack; + time_ms_t next_ack; + time_ms_t timeout; + time_ms_t next_action; +}; + +static void msp_stream_init(struct msp_stream *stream) +{ + stream->state = MSP_STATE_UNINITIALISED; + // TODO set base rtt to ensure that we send the first packet a few times before giving up + stream->tx.base_rtt = stream->tx.rtt = 0xFFFFFFFF; + stream->tx.last_activity = TIME_MS_NEVER_HAS; + stream->rx.last_activity = TIME_MS_NEVER_HAS; + stream->next_action = TIME_MS_NEVER_WILL; + stream->timeout = gettime_ms() + 10000; + stream->previous_ack = 0x7FFF; +} + +static void free_all_packets(struct msp_window *window) +{ + struct msp_packet *p = window->_head; + while(p){ + struct msp_packet *free_me=p; + p=p->_next; + free(free_me); + } + window->_head = NULL; + window->packet_count=0; +} + +static void free_acked_packets(struct msp_window *window, uint16_t seq) +{ + if (!window->_head) + return; + struct msp_packet *p = window->_head; + uint32_t rtt=0xFFFFFFFF, rtt_max=0; + time_ms_t now = gettime_ms(); + + while(p && compare_wrapped_uint16(p->seq, seq)<=0){ + if (p->sent!=TIME_MS_NEVER_HAS){ + uint32_t this_rtt=now - p->sent; + if (rtt > this_rtt) + rtt = this_rtt; + if (rtt_max < this_rtt) + rtt_max = this_rtt; + } + struct msp_packet *free_me=p; + p=p->_next; + free(free_me); + window->packet_count--; + } + window->_head = p; + if (rtt!=0xFFFFFFFF){ + if (rtt < 10) + rtt=10; + window->rtt = rtt; + if (window->base_rtt > rtt) + window->base_rtt = rtt; + DEBUGF(msp, "ACK %x, RTT %u-%u, base %u", seq, rtt, rtt_max, window->base_rtt); + } + if (!p) + window->_tail = NULL; +} + +static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, const uint8_t *payload, size_t len) +{ + assert(payload || len==0); + struct msp_packet **insert_pos=NULL; + + if (!window->_head){ + insert_pos = &window->_head; + }else{ + if (window->_tail->seq == seq){ + // ignore duplicate packets + DEBUGF(msp, "Ignore duplicate packet %02x", seq); + return 0; + }else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){ + if (compare_wrapped_uint16(window->_head->seq, seq)>0){ + // this is ambiguous + return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq); + } + insert_pos = &window->_tail->_next; + }else{ + insert_pos = &window->_head; + while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0) + insert_pos = &(*insert_pos)->_next; + if ((*insert_pos)->seq == seq){ + // ignore duplicate packets + DEBUGF(msp, "Ignore duplicate packet %02x", seq); + return 0; + } + } + } + + struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet) + len); + if (!packet) + return -1; + + packet->_next = (*insert_pos); + *insert_pos = packet; + if (!packet->_next) + window->_tail = packet; + packet->added = gettime_ms(); + packet->seq = seq; + packet->flags = flags; + packet->len = len; + packet->offset = 0; + packet->sent = TIME_MS_NEVER_HAS; + + if (len) + bcopy(payload, packet->payload, len); + window->packet_count++; + DEBUGF(msp, "Add packet %02x", seq); + return 1; +} + +static size_t msp_write_ack_header(uint8_t *header, struct msp_stream *stream) +{ + header[0]=0; + // if we haven't heard a sequence number, we can't ack data + // (but we can indicate the existence of the connection) + if (stream->state & MSP_STATE_RECEIVED_DATA) + header[0]|=FLAG_ACK; + + // never received anything? set the connect flag + if (!(stream->state & MSP_STATE_RECEIVED_PACKET)) + header[0]|=FLAG_FIRST; + + write_uint16(&header[1], stream->rx.next_seq -1); + + stream->previous_ack = stream->rx.next_seq -1; + stream->tx.last_activity = gettime_ms(); + stream->next_ack = stream->tx.last_activity + RETRANSMIT_TIME; + + DEBUGF(msp, "Sending packet flags %02x (acked %02x)", + header[0], stream->rx.next_seq -1); + return 3; +} + +static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, struct msp_packet *packet) +{ + msp_write_ack_header(header, stream); + header[0]|=packet->flags; + + write_uint16(&header[3], packet->seq); + + DEBUGF(msp, "With packet flags %02x seq %02x len %zd", + header[0], packet->seq, packet->len); + packet->sent = stream->tx.last_activity; + return 5; +} + +static ssize_t msp_stream_send(struct msp_stream *stream, const uint8_t *payload, size_t len) +{ + assert(!(stream->state & MSP_STATE_LISTENING)); + assert((stream->state & MSP_STATE_SHUTDOWN_LOCAL)==0); + + if ((stream->state & MSP_STATE_CLOSED) || stream->tx.packet_count > MAX_WINDOW_SIZE) + return -1; + if (add_packet(&stream->tx, stream->tx.next_seq, 0, payload, len)==-1) + return -1; + + stream->tx.next_seq++; + if (stream->tx.packet_count>=MAX_WINDOW_SIZE) + stream->state&=~MSP_STATE_DATAOUT; + // make sure we attempt to process packets from this sock soon + // TODO calculate based on congestion window + stream->next_action = gettime_ms(); + + return len; +} + +static int msp_stream_shutdown(struct msp_stream *stream) +{ + assert(!(stream->state&MSP_STATE_LISTENING)); + assert(!(stream->state&MSP_STATE_SHUTDOWN_LOCAL)); + if (stream->tx._tail && stream->tx._tail->sent==TIME_MS_NEVER_HAS){ + stream->tx._tail->flags |= FLAG_SHUTDOWN; + }else{ + if (add_packet(&stream->tx, stream->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1) + return -1; + stream->tx.next_seq++; + } + stream->state|=MSP_STATE_SHUTDOWN_LOCAL; + stream->state&=~MSP_STATE_DATAOUT; + // make sure we send a packet soon + stream->next_action = gettime_ms(); + return 0; +} + +static int msp_stream_process(struct msp_stream *stream) +{ + time_ms_t now = gettime_ms(); + if (stream->timeout < now){ + stream->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR); + return WHY("MSP socket timed out"); + } + stream->next_action = stream->timeout; + if (stream->state & MSP_STATE_LISTENING) + return 1; + return 0; +} + +// return the next in-order packet +static struct msp_packet *msp_stream_next(struct msp_stream *stream) +{ + struct msp_packet *packet = stream->rx._head; + if (!packet || packet->seq != stream->rx.next_seq) + return NULL; + + assert(packet->offset <= packet->len); + + if (packet->flags & FLAG_SHUTDOWN) + stream->state|=MSP_STATE_SHUTDOWN_REMOTE; + + return packet; +} + +static void msp_consume_packet(struct msp_stream *stream, struct msp_packet *packet, size_t consumed) +{ + assert(packet->seq==stream->rx.next_seq); + packet->offset += consumed; + if (packet->offset < packet->len) + return; + + free_acked_packets(&stream->rx, stream->rx.next_seq); + stream->rx.next_seq++; +} + +static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, size_t len) +{ + if (len<1) + return WHY("Expected at least 1 byte"); + + uint8_t flags = payload[0]; + time_ms_t now = gettime_ms(); + stream->rx.last_activity = now; + stream->timeout = stream->rx.last_activity + MSP_TIMEOUT; + stream->state |= MSP_STATE_RECEIVED_PACKET; + + if (flags & FLAG_STOP){ + DEBUGF(msp, "Closing socket due to STOP packet"); + stream->state |= (MSP_STATE_CLOSED|MSP_STATE_STOPPED); + stream->state &= ~MSP_STATE_DATAOUT; + free_all_packets(&stream->tx); + stream->next_action = now; + return 0; + } + + if (len<3) + return 0; + + if (flags & FLAG_ACK){ + uint16_t ack_seq = read_uint16(&payload[1]); + // release acknowledged packets + free_acked_packets(&stream->tx, ack_seq); + + // TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet + } + + // Do we have space for more data now? + if (stream->tx.packet_count < MAX_WINDOW_SIZE + && !(stream->state & MSP_STATE_SHUTDOWN_LOCAL) + && !(stream->state & MSP_STATE_CLOSED)){ + stream->state|=MSP_STATE_DATAOUT; + } + + // make sure we attempt to process packets from this sock soon + // TODO calculate based on congestion window + stream->next_action = now; + + if (lenstate |= MSP_STATE_RECEIVED_DATA; + uint16_t seq = read_uint16(&payload[3]); + + if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1) + stream->next_ack = now; + + return 0; +} + +#endif \ No newline at end of file