mirror of
https://github.com/servalproject/serval-dna.git
synced 2024-12-19 21:27:57 +00:00
Split common msp code into a reusable header
This commit is contained in:
parent
9b5b82d972
commit
0563879707
@ -42,5 +42,6 @@ HDRS= fifo.h \
|
||||
monitor-client.h \
|
||||
mdp_client.h \
|
||||
msp_client.h \
|
||||
msp_common.h \
|
||||
radio_link.h \
|
||||
sqlite-amalgamation-3100200/sqlite3.h
|
||||
|
454
msp_client.c
454
msp_client.c
@ -31,55 +31,28 @@
|
||||
#include "log.h"
|
||||
#include "debug.h"
|
||||
|
||||
#define FLAG_SHUTDOWN (1<<0)
|
||||
#define FLAG_ACK (1<<1)
|
||||
#define FLAG_FIRST (1<<2)
|
||||
#define FLAG_STOP (1<<3)
|
||||
#define RETRANSMIT_TIME 1500
|
||||
#define HANDLER_KEEPALIVE 1000
|
||||
|
||||
struct msp_packet{
|
||||
struct msp_packet *_next;
|
||||
uint16_t seq;
|
||||
uint8_t flags;
|
||||
time_ms_t added;
|
||||
time_ms_t sent;
|
||||
const uint8_t *payload;
|
||||
size_t len;
|
||||
size_t offset;
|
||||
};
|
||||
#include "msp_common.h"
|
||||
|
||||
#define MAX_WINDOW_SIZE 4
|
||||
struct msp_window{
|
||||
unsigned packet_count;
|
||||
uint32_t base_rtt;
|
||||
uint32_t rtt;
|
||||
uint16_t next_seq; // seq of next expected TX or RX packet.
|
||||
time_ms_t last_activity;
|
||||
struct msp_packet *_head, *_tail;
|
||||
};
|
||||
|
||||
struct msp_sock{
|
||||
struct msp_sock *_next;
|
||||
struct msp_sock *_prev;
|
||||
struct msp_stream stream;
|
||||
msp_state_t last_state;
|
||||
unsigned salt;
|
||||
int mdp_sock;
|
||||
msp_state_t state;
|
||||
msp_state_t last_state;
|
||||
time_ms_t last_handler;
|
||||
struct msp_window tx;
|
||||
struct msp_window rx;
|
||||
uint16_t previous_ack;
|
||||
time_ms_t next_ack;
|
||||
MSP_HANDLER *handler;
|
||||
void *context;
|
||||
struct mdp_header header;
|
||||
time_ms_t timeout;
|
||||
time_ms_t next_action;
|
||||
};
|
||||
|
||||
#define SALT_INVALID 0xdeadbeef
|
||||
|
||||
|
||||
|
||||
|
||||
int msp_socket_is_valid(MSP_SOCKET handle)
|
||||
{
|
||||
// TODO Set up temporary SIGSEGV and SIGBUS handlers in case handle.ptr points to unmapped memory
|
||||
@ -124,16 +97,9 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags)
|
||||
++salt_counter;
|
||||
sock->salt = salt_counter;
|
||||
sock->mdp_sock = mdp_sock;
|
||||
sock->state = MSP_STATE_UNINITIALISED;
|
||||
msp_stream_init(&sock->stream);
|
||||
sock->last_state = 0xFFFF;
|
||||
sock->last_handler = TIME_MS_NEVER_HAS;
|
||||
// TODO set base rtt to ensure that we send the first packet a few times before giving up
|
||||
sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF;
|
||||
sock->tx.last_activity = TIME_MS_NEVER_HAS;
|
||||
sock->rx.last_activity = TIME_MS_NEVER_HAS;
|
||||
sock->next_action = TIME_MS_NEVER_WILL;
|
||||
sock->timeout = gettime_ms() + 10000;
|
||||
sock->previous_ack = 0x7FFF;
|
||||
sock->_next = root;
|
||||
if (root)
|
||||
root->_prev = sock;
|
||||
@ -143,7 +109,7 @@ MSP_SOCKET msp_socket(int mdp_sock, int flags)
|
||||
|
||||
msp_state_t msp_get_state(MSP_SOCKET handle)
|
||||
{
|
||||
return handle_to_sock(&handle)->state;
|
||||
return handle_to_sock(&handle)->stream.state;
|
||||
}
|
||||
|
||||
int msp_socket_is_initialising(MSP_SOCKET handle)
|
||||
@ -156,7 +122,7 @@ int msp_socket_is_open(MSP_SOCKET handle)
|
||||
if (!msp_socket_is_valid(handle))
|
||||
return 0;
|
||||
msp_state_t state = msp_get_state(handle);
|
||||
return (state != MSP_STATE_UNINITIALISED || handle_to_sock(&handle)->tx.packet_count != 0)
|
||||
return (state != MSP_STATE_UNINITIALISED || handle_to_sock(&handle)->stream.tx.packet_count != 0)
|
||||
&& !(state & MSP_STATE_CLOSED);
|
||||
}
|
||||
|
||||
@ -174,7 +140,7 @@ int msp_socket_is_listening(MSP_SOCKET handle)
|
||||
int msp_socket_is_data(MSP_SOCKET handle)
|
||||
{
|
||||
return msp_socket_is_valid(handle)
|
||||
&& ((msp_get_state(handle) & MSP_STATE_DATAOUT) || handle_to_sock(&handle)->tx.packet_count != 0);
|
||||
&& ((msp_get_state(handle) & MSP_STATE_DATAOUT) || handle_to_sock(&handle)->stream.tx.packet_count != 0);
|
||||
}
|
||||
|
||||
int msp_socket_is_connected(MSP_SOCKET handle)
|
||||
@ -212,79 +178,29 @@ void msp_debug()
|
||||
DEBUGF(msp, "Msp sockets;");
|
||||
while(p){
|
||||
DEBUGF(msp, "State %d, from %s:%d to %s:%d, next %"PRId64"ms, ack %"PRId64"ms timeout %"PRId64"ms",
|
||||
p->state,
|
||||
p->stream.state,
|
||||
alloca_tohex_sid_t(p->header.local.sid), p->header.local.port,
|
||||
alloca_tohex_sid_t(p->header.remote.sid), p->header.remote.port,
|
||||
(p->next_action - now),
|
||||
(p->next_ack - now),
|
||||
(p->timeout - now));
|
||||
(p->stream.next_action - now),
|
||||
(p->stream.next_ack - now),
|
||||
(p->stream.timeout - now));
|
||||
p=p->_next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void free_all_packets(struct msp_window *window)
|
||||
{
|
||||
struct msp_packet *p = window->_head;
|
||||
while(p){
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
if (free_me->payload)
|
||||
free((void *)free_me->payload);
|
||||
free(free_me);
|
||||
}
|
||||
window->_head = NULL;
|
||||
window->packet_count=0;
|
||||
}
|
||||
|
||||
static void free_acked_packets(struct msp_window *window, uint16_t seq)
|
||||
{
|
||||
if (!window->_head)
|
||||
return;
|
||||
struct msp_packet *p = window->_head;
|
||||
uint32_t rtt=0xFFFFFFFF, rtt_max=0;
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
while(p && compare_wrapped_uint16(p->seq, seq)<=0){
|
||||
if (p->sent!=TIME_MS_NEVER_HAS){
|
||||
uint32_t this_rtt=now - p->sent;
|
||||
if (rtt > this_rtt)
|
||||
rtt = this_rtt;
|
||||
if (rtt_max < this_rtt)
|
||||
rtt_max = this_rtt;
|
||||
}
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
if (free_me->payload)
|
||||
free((void *)free_me->payload);
|
||||
free(free_me);
|
||||
window->packet_count--;
|
||||
}
|
||||
window->_head = p;
|
||||
if (rtt!=0xFFFFFFFF){
|
||||
if (rtt < 10)
|
||||
rtt=10;
|
||||
window->rtt = rtt;
|
||||
if (window->base_rtt > rtt)
|
||||
window->base_rtt = rtt;
|
||||
DEBUGF(msp, "ACK %x, RTT %u-%u, base %u", seq, rtt, rtt_max, window->base_rtt);
|
||||
}
|
||||
if (!p)
|
||||
window->_tail = NULL;
|
||||
}
|
||||
|
||||
// call the handler if we need to
|
||||
static size_t call_handler(struct msp_sock *sock, const uint8_t *payload, size_t len)
|
||||
{
|
||||
// no handler? just consume everything
|
||||
size_t nconsumed = len;
|
||||
time_ms_t now = gettime_ms();
|
||||
if (sock->handler && (len || sock->last_state != sock->state || now - sock->last_handler > HANDLER_KEEPALIVE)) {
|
||||
if (sock->handler && (len || sock->last_state != sock->stream.state || now - sock->last_handler > HANDLER_KEEPALIVE)) {
|
||||
// remember what we are about to call, rather than what we just called
|
||||
// we don't want to miss a state change due to re-entrancy.
|
||||
sock->last_state = sock->state;
|
||||
sock->last_state = sock->stream.state;
|
||||
sock->last_handler = now;
|
||||
nconsumed = sock->handler(sock_to_handle(sock), sock->state, payload, len, sock->context);
|
||||
nconsumed = sock->handler(sock_to_handle(sock), sock->stream.state, payload, len, sock->context);
|
||||
assert(nconsumed <= len);
|
||||
}
|
||||
return nconsumed;
|
||||
@ -292,7 +208,7 @@ static size_t call_handler(struct msp_sock *sock, const uint8_t *payload, size_t
|
||||
|
||||
static void msp_free(struct msp_sock *sock)
|
||||
{
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
sock->stream.state |= MSP_STATE_CLOSED;
|
||||
// remove from the list first
|
||||
if (sock->_prev)
|
||||
sock->_prev->_next = sock->_next;
|
||||
@ -301,8 +217,8 @@ static void msp_free(struct msp_sock *sock)
|
||||
if (sock->_next)
|
||||
sock->_next->_prev = sock->_prev;
|
||||
|
||||
free_all_packets(&sock->tx);
|
||||
free_all_packets(&sock->rx);
|
||||
free_all_packets(&sock->stream.tx);
|
||||
free_all_packets(&sock->stream.rx);
|
||||
|
||||
// one last chance for clients to free other resources
|
||||
call_handler(sock, NULL, 0);
|
||||
@ -313,14 +229,15 @@ static void msp_free(struct msp_sock *sock)
|
||||
void msp_stop(MSP_SOCKET handle)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
if (sock->state & MSP_STATE_STOPPED)
|
||||
if (sock->stream.state & MSP_STATE_STOPPED)
|
||||
return;
|
||||
|
||||
sock->state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED;
|
||||
sock->state &= ~MSP_STATE_DATAOUT;
|
||||
sock->stream.state |= MSP_STATE_STOPPED | MSP_STATE_CLOSED;
|
||||
sock->stream.state &= ~MSP_STATE_DATAOUT;
|
||||
free_all_packets(&sock->stream.tx);
|
||||
|
||||
// if this a connectable socket, send a stop packet
|
||||
if (sock->header.remote.port && !(sock->state & MSP_STATE_LISTENING)){
|
||||
if (sock->header.remote.port && !(sock->stream.state & MSP_STATE_LISTENING)){
|
||||
uint8_t response = FLAG_STOP;
|
||||
// we don't have a matching socket, reply with STOP flag to force breaking the connection
|
||||
// TODO global rate limit?
|
||||
@ -350,37 +267,37 @@ void msp_set_handler(MSP_SOCKET handle, MSP_HANDLER *handler, void *context)
|
||||
void msp_set_local(MSP_SOCKET handle, const struct mdp_sockaddr *local)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
assert(sock->state == MSP_STATE_UNINITIALISED);
|
||||
assert(sock->stream.state == MSP_STATE_UNINITIALISED);
|
||||
sock->header.local = *local;
|
||||
}
|
||||
|
||||
void msp_connect(MSP_SOCKET handle, const struct mdp_sockaddr *remote)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
assert(sock->state == MSP_STATE_UNINITIALISED);
|
||||
assert(sock->stream.state == MSP_STATE_UNINITIALISED);
|
||||
sock->header.remote = *remote;
|
||||
sock->state|=MSP_STATE_DATAOUT;
|
||||
sock->stream.state|=MSP_STATE_DATAOUT;
|
||||
// make sure we send a packet soon
|
||||
sock->next_ack = gettime_ms()+10;
|
||||
sock->next_action = sock->next_ack;
|
||||
sock->stream.next_ack = gettime_ms()+10;
|
||||
sock->stream.next_action = sock->stream.next_ack;
|
||||
}
|
||||
|
||||
int msp_listen(MSP_SOCKET handle)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
assert(sock->state == MSP_STATE_UNINITIALISED);
|
||||
assert(sock->stream.state == MSP_STATE_UNINITIALISED);
|
||||
assert(sock->header.local.port);
|
||||
|
||||
sock->state |= MSP_STATE_LISTENING;
|
||||
sock->stream.state |= MSP_STATE_LISTENING;
|
||||
sock->header.flags |= MDP_FLAG_BIND;
|
||||
|
||||
if (mdp_send(sock->mdp_sock, &sock->header, NULL, 0)==-1){
|
||||
sock->state|=MSP_STATE_ERROR|MSP_STATE_CLOSED;
|
||||
sock->stream.state|=MSP_STATE_ERROR|MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
sock->timeout = gettime_ms()+1000;
|
||||
sock->next_action = sock->timeout;
|
||||
// allow 1s for the local serval daemon to respond
|
||||
sock->stream.timeout = gettime_ms()+1000;
|
||||
sock->stream.next_action = sock->stream.timeout;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -394,65 +311,6 @@ void msp_get_remote(MSP_SOCKET handle, struct mdp_sockaddr *remote)
|
||||
*remote = handle_to_sock(&handle)->header.remote;
|
||||
}
|
||||
|
||||
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, const uint8_t *payload, size_t len)
|
||||
{
|
||||
|
||||
struct msp_packet **insert_pos=NULL;
|
||||
|
||||
if (!window->_head){
|
||||
insert_pos = &window->_head;
|
||||
}else{
|
||||
if (window->_tail->seq == seq){
|
||||
// ignore duplicate packets
|
||||
DEBUGF(msp, "Ignore duplicate packet %02x", seq);
|
||||
return 0;
|
||||
}else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){
|
||||
if (compare_wrapped_uint16(window->_head->seq, seq)>0){
|
||||
// this is ambiguous
|
||||
return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq);
|
||||
}
|
||||
insert_pos = &window->_tail->_next;
|
||||
}else{
|
||||
insert_pos = &window->_head;
|
||||
while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0)
|
||||
insert_pos = &(*insert_pos)->_next;
|
||||
if ((*insert_pos)->seq == seq){
|
||||
// ignore duplicate packets
|
||||
DEBUGF(msp, "Ignore duplicate packet %02x", seq);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet));
|
||||
if (!packet)
|
||||
return -1;
|
||||
|
||||
packet->_next = (*insert_pos);
|
||||
*insert_pos = packet;
|
||||
if (!packet->_next)
|
||||
window->_tail = packet;
|
||||
packet->added = gettime_ms();
|
||||
packet->seq = seq;
|
||||
packet->flags = flags;
|
||||
packet->len = len;
|
||||
packet->offset = 0;
|
||||
packet->sent = TIME_MS_NEVER_HAS;
|
||||
|
||||
if (payload && len){
|
||||
uint8_t *p = emalloc(len);
|
||||
if (!p){
|
||||
free(packet);
|
||||
return -1;
|
||||
}
|
||||
packet->payload = p;
|
||||
bcopy(payload, p, len);
|
||||
}
|
||||
window->packet_count++;
|
||||
DEBUGF(msp, "Add packet %02x", seq);
|
||||
return 1;
|
||||
}
|
||||
|
||||
struct socket_address daemon_addr={.addrlen=0,};
|
||||
|
||||
static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
@ -465,18 +323,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
|
||||
uint8_t msp_header[MSP_PAYLOAD_PREAMBLE_SIZE];
|
||||
|
||||
msp_header[0]=packet->flags;
|
||||
|
||||
// only set the ack flag if we've received a sequenced packet
|
||||
if (sock->state & MSP_STATE_RECEIVED_DATA)
|
||||
msp_header[0]|=FLAG_ACK;
|
||||
// never received anything? set the connect flag
|
||||
if (!(sock->state & MSP_STATE_RECEIVED_PACKET))
|
||||
msp_header[0]|=FLAG_FIRST;
|
||||
|
||||
write_uint16(&msp_header[1], sock->rx.next_seq -1);
|
||||
write_uint16(&msp_header[3], packet->seq);
|
||||
sock->previous_ack = sock->rx.next_seq -1;
|
||||
msp_write_preamble(msp_header, &sock->stream, packet);
|
||||
|
||||
struct fragmented_data data={
|
||||
.fragment_count=3,
|
||||
@ -497,7 +344,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
};
|
||||
|
||||
// allow for sending an empty payload body
|
||||
if (!(packet->payload && packet->len))
|
||||
if (!packet->len)
|
||||
data.fragment_count --;
|
||||
|
||||
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
|
||||
@ -507,9 +354,6 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
msp_close_all(sock->mdp_sock);
|
||||
return -1;
|
||||
}
|
||||
DEBUGF(msp, "Sent packet flags %02x seq %02x len %zd (acked %02x)", msp_header[0], packet->seq, packet->len, sock->rx.next_seq -1);
|
||||
sock->tx.last_activity = packet->sent = gettime_ms();
|
||||
sock->next_ack = packet->sent + RETRANSMIT_TIME;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -522,18 +366,7 @@ static int send_ack(struct msp_sock *sock)
|
||||
}
|
||||
|
||||
uint8_t msp_header[3];
|
||||
|
||||
msp_header[0]=0;
|
||||
// if we haven't heard a sequence number, we can't ack data
|
||||
// (but we can indicate the existence of the connection)
|
||||
if (sock->state & MSP_STATE_RECEIVED_DATA)
|
||||
msp_header[0]|=FLAG_ACK;
|
||||
|
||||
// never received anything? set the connect flag
|
||||
if (!(sock->state & MSP_STATE_RECEIVED_PACKET))
|
||||
msp_header[0]|=FLAG_FIRST;
|
||||
|
||||
write_uint16(&msp_header[1], sock->rx.next_seq -1);
|
||||
msp_write_ack_header(msp_header, &sock->stream);
|
||||
|
||||
struct fragmented_data data={
|
||||
.fragment_count=2,
|
||||
@ -555,10 +388,6 @@ static int send_ack(struct msp_sock *sock)
|
||||
msp_close_all(sock->mdp_sock);
|
||||
return -1;
|
||||
}
|
||||
DEBUGF(msp, "Sent packet flags %02x (acked %02x)", msp_header[0], sock->rx.next_seq -1);
|
||||
sock->previous_ack = sock->rx.next_seq -1;
|
||||
sock->tx.last_activity = gettime_ms();
|
||||
sock->next_ack = sock->tx.last_activity + RETRANSMIT_TIME;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -566,41 +395,15 @@ static int send_ack(struct msp_sock *sock)
|
||||
ssize_t msp_send(MSP_SOCKET handle, const uint8_t *payload, size_t len)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
assert(!(sock->state&MSP_STATE_LISTENING));
|
||||
assert(sock->header.remote.port);
|
||||
assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0);
|
||||
|
||||
if ((sock->state & MSP_STATE_CLOSED) || sock->tx.packet_count > MAX_WINDOW_SIZE)
|
||||
return -1;
|
||||
if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1)
|
||||
return -1;
|
||||
|
||||
sock->tx.next_seq++;
|
||||
if (sock->tx.packet_count>=MAX_WINDOW_SIZE)
|
||||
sock->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
sock->next_action = gettime_ms();
|
||||
|
||||
return len;
|
||||
return msp_stream_send(&sock->stream, payload, len);
|
||||
}
|
||||
|
||||
int msp_shutdown(MSP_SOCKET handle)
|
||||
{
|
||||
struct msp_sock *sock = handle_to_sock(&handle);
|
||||
assert(!(sock->state&MSP_STATE_LISTENING));
|
||||
assert(!(sock->state&MSP_STATE_SHUTDOWN_LOCAL));
|
||||
if (sock->tx._tail && sock->tx._tail->sent==TIME_MS_NEVER_HAS){
|
||||
sock->tx._tail->flags |= FLAG_SHUTDOWN;
|
||||
}else{
|
||||
if (add_packet(&sock->tx, sock->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1)
|
||||
return -1;
|
||||
sock->tx.next_seq++;
|
||||
}
|
||||
sock->state|=MSP_STATE_SHUTDOWN_LOCAL;
|
||||
sock->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we send a packet soon
|
||||
sock->next_action = gettime_ms();
|
||||
msp_stream_shutdown(&sock->stream);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -618,32 +421,16 @@ static int pending_bind(int fd)
|
||||
|
||||
static int process_sock(struct msp_sock *sock)
|
||||
{
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
if (sock->timeout < now){
|
||||
sock->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR);
|
||||
return WHY("MSP socket timed out");
|
||||
}
|
||||
|
||||
sock->next_action = sock->timeout;
|
||||
|
||||
if (sock->state & MSP_STATE_LISTENING)
|
||||
return 0;
|
||||
|
||||
struct msp_packet *p;
|
||||
int r = msp_stream_process(&sock->stream);
|
||||
if (r!=0)
|
||||
return r;
|
||||
|
||||
// deliver packets that have now arrived in order
|
||||
p = sock->rx._head;
|
||||
|
||||
// TODO ... ? (sock->state & MSP_STATE_POLLIN)
|
||||
while(p && p->seq == sock->rx.next_seq){
|
||||
struct msp_packet *packet=p;
|
||||
|
||||
// process packet flags when we are about to deliver the last packet
|
||||
if (packet->flags & FLAG_SHUTDOWN)
|
||||
sock->state|=MSP_STATE_SHUTDOWN_REMOTE;
|
||||
|
||||
assert(packet->offset <= packet->len);
|
||||
while(1){
|
||||
struct msp_packet *packet = msp_stream_next(&sock->stream);
|
||||
if (!packet)
|
||||
break;
|
||||
|
||||
size_t nconsumed = call_handler(sock, packet->payload + packet->offset, packet->len - packet->offset);
|
||||
|
||||
// stop calling the handler if nothing was consumed
|
||||
@ -651,35 +438,32 @@ static int process_sock(struct msp_sock *sock)
|
||||
if (nconsumed == 0 && packet->len > packet->offset)
|
||||
break;
|
||||
|
||||
packet->offset += nconsumed;
|
||||
// keep the packet if the handler has not consumed it all, let the handler try again
|
||||
if (packet->offset < packet->len)
|
||||
continue;
|
||||
assert(packet->offset == packet->len);
|
||||
|
||||
p=p->_next;
|
||||
sock->rx.next_seq++;
|
||||
msp_consume_packet(&sock->stream, packet, nconsumed);
|
||||
}
|
||||
free_acked_packets(&sock->rx, sock->rx.next_seq -1);
|
||||
|
||||
call_handler(sock, NULL, 0);
|
||||
if (sock->handler && sock->next_action > sock->last_handler + HANDLER_KEEPALIVE)
|
||||
sock->next_action = sock->last_handler + HANDLER_KEEPALIVE;
|
||||
unsigned count=0;
|
||||
p = sock->tx._head;
|
||||
if (sock->handler && sock->stream.next_action > sock->last_handler + HANDLER_KEEPALIVE)
|
||||
sock->stream.next_action = sock->last_handler + HANDLER_KEEPALIVE;
|
||||
|
||||
unsigned count = 0;
|
||||
|
||||
// do we need this?
|
||||
struct msp_packet *p = sock->stream.tx._head;
|
||||
while(p){
|
||||
count++;
|
||||
p=p->_next;
|
||||
}
|
||||
assert(count == sock->tx.packet_count);
|
||||
assert(count == sock->stream.tx.packet_count);
|
||||
|
||||
if (count >= MAX_WINDOW_SIZE || (sock->state & (MSP_STATE_CLOSED|MSP_STATE_SHUTDOWN_LOCAL)))
|
||||
assert(!(sock->state & MSP_STATE_DATAOUT));
|
||||
if (count >= MAX_WINDOW_SIZE || (sock->stream.state & (MSP_STATE_CLOSED|MSP_STATE_SHUTDOWN_LOCAL)))
|
||||
assert(!(sock->stream.state & MSP_STATE_DATAOUT));
|
||||
else
|
||||
assert(sock->state & MSP_STATE_DATAOUT);
|
||||
|
||||
assert(sock->stream.state & MSP_STATE_DATAOUT);
|
||||
|
||||
|
||||
// transmit packets that can now be sent
|
||||
p = sock->tx._head;
|
||||
time_ms_t now = gettime_ms();
|
||||
p = sock->stream.tx._head;
|
||||
while(p){
|
||||
if (p->sent + RETRANSMIT_TIME < now){
|
||||
if (!sock->header.local.port){
|
||||
@ -694,13 +478,13 @@ static int process_sock(struct msp_sock *sock)
|
||||
if (r)
|
||||
break;
|
||||
}
|
||||
if (sock->next_action > p->sent + RETRANSMIT_TIME)
|
||||
sock->next_action = p->sent + RETRANSMIT_TIME;
|
||||
if (sock->stream.next_action > p->sent + RETRANSMIT_TIME)
|
||||
sock->stream.next_action = p->sent + RETRANSMIT_TIME;
|
||||
p=p->_next;
|
||||
}
|
||||
|
||||
// should we send an ack now without sending a payload?
|
||||
if (now > sock->next_ack){
|
||||
if (now > sock->stream.next_ack){
|
||||
if (!sock->header.local.port){
|
||||
if (sock->header.flags & MDP_FLAG_BIND)
|
||||
// wait until we have heard back from the daemon with our port number before sending another packet.
|
||||
@ -712,18 +496,18 @@ static int process_sock(struct msp_sock *sock)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sock->next_action > sock->next_ack)
|
||||
sock->next_action = sock->next_ack;
|
||||
if (sock->stream.next_action > sock->stream.next_ack)
|
||||
sock->stream.next_action = sock->stream.next_ack;
|
||||
|
||||
// when we've delivered all local packets
|
||||
// and all our data packets have been acked, close.
|
||||
if ( (sock->state & MSP_STATE_SHUTDOWN_LOCAL)
|
||||
&& (sock->state & MSP_STATE_SHUTDOWN_REMOTE)
|
||||
&& sock->tx.packet_count == 0
|
||||
&& sock->rx.packet_count == 0
|
||||
&& sock->previous_ack == sock->rx.next_seq -1
|
||||
if ( (sock->stream.state & MSP_STATE_SHUTDOWN_LOCAL)
|
||||
&& (sock->stream.state & MSP_STATE_SHUTDOWN_REMOTE)
|
||||
&& sock->stream.tx.packet_count == 0
|
||||
&& sock->stream.rx.packet_count == 0
|
||||
&& sock->stream.previous_ack == sock->stream.rx.next_seq -1
|
||||
){
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
sock->stream.state |= MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -769,14 +553,14 @@ int msp_processing(time_ms_t *next_action)
|
||||
time_ms_t next=TIME_MS_NEVER_WILL;
|
||||
sock = root;
|
||||
while(sock){
|
||||
if (sock->state & MSP_STATE_CLOSED){
|
||||
if (sock->stream.state & MSP_STATE_CLOSED){
|
||||
struct msp_sock *s = sock->_next;
|
||||
msp_release(sock);
|
||||
msp_free(sock);
|
||||
sock=s;
|
||||
}else{
|
||||
if (sock->next_action < next)
|
||||
next=sock->next_action;
|
||||
if (sock->stream.next_action < next)
|
||||
next=sock->stream.next_action;
|
||||
sock = sock->_next;
|
||||
}
|
||||
}
|
||||
@ -792,7 +576,24 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
return -1;
|
||||
}
|
||||
|
||||
uint8_t flags=0;
|
||||
if (header->flags & MDP_FLAG_BIND){
|
||||
// process bind response from the daemon
|
||||
struct msp_sock *s=root;
|
||||
while(s){
|
||||
if (s->mdp_sock == mdp_sock && (s->header.flags & MDP_FLAG_BIND)){
|
||||
s->header.local = header->local;
|
||||
s->header.flags &= ~MDP_FLAG_BIND;
|
||||
DEBUGF(msp, "Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
|
||||
if (s->stream.state & MSP_STATE_LISTENING)
|
||||
s->stream.next_action = s->stream.timeout = TIME_MS_NEVER_WILL;
|
||||
else
|
||||
s->stream.next_action = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
s = s->_next;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
// find or create mdp_sock...
|
||||
struct msp_sock *sock=NULL;
|
||||
@ -801,20 +602,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
struct msp_sock *listen=NULL;
|
||||
while(s){
|
||||
if (s->mdp_sock == mdp_sock ){
|
||||
|
||||
if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){
|
||||
// process bind response from the daemon
|
||||
s->header.local = header->local;
|
||||
s->header.flags &= ~MDP_FLAG_BIND;
|
||||
DEBUGF(msp, "Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
|
||||
if (s->state & MSP_STATE_LISTENING)
|
||||
s->next_action = s->timeout = TIME_MS_NEVER_WILL;
|
||||
else
|
||||
s->next_action = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (s->state & MSP_STATE_LISTENING){
|
||||
if (s->stream.state & MSP_STATE_LISTENING){
|
||||
// remember any matching listen socket so we can create a connection on first use
|
||||
if (s->header.local.port == header->local.port
|
||||
&& (is_sid_t_any(s->header.local.sid)
|
||||
@ -832,10 +620,10 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
|
||||
if (len<1)
|
||||
return WHY("Expected at least 1 byte");
|
||||
flags = payload[0];
|
||||
uint8_t flags = payload[0];
|
||||
|
||||
// ignore any stop packet if we have no matching connection
|
||||
if (!sock && flags & FLAG_STOP){
|
||||
if (!sock && (flags & FLAG_STOP)){
|
||||
DEBUGF(msp, "Ignoring STOP packet, no matching connection");
|
||||
return 0;
|
||||
}
|
||||
@ -863,47 +651,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
}
|
||||
}
|
||||
|
||||
sock->rx.last_activity = gettime_ms();
|
||||
sock->timeout = sock->rx.last_activity + 10000;
|
||||
sock->state |= MSP_STATE_RECEIVED_PACKET;
|
||||
|
||||
if (flags & FLAG_STOP){
|
||||
DEBUGF(msp, "Closing socket due to STOP packet");
|
||||
msp_stop(sock_to_handle(sock));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (len<3)
|
||||
return 0;
|
||||
|
||||
if (flags & FLAG_ACK){
|
||||
uint16_t ack_seq = read_uint16(&payload[1]);
|
||||
// release acknowledged packets
|
||||
free_acked_packets(&sock->tx, ack_seq);
|
||||
|
||||
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
|
||||
}
|
||||
|
||||
// we might have space for more data now
|
||||
if (sock->tx.packet_count < MAX_WINDOW_SIZE
|
||||
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)
|
||||
&& !(sock->state & MSP_STATE_CLOSED)){
|
||||
sock->state|=MSP_STATE_DATAOUT;
|
||||
}
|
||||
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
sock->next_action = gettime_ms();
|
||||
|
||||
if (len<MSP_PAYLOAD_PREAMBLE_SIZE)
|
||||
return 0;
|
||||
|
||||
sock->state |= MSP_STATE_RECEIVED_DATA;
|
||||
uint16_t seq = read_uint16(&payload[3]);
|
||||
|
||||
if (add_packet(&sock->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
|
||||
sock->next_ack = gettime_ms();
|
||||
return 0;
|
||||
return msp_process_packet(&sock->stream, payload, len);
|
||||
}
|
||||
|
||||
int msp_recv(int mdp_sock)
|
||||
|
@ -65,6 +65,9 @@ msp_state_t msp_get_state(MSP_SOCKET sock);
|
||||
#define MSP_STATE_DATAOUT ((msp_state_t) (1<<7))
|
||||
#define MSP_STATE_STOPPED ((msp_state_t) (1<<8))
|
||||
|
||||
// stream timeout
|
||||
#define MSP_TIMEOUT 10000
|
||||
|
||||
int msp_socket_is_initialising(MSP_SOCKET);
|
||||
int msp_socket_is_open(MSP_SOCKET);
|
||||
int msp_socket_is_closed(MSP_SOCKET);
|
||||
|
320
msp_common.h
Normal file
320
msp_common.h
Normal file
@ -0,0 +1,320 @@
|
||||
#ifndef __SERVAL_DNA__MSP_COMMON_H
|
||||
#define __SERVAL_DNA__MSP_COMMON_H
|
||||
|
||||
|
||||
#define FLAG_SHUTDOWN (1<<0)
|
||||
#define FLAG_ACK (1<<1)
|
||||
#define FLAG_FIRST (1<<2)
|
||||
#define FLAG_STOP (1<<3)
|
||||
#define RETRANSMIT_TIME 1500
|
||||
#define HANDLER_KEEPALIVE 1000
|
||||
|
||||
struct msp_packet{
|
||||
struct msp_packet *_next;
|
||||
uint16_t seq;
|
||||
uint8_t flags;
|
||||
time_ms_t added;
|
||||
time_ms_t sent;
|
||||
size_t len;
|
||||
size_t offset;
|
||||
uint8_t payload[];
|
||||
};
|
||||
|
||||
#define MAX_WINDOW_SIZE 4
|
||||
struct msp_window{
|
||||
unsigned packet_count;
|
||||
uint32_t base_rtt;
|
||||
uint32_t rtt;
|
||||
uint16_t next_seq; // seq of next expected TX or RX packet.
|
||||
time_ms_t last_activity;
|
||||
struct msp_packet *_head, *_tail;
|
||||
};
|
||||
|
||||
struct msp_stream{
|
||||
msp_state_t state;
|
||||
struct msp_window tx;
|
||||
struct msp_window rx;
|
||||
uint16_t previous_ack;
|
||||
time_ms_t next_ack;
|
||||
time_ms_t timeout;
|
||||
time_ms_t next_action;
|
||||
};
|
||||
|
||||
static void msp_stream_init(struct msp_stream *stream)
|
||||
{
|
||||
stream->state = MSP_STATE_UNINITIALISED;
|
||||
// TODO set base rtt to ensure that we send the first packet a few times before giving up
|
||||
stream->tx.base_rtt = stream->tx.rtt = 0xFFFFFFFF;
|
||||
stream->tx.last_activity = TIME_MS_NEVER_HAS;
|
||||
stream->rx.last_activity = TIME_MS_NEVER_HAS;
|
||||
stream->next_action = TIME_MS_NEVER_WILL;
|
||||
stream->timeout = gettime_ms() + 10000;
|
||||
stream->previous_ack = 0x7FFF;
|
||||
}
|
||||
|
||||
static void free_all_packets(struct msp_window *window)
|
||||
{
|
||||
struct msp_packet *p = window->_head;
|
||||
while(p){
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
free(free_me);
|
||||
}
|
||||
window->_head = NULL;
|
||||
window->packet_count=0;
|
||||
}
|
||||
|
||||
static void free_acked_packets(struct msp_window *window, uint16_t seq)
|
||||
{
|
||||
if (!window->_head)
|
||||
return;
|
||||
struct msp_packet *p = window->_head;
|
||||
uint32_t rtt=0xFFFFFFFF, rtt_max=0;
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
while(p && compare_wrapped_uint16(p->seq, seq)<=0){
|
||||
if (p->sent!=TIME_MS_NEVER_HAS){
|
||||
uint32_t this_rtt=now - p->sent;
|
||||
if (rtt > this_rtt)
|
||||
rtt = this_rtt;
|
||||
if (rtt_max < this_rtt)
|
||||
rtt_max = this_rtt;
|
||||
}
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
free(free_me);
|
||||
window->packet_count--;
|
||||
}
|
||||
window->_head = p;
|
||||
if (rtt!=0xFFFFFFFF){
|
||||
if (rtt < 10)
|
||||
rtt=10;
|
||||
window->rtt = rtt;
|
||||
if (window->base_rtt > rtt)
|
||||
window->base_rtt = rtt;
|
||||
DEBUGF(msp, "ACK %x, RTT %u-%u, base %u", seq, rtt, rtt_max, window->base_rtt);
|
||||
}
|
||||
if (!p)
|
||||
window->_tail = NULL;
|
||||
}
|
||||
|
||||
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, const uint8_t *payload, size_t len)
|
||||
{
|
||||
assert(payload || len==0);
|
||||
struct msp_packet **insert_pos=NULL;
|
||||
|
||||
if (!window->_head){
|
||||
insert_pos = &window->_head;
|
||||
}else{
|
||||
if (window->_tail->seq == seq){
|
||||
// ignore duplicate packets
|
||||
DEBUGF(msp, "Ignore duplicate packet %02x", seq);
|
||||
return 0;
|
||||
}else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){
|
||||
if (compare_wrapped_uint16(window->_head->seq, seq)>0){
|
||||
// this is ambiguous
|
||||
return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq);
|
||||
}
|
||||
insert_pos = &window->_tail->_next;
|
||||
}else{
|
||||
insert_pos = &window->_head;
|
||||
while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0)
|
||||
insert_pos = &(*insert_pos)->_next;
|
||||
if ((*insert_pos)->seq == seq){
|
||||
// ignore duplicate packets
|
||||
DEBUGF(msp, "Ignore duplicate packet %02x", seq);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet) + len);
|
||||
if (!packet)
|
||||
return -1;
|
||||
|
||||
packet->_next = (*insert_pos);
|
||||
*insert_pos = packet;
|
||||
if (!packet->_next)
|
||||
window->_tail = packet;
|
||||
packet->added = gettime_ms();
|
||||
packet->seq = seq;
|
||||
packet->flags = flags;
|
||||
packet->len = len;
|
||||
packet->offset = 0;
|
||||
packet->sent = TIME_MS_NEVER_HAS;
|
||||
|
||||
if (len)
|
||||
bcopy(payload, packet->payload, len);
|
||||
window->packet_count++;
|
||||
DEBUGF(msp, "Add packet %02x", seq);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static size_t msp_write_ack_header(uint8_t *header, struct msp_stream *stream)
|
||||
{
|
||||
header[0]=0;
|
||||
// if we haven't heard a sequence number, we can't ack data
|
||||
// (but we can indicate the existence of the connection)
|
||||
if (stream->state & MSP_STATE_RECEIVED_DATA)
|
||||
header[0]|=FLAG_ACK;
|
||||
|
||||
// never received anything? set the connect flag
|
||||
if (!(stream->state & MSP_STATE_RECEIVED_PACKET))
|
||||
header[0]|=FLAG_FIRST;
|
||||
|
||||
write_uint16(&header[1], stream->rx.next_seq -1);
|
||||
|
||||
stream->previous_ack = stream->rx.next_seq -1;
|
||||
stream->tx.last_activity = gettime_ms();
|
||||
stream->next_ack = stream->tx.last_activity + RETRANSMIT_TIME;
|
||||
|
||||
DEBUGF(msp, "Sending packet flags %02x (acked %02x)",
|
||||
header[0], stream->rx.next_seq -1);
|
||||
return 3;
|
||||
}
|
||||
|
||||
static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, struct msp_packet *packet)
|
||||
{
|
||||
msp_write_ack_header(header, stream);
|
||||
header[0]|=packet->flags;
|
||||
|
||||
write_uint16(&header[3], packet->seq);
|
||||
|
||||
DEBUGF(msp, "With packet flags %02x seq %02x len %zd",
|
||||
header[0], packet->seq, packet->len);
|
||||
packet->sent = stream->tx.last_activity;
|
||||
return 5;
|
||||
}
|
||||
|
||||
static ssize_t msp_stream_send(struct msp_stream *stream, const uint8_t *payload, size_t len)
|
||||
{
|
||||
assert(!(stream->state & MSP_STATE_LISTENING));
|
||||
assert((stream->state & MSP_STATE_SHUTDOWN_LOCAL)==0);
|
||||
|
||||
if ((stream->state & MSP_STATE_CLOSED) || stream->tx.packet_count > MAX_WINDOW_SIZE)
|
||||
return -1;
|
||||
if (add_packet(&stream->tx, stream->tx.next_seq, 0, payload, len)==-1)
|
||||
return -1;
|
||||
|
||||
stream->tx.next_seq++;
|
||||
if (stream->tx.packet_count>=MAX_WINDOW_SIZE)
|
||||
stream->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
stream->next_action = gettime_ms();
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
static int msp_stream_shutdown(struct msp_stream *stream)
|
||||
{
|
||||
assert(!(stream->state&MSP_STATE_LISTENING));
|
||||
assert(!(stream->state&MSP_STATE_SHUTDOWN_LOCAL));
|
||||
if (stream->tx._tail && stream->tx._tail->sent==TIME_MS_NEVER_HAS){
|
||||
stream->tx._tail->flags |= FLAG_SHUTDOWN;
|
||||
}else{
|
||||
if (add_packet(&stream->tx, stream->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1)
|
||||
return -1;
|
||||
stream->tx.next_seq++;
|
||||
}
|
||||
stream->state|=MSP_STATE_SHUTDOWN_LOCAL;
|
||||
stream->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we send a packet soon
|
||||
stream->next_action = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int msp_stream_process(struct msp_stream *stream)
|
||||
{
|
||||
time_ms_t now = gettime_ms();
|
||||
if (stream->timeout < now){
|
||||
stream->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR);
|
||||
return WHY("MSP socket timed out");
|
||||
}
|
||||
stream->next_action = stream->timeout;
|
||||
if (stream->state & MSP_STATE_LISTENING)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// return the next in-order packet
|
||||
static struct msp_packet *msp_stream_next(struct msp_stream *stream)
|
||||
{
|
||||
struct msp_packet *packet = stream->rx._head;
|
||||
if (!packet || packet->seq != stream->rx.next_seq)
|
||||
return NULL;
|
||||
|
||||
assert(packet->offset <= packet->len);
|
||||
|
||||
if (packet->flags & FLAG_SHUTDOWN)
|
||||
stream->state|=MSP_STATE_SHUTDOWN_REMOTE;
|
||||
|
||||
return packet;
|
||||
}
|
||||
|
||||
static void msp_consume_packet(struct msp_stream *stream, struct msp_packet *packet, size_t consumed)
|
||||
{
|
||||
assert(packet->seq==stream->rx.next_seq);
|
||||
packet->offset += consumed;
|
||||
if (packet->offset < packet->len)
|
||||
return;
|
||||
|
||||
free_acked_packets(&stream->rx, stream->rx.next_seq);
|
||||
stream->rx.next_seq++;
|
||||
}
|
||||
|
||||
static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, size_t len)
|
||||
{
|
||||
if (len<1)
|
||||
return WHY("Expected at least 1 byte");
|
||||
|
||||
uint8_t flags = payload[0];
|
||||
time_ms_t now = gettime_ms();
|
||||
stream->rx.last_activity = now;
|
||||
stream->timeout = stream->rx.last_activity + MSP_TIMEOUT;
|
||||
stream->state |= MSP_STATE_RECEIVED_PACKET;
|
||||
|
||||
if (flags & FLAG_STOP){
|
||||
DEBUGF(msp, "Closing socket due to STOP packet");
|
||||
stream->state |= (MSP_STATE_CLOSED|MSP_STATE_STOPPED);
|
||||
stream->state &= ~MSP_STATE_DATAOUT;
|
||||
free_all_packets(&stream->tx);
|
||||
stream->next_action = now;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (len<3)
|
||||
return 0;
|
||||
|
||||
if (flags & FLAG_ACK){
|
||||
uint16_t ack_seq = read_uint16(&payload[1]);
|
||||
// release acknowledged packets
|
||||
free_acked_packets(&stream->tx, ack_seq);
|
||||
|
||||
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
|
||||
}
|
||||
|
||||
// Do we have space for more data now?
|
||||
if (stream->tx.packet_count < MAX_WINDOW_SIZE
|
||||
&& !(stream->state & MSP_STATE_SHUTDOWN_LOCAL)
|
||||
&& !(stream->state & MSP_STATE_CLOSED)){
|
||||
stream->state|=MSP_STATE_DATAOUT;
|
||||
}
|
||||
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
stream->next_action = now;
|
||||
|
||||
if (len<MSP_PAYLOAD_PREAMBLE_SIZE)
|
||||
return 0;
|
||||
|
||||
stream->state |= MSP_STATE_RECEIVED_DATA;
|
||||
uint16_t seq = read_uint16(&payload[3]);
|
||||
|
||||
if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
|
||||
stream->next_ack = now;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user