Add API for msp from within the daemon

This commit is contained in:
Jeremy Lakeman 2016-03-16 10:31:09 +10:30
parent 0563879707
commit 76be37a04e
12 changed files with 345 additions and 41 deletions

View File

@ -139,6 +139,29 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Multiple replies can be used to respond with more. */
#define MDP_MAX_SID_REQUEST 59
#define MSP_PAYLOAD_PREAMBLE_SIZE 5
#define MSP_MESSAGE_SIZE 1024
//should be something like this...; (MDP_MTU - MSP_PAYLOAD_PREAMBLE_SIZE)
#define MSP_STATE_UNINITIALISED (0)
#define MSP_STATE_LISTENING (1<<0)
#define MSP_STATE_RECEIVED_DATA (1<<1)
#define MSP_STATE_RECEIVED_PACKET (1<<2)
#define MSP_STATE_SHUTDOWN_LOCAL (1<<3)
#define MSP_STATE_SHUTDOWN_REMOTE (1<<4)
// this connection is about to be free'd, release any other resources or references to the state
#define MSP_STATE_CLOSED (1<<5)
// something has gone wrong somewhere
#define MSP_STATE_ERROR (1<<6)
// is there space for sending more data?
#define MSP_STATE_DATAOUT (1<<7)
#define MSP_STATE_STOPPED (1<<8)
// stream timeout
#define MSP_TIMEOUT 10000
/* Maximum amount of audio to cram into a VoMP audio packet.
More lets us include preemptive retransmissions.
Less reduces the chance of packets getting lost, and reduces

View File

@ -43,8 +43,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define DEBUG_perror(FLAG,X) DEBUGF_perror(FLAG, "%s", (X))
#define DEBUG_argv(FLAG,X,ARGC,ARGV) do { if (IF_DEBUG(FLAG)) _DEBUG_TAG_argv(#FLAG, X, (ARGC), (ARGV)); } while (0)
#define D (DEBUG("D"), 1)
#define T (IF_DEBUG(trace) ? (DEBUG("T"), 1) : 1)
#define D(FLAG) DEBUG(FLAG, "D")
#define T DEBUG(trace, "T")
/* These IDEBUG macros use the IF_IDEBUG(IND) macro as the conditional.
*

View File

@ -43,5 +43,6 @@ HDRS= fifo.h \
mdp_client.h \
msp_client.h \
msp_common.h \
msp_server.h \
radio_link.h \
sqlite-amalgamation-3100200/sqlite3.h

View File

@ -442,6 +442,8 @@ static int process_sock(struct msp_sock *sock)
}
call_handler(sock, NULL, 0);
sock->stream.next_action = sock->stream.timeout;
if (sock->handler && sock->stream.next_action > sock->last_handler + HANDLER_KEEPALIVE)
sock->stream.next_action = sock->last_handler + HANDLER_KEEPALIVE;
@ -499,18 +501,6 @@ static int process_sock(struct msp_sock *sock)
if (sock->stream.next_action > sock->stream.next_ack)
sock->stream.next_action = sock->stream.next_ack;
// when we've delivered all local packets
// and all our data packets have been acked, close.
if ( (sock->stream.state & MSP_STATE_SHUTDOWN_LOCAL)
&& (sock->stream.state & MSP_STATE_SHUTDOWN_REMOTE)
&& sock->stream.tx.packet_count == 0
&& sock->stream.rx.packet_count == 0
&& sock->stream.previous_ack == sock->stream.rx.next_seq -1
){
sock->stream.state |= MSP_STATE_CLOSED;
return -1;
}
return 0;
}

View File

@ -30,9 +30,6 @@
# endif
#endif
#define MSP_PAYLOAD_PREAMBLE_SIZE 5
#define MSP_MESSAGE_SIZE (MDP_MTU - MSP_MESSAGE_PREAMBLE_SIZE)
typedef uint16_t msp_state_t;
struct msp_sock;
@ -51,22 +48,6 @@ int msp_socket_is_valid(MSP_SOCKET);
// socket lifecycle
msp_state_t msp_get_state(MSP_SOCKET sock);
#define MSP_STATE_UNINITIALISED ((msp_state_t) 0)
#define MSP_STATE_LISTENING ((msp_state_t) (1<<0))
#define MSP_STATE_RECEIVED_DATA ((msp_state_t) (1<<1))
#define MSP_STATE_RECEIVED_PACKET ((msp_state_t) (1<<2))
#define MSP_STATE_SHUTDOWN_LOCAL ((msp_state_t) (1<<3))
#define MSP_STATE_SHUTDOWN_REMOTE ((msp_state_t) (1<<4))
// this connection is about to be free'd, release any other resources or references to the state
#define MSP_STATE_CLOSED ((msp_state_t) (1<<5))
// something has gone wrong somewhere
#define MSP_STATE_ERROR ((msp_state_t) (1<<6))
// is there space for sending more data?
#define MSP_STATE_DATAOUT ((msp_state_t) (1<<7))
#define MSP_STATE_STOPPED ((msp_state_t) (1<<8))
// stream timeout
#define MSP_TIMEOUT 10000
int msp_socket_is_initialising(MSP_SOCKET);
int msp_socket_is_open(MSP_SOCKET);

View File

@ -9,6 +9,8 @@
#define RETRANSMIT_TIME 1500
#define HANDLER_KEEPALIVE 1000
typedef uint16_t msp_state_t;
struct msp_packet{
struct msp_packet *_next;
uint16_t seq;
@ -183,7 +185,7 @@ static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, str
DEBUGF(msp, "With packet flags %02x seq %02x len %zd",
header[0], packet->seq, packet->len);
packet->sent = stream->tx.last_activity;
return 5;
return MSP_PAYLOAD_PREAMBLE_SIZE;
}
static ssize_t msp_stream_send(struct msp_stream *stream, const uint8_t *payload, size_t len)
@ -231,9 +233,20 @@ static int msp_stream_process(struct msp_stream *stream)
stream->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR);
return WHY("MSP socket timed out");
}
stream->next_action = stream->timeout;
if (stream->state & MSP_STATE_LISTENING)
return 1;
// when we've delivered all local packets
// and all our data packets have been acked, close.
if ( (stream->state & MSP_STATE_SHUTDOWN_LOCAL)
&& (stream->state & MSP_STATE_SHUTDOWN_REMOTE)
&& stream->tx.packet_count == 0
&& stream->rx.packet_count == 0
&& stream->previous_ack == stream->rx.next_seq -1
)
stream->state |= MSP_STATE_CLOSED;
return 0;
}
@ -261,6 +274,16 @@ static void msp_consume_packet(struct msp_stream *stream, struct msp_packet *pac
free_acked_packets(&stream->rx, stream->rx.next_seq);
stream->rx.next_seq++;
// when we've delivered all local packets
// and all our data packets have been acked, close.
if ( (stream->state & MSP_STATE_SHUTDOWN_LOCAL)
&& (stream->state & MSP_STATE_SHUTDOWN_REMOTE)
&& stream->tx.packet_count == 0
&& stream->rx.packet_count == 0
&& stream->previous_ack == stream->rx.next_seq -1
)
stream->state |= MSP_STATE_CLOSED;
}
static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, size_t len)
@ -310,9 +333,10 @@ static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload,
stream->state |= MSP_STATE_RECEIVED_DATA;
uint16_t seq = read_uint16(&payload[3]);
if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
stream->next_ack = now;
if (compare_wrapped_uint16(seq, stream->rx.next_seq)>=0){
if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
stream->next_ack = now;
}
return 0;
}

View File

@ -263,7 +263,7 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
conn->last_state=state;
if (state&MSP_STATE_DATAOUT)
if (state & MSP_STATE_DATAOUT)
try_send(conn);
if (state & MSP_STATE_CLOSED){

251
msp_server.c Normal file
View File

@ -0,0 +1,251 @@
#include "conf.h"
#include "mem.h"
#include "overlay_packet.h"
#include "overlay_address.h"
#include "overlay_buffer.h"
#include "msp_server.h"
#include "dataformats.h"
#include "msp_common.h"
struct msp_server_state{
struct msp_server_state *_next;
struct msp_stream stream;
struct subscriber *local_sid;
mdp_port_t local_port;
struct subscriber *remote_sid;
mdp_port_t remote_port;
};
static struct msp_server_state *msp_create(
struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port)
{
struct msp_server_state *state = (struct msp_server_state *)emalloc_zero(sizeof(struct msp_server_state));
msp_stream_init(&state->stream);
state->remote_sid = remote_sid;
state->remote_port = remote_port;
state->local_sid = local_sid;
state->local_port = local_port;
state->_next = (*root);
(*root) = state;
return state;
}
struct msp_server_state * msp_find_or_connect(
struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port)
{
struct msp_server_state *state = (*root);
while(state){
if (state->remote_sid == remote_sid && state->remote_port == remote_port)
break;
state = state->_next;
}
if (!state){
state = msp_create(root, remote_sid, remote_port, local_sid, local_port);
state->stream.state|=MSP_STATE_DATAOUT;
// make sure we send a FIRST packet soon
state->stream.next_action = state->stream.next_ack = gettime_ms()+10;
}
return state;
}
int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator)
{
iterator->_next = NULL;
iterator->_root = root;
return 0;
}
time_ms_t msp_iterator_close(struct msp_iterator *iterator)
{
struct msp_server_state **ptr = iterator->_root;
time_ms_t next_action = TIME_MS_NEVER_WILL;
while(*ptr){
struct msp_server_state *p = (*ptr);
if (p->stream.state & MSP_STATE_CLOSED){
*ptr = p->_next;
free_all_packets(&p->stream.tx);
free_all_packets(&p->stream.rx);
free(p);
}else{
if (p->stream.next_action < next_action)
next_action = p->stream.next_action;
ptr = &p->_next;
}
}
return next_action;
}
static void send_frame(struct msp_server_state *state, struct overlay_buffer *payload)
{
struct internal_mdp_header response_header;
bzero(&response_header, sizeof(response_header));
response_header.source = state->local_sid;
response_header.source_port = state->local_port;
response_header.destination = state->remote_sid;
response_header.destination_port = state->remote_port;
overlay_send_frame(&response_header, payload);
ob_free(payload);
}
static void send_packet(struct msp_server_state *state, struct msp_packet *packet)
{
struct overlay_buffer *payload = ob_new();
uint8_t *msp_header = ob_append_space(payload, MSP_PAYLOAD_PREAMBLE_SIZE);
size_t len = msp_write_preamble(msp_header, &state->stream, packet);
assert(len == MSP_PAYLOAD_PREAMBLE_SIZE);
if (packet->len)
ob_append_bytes(payload, packet->payload, packet->len);
ob_flip(payload);
send_frame(state, payload);
}
static void send_ack(struct msp_server_state *state)
{
struct overlay_buffer *payload = ob_new();
uint8_t *msp_header = ob_append_space(payload, 3);
msp_write_ack_header(msp_header, &state->stream);
ob_flip(payload);
send_frame(state, payload);
}
struct msp_server_state * msp_process_next(struct msp_iterator *iterator)
{
time_ms_t now = gettime_ms();
struct msp_server_state *ptr = iterator->_next;
while(1){
if (ptr){
struct msp_packet *packet = ptr->stream.tx._head;
ptr->stream.next_action = ptr->stream.timeout;
while(packet){
if (packet->sent + RETRANSMIT_TIME < now)
// (re)transmit this packet
send_packet(ptr, packet);
if (ptr->stream.next_action > packet->sent + RETRANSMIT_TIME)
ptr->stream.next_action = packet->sent + RETRANSMIT_TIME;
packet=packet->_next;
}
// should we send an ack now without sending a payload?
if (now > ptr->stream.next_ack)
send_ack(ptr);
if (ptr->stream.next_action > ptr->stream.next_ack)
ptr->stream.next_action = ptr->stream.next_ack;
ptr = ptr->_next;
}else{
ptr = *iterator->_root;
}
if (!ptr){
iterator->_next = ptr;
return NULL;
}
msp_stream_process(&ptr->stream);
if (ptr->stream.state & MSP_STATE_DATAOUT){
iterator->_next = ptr;
return ptr;
}
}
}
int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len)
{
return msp_stream_send(&state->stream, payload, len);
}
int msp_shutdown_stream(struct msp_server_state *state)
{
msp_stream_shutdown(&state->stream);
return 0;
}
struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload)
{
if (ob_remaining(payload)<1){
WHY("Expected at least 1 byte");
return NULL;
}
struct msp_server_state *state = (*root);
while(state){
if (state->remote_sid == header->source && state->remote_port == header->source_port)
break;
state = state->_next;
}
int flags = ob_peek(payload);
if (!state && (flags & FLAG_FIRST))
state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port);
if (!state){
if (!(flags & FLAG_STOP)){
struct internal_mdp_header response_header;
bzero(&response_header, sizeof(response_header));
mdp_init_response(header, &response_header);
uint8_t response = FLAG_STOP;
struct overlay_buffer * response_payload = ob_static(&response, 1);
ob_limitsize(response_payload, 1);
overlay_send_frame(&response_header, response_payload);
ob_free(response_payload);
}
return NULL;
}
msp_process_packet(&state->stream, ob_current_ptr(payload), ob_remaining(payload));
return state;
}
struct msp_packet *msp_recv_next(struct msp_server_state *state)
{
return msp_stream_next(&state->stream);
}
struct overlay_buffer *msp_unpack(struct msp_server_state *UNUSED(state), struct msp_packet *packet)
{
if (packet->offset >= packet->len)
return NULL;
struct overlay_buffer *payload = ob_static(packet->payload + packet->offset, packet->len - packet->offset);
ob_limitsize(payload, packet->len - packet->offset);
return payload;
}
void msp_consumed(struct msp_server_state *state, struct msp_packet *packet, struct overlay_buffer *payload)
{
msp_consume_packet(&state->stream, packet, payload ? ob_position(payload) : 0);
if (payload)
ob_free(payload);
}
time_ms_t msp_next_action(struct msp_server_state *state)
{
return state->stream.next_action;
}
struct subscriber * msp_remote_peer(struct msp_server_state *state)
{
return state->remote_sid;
}
int msp_can_send(struct msp_server_state *state)
{
return (state->stream.state & MSP_STATE_DATAOUT) ? 1 : 0;
}

33
msp_server.h Normal file
View File

@ -0,0 +1,33 @@
#ifndef __SERVAL_DNA__MSP_SERVER_H
#define __SERVAL_DNA__MSP_SERVER_H
struct msp_server_state;
struct msp_packet;
struct msp_iterator{
struct msp_server_state *_next;
struct msp_server_state **_root;
};
struct msp_server_state * msp_find_or_connect(
struct msp_server_state **root,
struct subscriber *remote_sid, mdp_port_t remote_port,
struct subscriber *local_sid, mdp_port_t local_port);
struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload);
struct msp_packet *msp_recv_next(struct msp_server_state *state);
struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet);
void msp_consumed(struct msp_server_state *state, struct msp_packet *packet, struct overlay_buffer *payload);
time_ms_t msp_next_action(struct msp_server_state *state);
struct subscriber * msp_remote_peer(struct msp_server_state *state);
int msp_can_send(struct msp_server_state *state);
int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator);
time_ms_t msp_iterator_close(struct msp_iterator *iterator);
struct msp_server_state * msp_process_next(struct msp_iterator *iterator);
int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len);
int msp_shutdown_stream(struct msp_server_state *state);
#endif

View File

@ -20,8 +20,8 @@
#ifndef __SERVAL_DNA__OVERLAY_PACKET_H
#define __SERVAL_DNA__OVERLAY_PACKET_H
#include "overlay_address.h"
#include "serval_types.h"
#include "overlay_address.h"
#include "section.h"
#define FRAME_NOT_SENT -1

View File

@ -165,7 +165,7 @@ int _overlay_payload_enqueue(struct __sourceloc __whence, struct overlay_frame *
return WHY("Packet content overrun -- not queueing");
if (ob_position(p->payload) >= MDP_MTU)
FATAL("Queued packet is too big");
FATALF("Queued packet len %u is too big", ob_position(p->payload));
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);

View File

@ -77,6 +77,7 @@ SERVAL_DAEMON_SOURCES = \
overlay_mdp.c \
overlay_mdp_services.c \
mdp_filter.c \
msp_server.c \
overlay_olsr.c \
overlay_packetformats.c \
overlay_payload.c \