diff --git a/constants.h b/constants.h index 1b44e39f..1d5797d2 100644 --- a/constants.h +++ b/constants.h @@ -139,6 +139,29 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. Multiple replies can be used to respond with more. */ #define MDP_MAX_SID_REQUEST 59 + +#define MSP_PAYLOAD_PREAMBLE_SIZE 5 +#define MSP_MESSAGE_SIZE 1024 +//should be something like this...; (MDP_MTU - MSP_PAYLOAD_PREAMBLE_SIZE) + +#define MSP_STATE_UNINITIALISED (0) +#define MSP_STATE_LISTENING (1<<0) +#define MSP_STATE_RECEIVED_DATA (1<<1) +#define MSP_STATE_RECEIVED_PACKET (1<<2) +#define MSP_STATE_SHUTDOWN_LOCAL (1<<3) +#define MSP_STATE_SHUTDOWN_REMOTE (1<<4) +// this connection is about to be free'd, release any other resources or references to the state +#define MSP_STATE_CLOSED (1<<5) +// something has gone wrong somewhere +#define MSP_STATE_ERROR (1<<6) +// is there space for sending more data? +#define MSP_STATE_DATAOUT (1<<7) +#define MSP_STATE_STOPPED (1<<8) + +// stream timeout +#define MSP_TIMEOUT 10000 + + /* Maximum amount of audio to cram into a VoMP audio packet. More lets us include preemptive retransmissions. Less reduces the chance of packets getting lost, and reduces diff --git a/debug.h b/debug.h index a78f219a..69177022 100644 --- a/debug.h +++ b/debug.h @@ -43,8 +43,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #define DEBUG_perror(FLAG,X) DEBUGF_perror(FLAG, "%s", (X)) #define DEBUG_argv(FLAG,X,ARGC,ARGV) do { if (IF_DEBUG(FLAG)) _DEBUG_TAG_argv(#FLAG, X, (ARGC), (ARGV)); } while (0) -#define D (DEBUG("D"), 1) -#define T (IF_DEBUG(trace) ? (DEBUG("T"), 1) : 1) +#define D(FLAG) DEBUG(FLAG, "D") +#define T DEBUG(trace, "T") /* These IDEBUG macros use the IF_IDEBUG(IND) macro as the conditional. * diff --git a/headerfiles.mk b/headerfiles.mk index 8d851ce1..d4bbebdc 100644 --- a/headerfiles.mk +++ b/headerfiles.mk @@ -43,5 +43,6 @@ HDRS= fifo.h \ mdp_client.h \ msp_client.h \ msp_common.h \ + msp_server.h \ radio_link.h \ sqlite-amalgamation-3100200/sqlite3.h diff --git a/msp_client.c b/msp_client.c index 2ab1df11..2925c248 100644 --- a/msp_client.c +++ b/msp_client.c @@ -442,6 +442,8 @@ static int process_sock(struct msp_sock *sock) } call_handler(sock, NULL, 0); + + sock->stream.next_action = sock->stream.timeout; if (sock->handler && sock->stream.next_action > sock->last_handler + HANDLER_KEEPALIVE) sock->stream.next_action = sock->last_handler + HANDLER_KEEPALIVE; @@ -499,18 +501,6 @@ static int process_sock(struct msp_sock *sock) if (sock->stream.next_action > sock->stream.next_ack) sock->stream.next_action = sock->stream.next_ack; - // when we've delivered all local packets - // and all our data packets have been acked, close. - if ( (sock->stream.state & MSP_STATE_SHUTDOWN_LOCAL) - && (sock->stream.state & MSP_STATE_SHUTDOWN_REMOTE) - && sock->stream.tx.packet_count == 0 - && sock->stream.rx.packet_count == 0 - && sock->stream.previous_ack == sock->stream.rx.next_seq -1 - ){ - sock->stream.state |= MSP_STATE_CLOSED; - return -1; - } - return 0; } diff --git a/msp_client.h b/msp_client.h index 3f2c5b4d..9d65ba4c 100644 --- a/msp_client.h +++ b/msp_client.h @@ -30,9 +30,6 @@ # endif #endif -#define MSP_PAYLOAD_PREAMBLE_SIZE 5 -#define MSP_MESSAGE_SIZE (MDP_MTU - MSP_MESSAGE_PREAMBLE_SIZE) - typedef uint16_t msp_state_t; struct msp_sock; @@ -51,22 +48,6 @@ int msp_socket_is_valid(MSP_SOCKET); // socket lifecycle msp_state_t msp_get_state(MSP_SOCKET sock); -#define MSP_STATE_UNINITIALISED ((msp_state_t) 0) -#define MSP_STATE_LISTENING ((msp_state_t) (1<<0)) -#define MSP_STATE_RECEIVED_DATA ((msp_state_t) (1<<1)) -#define MSP_STATE_RECEIVED_PACKET ((msp_state_t) (1<<2)) -#define MSP_STATE_SHUTDOWN_LOCAL ((msp_state_t) (1<<3)) -#define MSP_STATE_SHUTDOWN_REMOTE ((msp_state_t) (1<<4)) -// this connection is about to be free'd, release any other resources or references to the state -#define MSP_STATE_CLOSED ((msp_state_t) (1<<5)) -// something has gone wrong somewhere -#define MSP_STATE_ERROR ((msp_state_t) (1<<6)) -// is there space for sending more data? -#define MSP_STATE_DATAOUT ((msp_state_t) (1<<7)) -#define MSP_STATE_STOPPED ((msp_state_t) (1<<8)) - -// stream timeout -#define MSP_TIMEOUT 10000 int msp_socket_is_initialising(MSP_SOCKET); int msp_socket_is_open(MSP_SOCKET); diff --git a/msp_common.h b/msp_common.h index 19a4b94d..e259bcbb 100644 --- a/msp_common.h +++ b/msp_common.h @@ -9,6 +9,8 @@ #define RETRANSMIT_TIME 1500 #define HANDLER_KEEPALIVE 1000 +typedef uint16_t msp_state_t; + struct msp_packet{ struct msp_packet *_next; uint16_t seq; @@ -183,7 +185,7 @@ static size_t msp_write_preamble(uint8_t *header, struct msp_stream *stream, str DEBUGF(msp, "With packet flags %02x seq %02x len %zd", header[0], packet->seq, packet->len); packet->sent = stream->tx.last_activity; - return 5; + return MSP_PAYLOAD_PREAMBLE_SIZE; } static ssize_t msp_stream_send(struct msp_stream *stream, const uint8_t *payload, size_t len) @@ -231,9 +233,20 @@ static int msp_stream_process(struct msp_stream *stream) stream->state |= (MSP_STATE_CLOSED|MSP_STATE_ERROR); return WHY("MSP socket timed out"); } - stream->next_action = stream->timeout; + if (stream->state & MSP_STATE_LISTENING) return 1; + + // when we've delivered all local packets + // and all our data packets have been acked, close. + if ( (stream->state & MSP_STATE_SHUTDOWN_LOCAL) + && (stream->state & MSP_STATE_SHUTDOWN_REMOTE) + && stream->tx.packet_count == 0 + && stream->rx.packet_count == 0 + && stream->previous_ack == stream->rx.next_seq -1 + ) + stream->state |= MSP_STATE_CLOSED; + return 0; } @@ -261,6 +274,16 @@ static void msp_consume_packet(struct msp_stream *stream, struct msp_packet *pac free_acked_packets(&stream->rx, stream->rx.next_seq); stream->rx.next_seq++; + + // when we've delivered all local packets + // and all our data packets have been acked, close. + if ( (stream->state & MSP_STATE_SHUTDOWN_LOCAL) + && (stream->state & MSP_STATE_SHUTDOWN_REMOTE) + && stream->tx.packet_count == 0 + && stream->rx.packet_count == 0 + && stream->previous_ack == stream->rx.next_seq -1 + ) + stream->state |= MSP_STATE_CLOSED; } static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, size_t len) @@ -310,9 +333,10 @@ static int msp_process_packet(struct msp_stream *stream, const uint8_t *payload, stream->state |= MSP_STATE_RECEIVED_DATA; uint16_t seq = read_uint16(&payload[3]); - - if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1) - stream->next_ack = now; + if (compare_wrapped_uint16(seq, stream->rx.next_seq)>=0){ + if (add_packet(&stream->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1) + stream->next_ack = now; + } return 0; } diff --git a/msp_proxy.c b/msp_proxy.c index 19f17f3a..51e24e0b 100644 --- a/msp_proxy.c +++ b/msp_proxy.c @@ -263,7 +263,7 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay conn->last_state=state; - if (state&MSP_STATE_DATAOUT) + if (state & MSP_STATE_DATAOUT) try_send(conn); if (state & MSP_STATE_CLOSED){ diff --git a/msp_server.c b/msp_server.c new file mode 100644 index 00000000..eed6a2df --- /dev/null +++ b/msp_server.c @@ -0,0 +1,251 @@ + +#include "conf.h" +#include "mem.h" +#include "overlay_packet.h" +#include "overlay_address.h" +#include "overlay_buffer.h" +#include "msp_server.h" +#include "dataformats.h" + +#include "msp_common.h" + +struct msp_server_state{ + struct msp_server_state *_next; + struct msp_stream stream; + struct subscriber *local_sid; + mdp_port_t local_port; + struct subscriber *remote_sid; + mdp_port_t remote_port; +}; + +static struct msp_server_state *msp_create( + struct msp_server_state **root, + struct subscriber *remote_sid, mdp_port_t remote_port, + struct subscriber *local_sid, mdp_port_t local_port) +{ + struct msp_server_state *state = (struct msp_server_state *)emalloc_zero(sizeof(struct msp_server_state)); + msp_stream_init(&state->stream); + state->remote_sid = remote_sid; + state->remote_port = remote_port; + state->local_sid = local_sid; + state->local_port = local_port; + state->_next = (*root); + (*root) = state; + return state; +} + +struct msp_server_state * msp_find_or_connect( + struct msp_server_state **root, + struct subscriber *remote_sid, mdp_port_t remote_port, + struct subscriber *local_sid, mdp_port_t local_port) +{ + struct msp_server_state *state = (*root); + + while(state){ + if (state->remote_sid == remote_sid && state->remote_port == remote_port) + break; + state = state->_next; + } + + if (!state){ + state = msp_create(root, remote_sid, remote_port, local_sid, local_port); + state->stream.state|=MSP_STATE_DATAOUT; + // make sure we send a FIRST packet soon + state->stream.next_action = state->stream.next_ack = gettime_ms()+10; + } + return state; +} + +int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator) +{ + iterator->_next = NULL; + iterator->_root = root; + return 0; +} + +time_ms_t msp_iterator_close(struct msp_iterator *iterator) +{ + struct msp_server_state **ptr = iterator->_root; + time_ms_t next_action = TIME_MS_NEVER_WILL; + while(*ptr){ + struct msp_server_state *p = (*ptr); + if (p->stream.state & MSP_STATE_CLOSED){ + *ptr = p->_next; + free_all_packets(&p->stream.tx); + free_all_packets(&p->stream.rx); + free(p); + }else{ + if (p->stream.next_action < next_action) + next_action = p->stream.next_action; + ptr = &p->_next; + } + } + return next_action; +} + +static void send_frame(struct msp_server_state *state, struct overlay_buffer *payload) +{ + struct internal_mdp_header response_header; + bzero(&response_header, sizeof(response_header)); + + response_header.source = state->local_sid; + response_header.source_port = state->local_port; + response_header.destination = state->remote_sid; + response_header.destination_port = state->remote_port; + + overlay_send_frame(&response_header, payload); + ob_free(payload); +} + +static void send_packet(struct msp_server_state *state, struct msp_packet *packet) +{ + struct overlay_buffer *payload = ob_new(); + uint8_t *msp_header = ob_append_space(payload, MSP_PAYLOAD_PREAMBLE_SIZE); + size_t len = msp_write_preamble(msp_header, &state->stream, packet); + assert(len == MSP_PAYLOAD_PREAMBLE_SIZE); + if (packet->len) + ob_append_bytes(payload, packet->payload, packet->len); + ob_flip(payload); + send_frame(state, payload); +} + +static void send_ack(struct msp_server_state *state) +{ + struct overlay_buffer *payload = ob_new(); + uint8_t *msp_header = ob_append_space(payload, 3); + msp_write_ack_header(msp_header, &state->stream); + ob_flip(payload); + send_frame(state, payload); +} + +struct msp_server_state * msp_process_next(struct msp_iterator *iterator) +{ + time_ms_t now = gettime_ms(); + + struct msp_server_state *ptr = iterator->_next; + while(1){ + if (ptr){ + struct msp_packet *packet = ptr->stream.tx._head; + ptr->stream.next_action = ptr->stream.timeout; + + while(packet){ + if (packet->sent + RETRANSMIT_TIME < now) + // (re)transmit this packet + send_packet(ptr, packet); + + if (ptr->stream.next_action > packet->sent + RETRANSMIT_TIME) + ptr->stream.next_action = packet->sent + RETRANSMIT_TIME; + + packet=packet->_next; + } + + // should we send an ack now without sending a payload? + if (now > ptr->stream.next_ack) + send_ack(ptr); + + if (ptr->stream.next_action > ptr->stream.next_ack) + ptr->stream.next_action = ptr->stream.next_ack; + + ptr = ptr->_next; + }else{ + ptr = *iterator->_root; + } + + if (!ptr){ + iterator->_next = ptr; + return NULL; + } + msp_stream_process(&ptr->stream); + + if (ptr->stream.state & MSP_STATE_DATAOUT){ + iterator->_next = ptr; + return ptr; + } + } +} + +int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len) +{ + + return msp_stream_send(&state->stream, payload, len); +} + +int msp_shutdown_stream(struct msp_server_state *state) +{ + msp_stream_shutdown(&state->stream); + return 0; +} + +struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload) +{ + if (ob_remaining(payload)<1){ + WHY("Expected at least 1 byte"); + return NULL; + } + + struct msp_server_state *state = (*root); + + while(state){ + if (state->remote_sid == header->source && state->remote_port == header->source_port) + break; + state = state->_next; + } + + int flags = ob_peek(payload); + if (!state && (flags & FLAG_FIRST)) + state = msp_create(root, header->source, header->source_port, header->destination, header->destination_port); + + if (!state){ + if (!(flags & FLAG_STOP)){ + struct internal_mdp_header response_header; + bzero(&response_header, sizeof(response_header)); + + mdp_init_response(header, &response_header); + uint8_t response = FLAG_STOP; + struct overlay_buffer * response_payload = ob_static(&response, 1); + ob_limitsize(response_payload, 1); + overlay_send_frame(&response_header, response_payload); + ob_free(response_payload); + } + return NULL; + } + + msp_process_packet(&state->stream, ob_current_ptr(payload), ob_remaining(payload)); + return state; +} + +struct msp_packet *msp_recv_next(struct msp_server_state *state) +{ + return msp_stream_next(&state->stream); +} + +struct overlay_buffer *msp_unpack(struct msp_server_state *UNUSED(state), struct msp_packet *packet) +{ + if (packet->offset >= packet->len) + return NULL; + struct overlay_buffer *payload = ob_static(packet->payload + packet->offset, packet->len - packet->offset); + ob_limitsize(payload, packet->len - packet->offset); + return payload; +} + +void msp_consumed(struct msp_server_state *state, struct msp_packet *packet, struct overlay_buffer *payload) +{ + msp_consume_packet(&state->stream, packet, payload ? ob_position(payload) : 0); + if (payload) + ob_free(payload); +} + +time_ms_t msp_next_action(struct msp_server_state *state) +{ + return state->stream.next_action; +} + +struct subscriber * msp_remote_peer(struct msp_server_state *state) +{ + return state->remote_sid; +} + +int msp_can_send(struct msp_server_state *state) +{ + return (state->stream.state & MSP_STATE_DATAOUT) ? 1 : 0; +} \ No newline at end of file diff --git a/msp_server.h b/msp_server.h new file mode 100644 index 00000000..4c15cd54 --- /dev/null +++ b/msp_server.h @@ -0,0 +1,33 @@ +#ifndef __SERVAL_DNA__MSP_SERVER_H +#define __SERVAL_DNA__MSP_SERVER_H + +struct msp_server_state; +struct msp_packet; + +struct msp_iterator{ + struct msp_server_state *_next; + struct msp_server_state **_root; +}; + +struct msp_server_state * msp_find_or_connect( + struct msp_server_state **root, + struct subscriber *remote_sid, mdp_port_t remote_port, + struct subscriber *local_sid, mdp_port_t local_port); + +struct msp_server_state * msp_find_and_process(struct msp_server_state **root, const struct internal_mdp_header *header, struct overlay_buffer *payload); + +struct msp_packet *msp_recv_next(struct msp_server_state *state); +struct overlay_buffer *msp_unpack(struct msp_server_state *state, struct msp_packet *packet); +void msp_consumed(struct msp_server_state *state, struct msp_packet *packet, struct overlay_buffer *payload); +time_ms_t msp_next_action(struct msp_server_state *state); +struct subscriber * msp_remote_peer(struct msp_server_state *state); +int msp_can_send(struct msp_server_state *state); + +int msp_iterator_open(struct msp_server_state **root, struct msp_iterator *iterator); +time_ms_t msp_iterator_close(struct msp_iterator *iterator); +struct msp_server_state * msp_process_next(struct msp_iterator *iterator); + +int msp_send_packet(struct msp_server_state *state, const uint8_t *payload, size_t len); +int msp_shutdown_stream(struct msp_server_state *state); + +#endif \ No newline at end of file diff --git a/overlay_packet.h b/overlay_packet.h index 2e2fe53e..d8a5cafe 100644 --- a/overlay_packet.h +++ b/overlay_packet.h @@ -20,8 +20,8 @@ #ifndef __SERVAL_DNA__OVERLAY_PACKET_H #define __SERVAL_DNA__OVERLAY_PACKET_H -#include "overlay_address.h" #include "serval_types.h" +#include "overlay_address.h" #include "section.h" #define FRAME_NOT_SENT -1 diff --git a/overlay_queue.c b/overlay_queue.c index 3fd0d8ba..ce66c106 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -165,7 +165,7 @@ int _overlay_payload_enqueue(struct __sourceloc __whence, struct overlay_frame * return WHY("Packet content overrun -- not queueing"); if (ob_position(p->payload) >= MDP_MTU) - FATAL("Queued packet is too big"); + FATALF("Queued packet len %u is too big", ob_position(p->payload)); if (queue->length>=queue->maxLength) return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength); diff --git a/sourcefiles.mk b/sourcefiles.mk index 74f0f465..4b77403e 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -77,6 +77,7 @@ SERVAL_DAEMON_SOURCES = \ overlay_mdp.c \ overlay_mdp_services.c \ mdp_filter.c \ + msp_server.c \ overlay_olsr.c \ overlay_packetformats.c \ overlay_payload.c \