serval-dna/msp_client.c
2014-05-13 08:55:57 +09:30

695 lines
18 KiB
C

/*
Mesh Streaming Protocol (MSP)
Copyright (C) 2013-2014 Serval Project Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <assert.h>
#include <inttypes.h>
#include "serval.h"
#include "conf.h"
#include "mdp_client.h"
#include "msp_client.h"
#include "str.h"
#include "dataformats.h"
#include "socket.h"
#include "log.h"
#define FLAG_SHUTDOWN (1<<0)
#define FLAG_ACK (1<<1)
#define RETRANSMIT_TIME 1500
struct msp_packet{
struct msp_packet *_next;
uint16_t seq;
uint8_t flags;
time_ms_t added;
time_ms_t sent;
const uint8_t *payload;
size_t len;
};
#define MAX_WINDOW_SIZE 4
struct msp_window{
int packet_count;
uint32_t base_rtt;
uint32_t rtt;
uint16_t next_seq; // seq of next expected TX or RX packet.
time_ms_t last_activity;
struct msp_packet *_head, *_tail;
};
struct msp_sock{
struct msp_sock *_next;
struct msp_sock *_prev;
int mdp_sock;
msp_state_t state;
struct msp_window tx;
struct msp_window rx;
uint16_t previous_ack;
time_ms_t next_ack;
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context);
void *context;
struct mdp_header header;
time_ms_t timeout;
time_ms_t next_action;
};
struct msp_sock *root=NULL;
struct msp_sock * msp_socket(int mdp_sock)
{
struct msp_sock *ret = emalloc_zero(sizeof(struct msp_sock));
ret->mdp_sock = mdp_sock;
ret->state = MSP_STATE_UNINITIALISED;
ret->_next = root;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
ret->tx.base_rtt = ret->tx.rtt = 0xFFFFFFFF;
ret->tx.last_activity = TIME_MS_NEVER_HAS;
ret->rx.last_activity = TIME_MS_NEVER_HAS;
ret->next_action = TIME_MS_NEVER_WILL;
ret->timeout = gettime_ms() + 10000;
ret->previous_ack = 0x7FFF;
if (root)
root->_prev=ret;
root = ret;
return ret;
}
unsigned msp_socket_count()
{
unsigned i=0;
struct msp_sock *p=root;
while(p){
i++;
p=p->_next;
}
return i;
}
void msp_debug()
{
time_ms_t now = gettime_ms();
struct msp_sock *p=root;
DEBUGF("Msp sockets;");
while(p){
DEBUGF("State %d, from %s:%d to %s:%d, next %"PRId64"ms, ack %"PRId64"ms timeout %"PRId64"ms",
p->state,
alloca_tohex_sid_t(p->header.local.sid), p->header.local.port,
alloca_tohex_sid_t(p->header.remote.sid), p->header.remote.port,
(p->next_action - now),
(p->next_ack - now),
(p->timeout - now));
p=p->_next;
}
}
static void free_all_packets(struct msp_window *window)
{
struct msp_packet *p = window->_head;
while(p){
struct msp_packet *free_me=p;
p=p->_next;
if (free_me->payload)
free((void *)free_me->payload);
free(free_me);
}
}
static void free_acked_packets(struct msp_window *window, uint16_t seq)
{
if (!window->_head)
return;
struct msp_packet *p = window->_head;
uint32_t rtt=0;
time_ms_t now = gettime_ms();
while(p && compare_wrapped_uint16(p->seq, seq)<=0){
if (p->sent)
rtt = now - p->sent;
struct msp_packet *free_me=p;
p=p->_next;
if (free_me->payload)
free((void *)free_me->payload);
free(free_me);
window->packet_count--;
}
window->_head = p;
if (rtt){
if (rtt < 10)
rtt=10;
window->rtt = rtt;
if (window->base_rtt > rtt)
window->base_rtt = rtt;
if (config.debug.msp)
DEBUGF("RTT %u, base %u", rtt, window->base_rtt);
}
if (!p)
window->_tail = NULL;
}
static void msp_free(struct msp_sock *sock)
{
sock->state |= MSP_STATE_CLOSED;
// remove from the list first
if (sock->_prev)
sock->_prev->_next = sock->_next;
else
root=sock->_next;
if (sock->_next)
sock->_next->_prev = sock->_prev;
// last chance to free other resources
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
free_all_packets(&sock->tx);
free_all_packets(&sock->rx);
free(sock);
}
void msp_close(struct msp_sock *sock)
{
// TODO if never sent / received, just free it
sock->state |= MSP_STATE_CLOSED;
}
void msp_close_all(int mdp_sock)
{
struct msp_sock *p = root;
while(p){
struct msp_sock *sock=p;
p=p->_next;
if (sock->mdp_sock == mdp_sock)
msp_free(sock);
}
}
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),
void *context)
{
sock->handler = handler;
sock->context = context;
return 0;
}
msp_state_t msp_get_state(struct msp_sock *sock)
{
return sock->state;
}
int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local)
{
assert(sock->state == MSP_STATE_UNINITIALISED);
sock->header.local = local;
return 0;
}
int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote)
{
assert(sock->state == MSP_STATE_UNINITIALISED);
sock->header.remote = remote;
sock->state|=MSP_STATE_DATAOUT;
// make sure we send a packet soon
sock->next_ack = gettime_ms()+10;
sock->next_action = sock->next_ack;
return 0;
}
int msp_listen(struct msp_sock *sock)
{
assert(sock->state == MSP_STATE_UNINITIALISED);
assert(sock->header.local.port);
sock->state |= MSP_STATE_LISTENING;
sock->header.flags |= MDP_FLAG_BIND;
if (mdp_send(sock->mdp_sock, &sock->header, NULL, 0)==-1){
sock->state|=MSP_STATE_ERROR|MSP_STATE_CLOSED;
return -1;
}
sock->timeout = gettime_ms()+1000;
sock->next_action = sock->timeout;
return 0;
}
int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote)
{
*remote = sock->header.remote;
return 0;
}
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags,
const uint8_t *payload, size_t len)
{
struct msp_packet **insert_pos=NULL;
if (!window->_head){
insert_pos = &window->_head;
}else{
if (window->_tail->seq == seq){
// ignore duplicate packets
return 0;
}else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){
if (compare_wrapped_uint16(window->_head->seq, seq)>0){
// this is ambiguous
return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq);
}
insert_pos = &window->_tail->_next;
}else{
insert_pos = &window->_head;
while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0)
insert_pos = &(*insert_pos)->_next;
if ((*insert_pos)->seq == seq){
// ignore duplicate packets
return 0;
}
}
}
struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet));
if (!packet)
return -1;
packet->_next = (*insert_pos);
*insert_pos = packet;
if (!packet->_next)
window->_tail = packet;
packet->added = gettime_ms();
packet->seq = seq;
packet->flags = flags;
packet->len = len;
if (payload && len){
uint8_t *p = emalloc(len);
if (!p){
free(packet);
return -1;
}
packet->payload = p;
bcopy(payload, p, len);
}
window->packet_count++;
return 1;
}
struct socket_address daemon_addr={.addrlen=0,};
static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
{
assert(sock->header.remote.port);
if (daemon_addr.addrlen == 0){
if (make_local_sockaddr(&daemon_addr, "mdp.2.socket") == -1)
return -1;
}
uint8_t msp_header[5];
msp_header[0]=packet->flags;
// only set the ack flag if we've received a sequenced packet
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
write_uint16(&msp_header[1], sock->rx.next_seq);
write_uint16(&msp_header[3], packet->seq);
sock->previous_ack = sock->rx.next_seq;
struct fragmented_data data={
.fragment_count=3,
.iov={
{
.iov_base = (void*)&sock->header,
.iov_len = sizeof(struct mdp_header)
},
{
.iov_base = &msp_header,
.iov_len = sizeof(msp_header)
},
{
.iov_base = (void*)packet->payload,
.iov_len = packet->len
}
}
};
// allow for sending an empty payload body
if (!(packet->payload && packet->len))
data.fragment_count --;
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
if (r==-1){
if (errno==11)
return 1;
msp_close_all(sock->mdp_sock);
return -1;
}
if (config.debug.msp)
DEBUGF("Sent packet seq %02x len %zd (acked %02x)", packet->seq, packet->len, sock->rx.next_seq);
sock->tx.last_activity = packet->sent = gettime_ms();
sock->next_ack = packet->sent + RETRANSMIT_TIME;
return 0;
}
static int send_ack(struct msp_sock *sock)
{
assert(sock->header.remote.port);
if (daemon_addr.addrlen == 0){
if (make_local_sockaddr(&daemon_addr, "mdp.2.socket") == -1)
return -1;
}
uint8_t msp_header[3];
msp_header[0]=0;
// if we haven't heard a sequence number, we can't ack data
// (but we can indicate the existence of the connection)
if (sock->state & MSP_STATE_RECEIVED_DATA)
msp_header[0]|=FLAG_ACK;
write_uint16(&msp_header[1], sock->rx.next_seq);
struct fragmented_data data={
.fragment_count=2,
.iov={
{
.iov_base = (void*)&sock->header,
.iov_len = sizeof(struct mdp_header)
},
{
.iov_base = &msp_header,
.iov_len = sizeof(msp_header)
}
}
};
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
if (r==-1){
if (errno!=11)
msp_close_all(sock->mdp_sock);
return -1;
}
if (config.debug.msp)
DEBUGF("Sent packet (acked %02x)", sock->rx.next_seq);
sock->previous_ack = sock->rx.next_seq;
sock->tx.last_activity = gettime_ms();
sock->next_ack = sock->tx.last_activity + RETRANSMIT_TIME;
return 0;
}
// add a packet to the transmit buffer
int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len)
{
assert(sock->header.remote.port);
assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0);
if (sock->tx.packet_count > MAX_WINDOW_SIZE)
return -1;
if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1)
return -1;
sock->tx.next_seq++;
if (sock->tx.packet_count>=MAX_WINDOW_SIZE)
sock->state&=~MSP_STATE_DATAOUT;
// make sure we attempt to process packets from this sock soon
// TODO calculate based on congestion window
sock->next_action = gettime_ms();
return 0;
}
int msp_shutdown(struct msp_sock *sock)
{
assert(!(sock->state&MSP_STATE_SHUTDOWN_LOCAL));
if (sock->tx._tail && sock->tx._tail->sent==0){
sock->tx._tail->flags |= FLAG_SHUTDOWN;
}else{
if (add_packet(&sock->tx, sock->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1)
return -1;
sock->tx.next_seq++;
}
sock->state|=MSP_STATE_SHUTDOWN_LOCAL;
sock->state&=~MSP_STATE_DATAOUT;
// make sure we send a packet soon
sock->next_action = gettime_ms();
return 0;
}
static int process_sock(struct msp_sock *sock)
{
time_ms_t now = gettime_ms();
if (sock->timeout < now){
WHY("MSP socket timed out");
sock->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR);
return -1;
}
sock->next_action = sock->timeout;
if (sock->state & MSP_STATE_LISTENING)
return 0;
struct msp_packet *p;
// deliver packets that have now arrived in order
p = sock->rx._head;
// TODO ... ? (sock->state & MSP_STATE_POLLIN)
while(p && p->seq == sock->rx.next_seq){
struct msp_packet *packet=p;
// process packet flags when we are about to deliver the last packet
if (packet->flags & FLAG_SHUTDOWN)
sock->state|=MSP_STATE_SHUTDOWN_REMOTE;
if (sock->handler){
int r = sock->handler(sock, sock->state, packet->payload, packet->len, sock->context);
if (r==-1){
sock->state |= MSP_STATE_CLOSED;
return -1;
}
// keep the packet if the handler refused to accept it.
if (r){
sock->next_action=gettime_ms()+1;
break;
}
}
p=p->_next;
sock->rx.next_seq++;
}
free_acked_packets(&sock->rx, sock->rx.next_seq -1);
// transmit packets that can now be sent
p = sock->tx._head;
while(p){
if (p->sent==0 || p->sent + RETRANSMIT_TIME < now){
if (!sock->header.local.port){
if (sock->header.flags & MDP_FLAG_BIND)
// wait until we have heard back from the daemon with our port number before sending another packet.
break;
sock->header.flags |= MDP_FLAG_BIND;
}
int r = msp_send_packet(sock, p);
if (r==-1)
return -1;
if (r)
break;
}
if (sock->next_action > p->sent + RETRANSMIT_TIME)
sock->next_action = p->sent + RETRANSMIT_TIME;
p=p->_next;
}
// should we send an ack now without sending a payload?
if (now > sock->next_ack){
if (!sock->header.local.port){
if (sock->header.flags & MDP_FLAG_BIND)
// wait until we have heard back from the daemon with our port number before sending another packet.
return 0;
sock->header.flags |= MDP_FLAG_BIND;
}
int r = send_ack(sock);
if (r==-1)
return -1;
}
if (sock->next_action > sock->next_ack)
sock->next_action = sock->next_ack;
if (sock->state & MSP_STATE_SHUTDOWN_LOCAL
&& sock->state & MSP_STATE_SHUTDOWN_REMOTE
&& sock->tx.packet_count == 0
&& sock->rx.packet_count == 0
&& sock->previous_ack == sock->rx.next_seq){
sock->state |= MSP_STATE_CLOSED;
return -1;
}
return 0;
}
int msp_processing(time_ms_t *next_action)
{
*next_action=TIME_MS_NEVER_WILL;
struct msp_sock *sock = root;
time_ms_t now = gettime_ms();
while(sock){
if (!(sock->state & MSP_STATE_CLOSED)
&& sock->next_action <= now){
// this might cause the socket to be closed.
if (process_sock(sock)==0){
// remember the time of the next thing we need to do.
if (sock->next_action < *next_action)
*next_action=sock->next_action;
}
}else if (sock->next_action < *next_action)
*next_action=sock->next_action;
if (sock->state & MSP_STATE_CLOSED){
struct msp_sock *s = sock->_next;
msp_free(sock);
sock=s;
}else{
sock = sock->_next;
}
}
return 0;
}
static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len)
{
// any kind of error reported by the daemon, close all related msp connections
if (header->flags & MDP_FLAG_ERROR){
WHY("Error returned from daemon");
msp_close_all(mdp_sock);
return -1;
}
// find or create mdp_sock...
struct msp_sock *sock=NULL;
{
struct msp_sock *s=root;
struct msp_sock *listen=NULL;
while(s){
if (s->mdp_sock == mdp_sock ){
if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){
// process bind response from the daemon
s->header.local = header->local;
s->header.flags &= ~MDP_FLAG_BIND;
if (config.debug.msp)
DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
s->next_action = gettime_ms();
if (s->state & MSP_STATE_LISTENING)
s->timeout = TIME_MS_NEVER_WILL;
return 0;
}
if (s->state & MSP_STATE_LISTENING){
// remember any matching listen socket so we can create a connection on first use
if (s->header.local.port == header->local.port
&& (is_sid_t_any(s->header.local.sid)
|| memcmp(&s->header.local.sid, &header->local.sid, SID_SIZE)==0))
listen=s;
}else if (memcmp(&s->header.remote, &header->remote, sizeof header->remote)==0
&& memcmp(&s->header.local, &header->local, sizeof header->local)==0){
// if the addresses match, we found it.
sock=s;
break;
}
}
s = s->_next;
}
if (listen && !sock){
// create a new socket for the incoming connection
sock = msp_socket(listen->mdp_sock);
sock->header = *header;
// use the same handler initially
sock->handler = listen->handler;
sock->context = listen->context;
}
if (!sock){
WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
// TODO reply with shutdown ack to forcefully break the connection?
return 0;
}
}
if (len<3)
return WHY("Expected at least 3 bytes");
sock->rx.last_activity = gettime_ms();
sock->timeout = sock->rx.last_activity + 10000;
uint8_t flags = payload[0];
if (flags & FLAG_ACK){
uint16_t ack_seq = read_uint16(&payload[1]);
// release acknowledged packets
free_acked_packets(&sock->tx, ack_seq);
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
}
if (sock->tx.packet_count < MAX_WINDOW_SIZE
&& !(sock->state & MSP_STATE_DATAOUT)
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){
sock->state|=MSP_STATE_DATAOUT;
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
}
// make sure we attempt to process packets from this sock soon
// TODO calculate based on congestion window
sock->state |= MSP_STATE_RECEIVED_PACKET;
sock->next_action = gettime_ms();
if (len<5)
return 0;
sock->state |= MSP_STATE_RECEIVED_DATA;
uint16_t seq = read_uint16(&payload[3]);
if (add_packet(&sock->rx, seq, flags, &payload[5], len - 5)==1)
sock->next_ack = gettime_ms();
return 0;
}
int msp_recv(int mdp_sock)
{
struct mdp_header header;
uint8_t payload[1200];
ssize_t len = mdp_recv(mdp_sock, &header, payload, sizeof(payload));
if (len<0)
return -1;
return process_packet(mdp_sock, &header, payload, len);
}