From db7f68afd7fcd0693e18370de91ea55369e9c870 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Thu, 28 Nov 2013 16:53:12 +1030 Subject: [PATCH] Initial msp library implementation with connection state tracking --- commandline.c | 5 + dataformats.c | 10 + dataformats.h | 5 + directory_service.c | 1 + fdqueue.c | 4 +- headerfiles.mk | 1 + mdp_client.h | 3 - msp_client.c | 534 ++++++++++++++++++++++++++++++++++++++++++++ msp_client.h | 63 ++++++ msp_proxy.c | 356 +++++++++++++++++++++++++++++ overlay_mdp.c | 17 +- serval.h | 1 + sourcefiles.mk | 2 + tests/msp | 60 +++++ 14 files changed, 1053 insertions(+), 9 deletions(-) create mode 100644 msp_client.c create mode 100644 msp_client.h create mode 100644 msp_proxy.c create mode 100755 tests/msp diff --git a/commandline.c b/commandline.c index 8fd9303a..820ab451 100644 --- a/commandline.c +++ b/commandline.c @@ -2261,6 +2261,7 @@ static int app_keyring_set_tag(const struct cli_parsed *parsed, struct cli_conte return r; } +// returns -1 on error, -2 on timeout, packet length on success. ssize_t mdp_poll_recv(int mdp_sock, time_ms_t timeout, struct mdp_header *rev_header, unsigned char *payload, size_t buffer_size) { time_ms_t now = gettime_ms(); @@ -3055,6 +3056,10 @@ struct cli_schema command_line_options[]={ "Run byte order handling test"}, {app_slip_test,{"test","slip","[--seed=]","[--duration=|--iterations=]",NULL}, 0, "Run serial encapsulation test"}, + {app_msp_connection,{"msp", "listen", "", NULL}, 0, + "Listen for incoming connections"}, + {app_msp_connection,{"msp", "connect", "", "", NULL}, 0, + "Connect to a remote party"}, #ifdef HAVE_VOIPTEST {app_pa_phone,{"phone",NULL}, 0, "Run phone test application"}, diff --git a/dataformats.c b/dataformats.c index 95b59762..8d26393c 100644 --- a/dataformats.c +++ b/dataformats.c @@ -239,3 +239,13 @@ uint16_t read_uint16(const unsigned char *o) for(i=0;i<2;i++) v=(v<<8)|o[2-1-i]; return v; } + +int compare_wrapped_uint8(uint8_t one, uint8_t two) +{ + return (int8_t)(one - two); +} + +int compare_wrapped_uint16(uint16_t one, uint16_t two) +{ + return (int16_t)(one - two); +} diff --git a/dataformats.h b/dataformats.h index 26c9fb7c..2770a111 100644 --- a/dataformats.h +++ b/dataformats.h @@ -42,4 +42,9 @@ uint64_t read_uint64(const unsigned char *o); uint32_t read_uint32(const unsigned char *o); uint16_t read_uint16(const unsigned char *o); +// compare sequence numbers that wrap +// returns <0 if one is before two, 0 if they are the same, else >0 +int compare_wrapped_uint8(uint8_t one, uint8_t two); +int compare_wrapped_uint16(uint16_t one, uint16_t two); + #endif //__SERVAL_DNA___DATA_FORMATS_H diff --git a/directory_service.c b/directory_service.c index ebad7542..311c1951 100644 --- a/directory_service.c +++ b/directory_service.c @@ -23,6 +23,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include #include "constants.h" +#include "serval.h" #include "mdp_client.h" #include "str.h" diff --git a/fdqueue.c b/fdqueue.c index de18178b..a78967f6 100644 --- a/fdqueue.c +++ b/fdqueue.c @@ -204,7 +204,9 @@ int _watch(struct __sourceloc __whence, struct sched_ent *alarm) WARN("watch() called without supplying an alarm name"); if (!alarm->function) - return WHY("Can't watch if you haven't set the function pointer"); + FATAL("Can't watch if you haven't set the function pointer"); + if (!alarm->poll.events) + FATAL("Can't watch if you haven't set any poll flags"); if (alarm->_poll_index>=0 && fd_callbacks[alarm->_poll_index]==alarm){ // updating event flags diff --git a/headerfiles.mk b/headerfiles.mk index fc9ae543..ff362454 100644 --- a/headerfiles.mk +++ b/headerfiles.mk @@ -28,5 +28,6 @@ HDRS= fifo.h \ constants.h \ monitor-client.h \ mdp_client.h \ + msp_client.h \ radio_link.h \ sqlite-amalgamation-3070900/sqlite3.h diff --git a/mdp_client.h b/mdp_client.h index a93e0a3c..4b820d59 100644 --- a/mdp_client.h +++ b/mdp_client.h @@ -19,8 +19,6 @@ #ifndef __SERVAL_DNA__MDP_CLIENT_H #define __SERVAL_DNA__MDP_CLIENT_H -#include "serval.h" - // define 3rd party mdp API without any structure padding #pragma pack(push, 1) @@ -95,7 +93,6 @@ ssize_t mdp_recv(int socket, struct mdp_header *header, uint8_t *payload, ssize_ int mdp_poll(int socket, time_ms_t timeout_ms); - /* Client-side MDP function */ int overlay_mdp_client_socket(void); int overlay_mdp_client_close(int mdp_sockfd); diff --git a/msp_client.c b/msp_client.c new file mode 100644 index 00000000..f5f36874 --- /dev/null +++ b/msp_client.c @@ -0,0 +1,534 @@ + +#include +#include "serval.h" +#include "mdp_client.h" +#include "msp_client.h" +#include "str.h" +#include "dataformats.h" +#include "socket.h" +#include "log.h" + +#define FLAG_SHUTDOWN (1<<0) + +struct msp_packet{ + struct msp_packet *_next; + uint16_t seq; + uint8_t flags; + time_ms_t added; + time_ms_t sent; + const uint8_t *payload; + size_t len; +}; + +#define MAX_WINDOW_SIZE 64 +struct msp_window{ + int packet_count; + uint32_t base_rtt; + uint32_t rtt; + uint16_t next_seq; // seq of next expected TX or RX packet. + struct msp_packet *_head, *_tail; +}; + +struct msp_sock{ + struct msp_sock *_next; + struct msp_sock *_prev; + int mdp_sock; + msp_state_t state; + struct msp_window tx; + struct msp_window rx; + uint16_t previous_ack; + int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context); + void *context; + struct mdp_header header; + time_ms_t last_rx; + time_ms_t timeout; + time_ms_t next_action; +}; + +struct msp_sock *root=NULL; + +struct msp_sock * msp_socket(int mdp_sock) +{ + struct msp_sock *ret = emalloc_zero(sizeof(struct msp_sock)); + ret->mdp_sock = mdp_sock; + ret->state = MSP_STATE_UNINITIALISED; + ret->_next = root; + // TODO set base rtt to ensure that we send the first packet a few times before giving up + ret->tx.base_rtt = ret->tx.rtt = 0xFFFFFFFF; + if (root) + root->_prev=ret; + root = ret; + return ret; +} + +static void free_all_packets(struct msp_window *window) +{ + struct msp_packet *p = window->_head; + while(p){ + struct msp_packet *free_me=p; + p=p->_next; + free((void *)free_me->payload); + free(free_me); + } +} + +static void free_acked_packets(struct msp_window *window, uint16_t seq) +{ + struct msp_packet *p = window->_head; + uint32_t rtt=0; + time_ms_t now = gettime_ms(); + + while(p && compare_wrapped_uint16(p->seq, seq)<=0){ + if (p->sent) + rtt = now - p->sent; + struct msp_packet *free_me=p; + p=p->_next; + free((void *)free_me->payload); + free(free_me); + window->packet_count--; + } + window->_head = p; + if (rtt){ + window->rtt = rtt; + if (window->base_rtt > rtt) + window->base_rtt = rtt; + } + if (!p) + window->_tail = NULL; +} + +void msp_close(struct msp_sock *sock) +{ + sock->state |= MSP_STATE_CLOSED; + + // last chance to free other resources + if (sock->handler) + sock->handler(sock, sock->state, NULL, 0, sock->context); + + if (sock->_prev) + sock->_prev->_next = sock->_next; + else + root=sock->_next; + if (sock->_next) + sock->_next->_prev = sock->_prev; + + free_all_packets(&sock->tx); + free_all_packets(&sock->rx); + + free(sock); +} + +int msp_set_handler(struct msp_sock *sock, + int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context), + void *context) +{ + sock->handler = handler; + sock->context = context; + return 0; +} + +msp_state_t msp_get_state(struct msp_sock *sock) +{ + return sock->state; +} + +void msp_set_watch(struct msp_sock *sock, msp_state_t flags) +{ + assert(flags & ~(MSP_STATE_POLLIN|MSP_STATE_POLLOUT)); + // clear any existing poll bits, and set the requested ones + sock->state &= ~(MSP_STATE_POLLIN|MSP_STATE_POLLOUT); + sock->state |= flags; +} + +int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local) +{ + assert(sock->state == MSP_STATE_UNINITIALISED); + sock->header.local = local; + return 0; +} + +int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote) +{ + assert(sock->state == MSP_STATE_UNINITIALISED); + sock->header.remote = remote; + return 0; +} + +int msp_listen(struct msp_sock *sock) +{ + assert(sock->state == MSP_STATE_UNINITIALISED); + assert(sock->header.local.port); + + sock->state |= MSP_STATE_LISTENING; + sock->header.flags |= MDP_FLAG_BIND; + mdp_send(sock->mdp_sock, &sock->header, NULL, 0); + + return 0; +} + +int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote) +{ + *remote = sock->header.remote; + return 0; +} + +static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, + const uint8_t *payload, size_t len) +{ + struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet)); + if (!packet) + return -1; + + if (!window->_head){ + window->_head = window->_tail = packet; + }else{ + if (window->_tail->seq == seq){ + // ignore duplicate packets + free(packet); + return 0; + }else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){ + if (compare_wrapped_uint16(window->_head->seq, seq)>0){ + // this is ambiguous + free(packet); + return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq); + } + + window->_tail->_next = packet; + window->_tail = packet; + }else{ + struct msp_packet **pos = &window->_head; + while(compare_wrapped_uint16((*pos)->seq, seq)<0){ + if ((*pos)->seq == seq){ + // ignore duplicate packets + free(packet); + return 0; + } + pos = &(*pos)->_next; + } + (*pos)->_next = packet; + packet->_next = (*pos); + *pos = packet; + } + } + + packet->added = gettime_ms(); + packet->seq = seq; + packet->flags = flags; + packet->len = len; + + if (payload && len){ + uint8_t *p = emalloc(len); + if (!p){ + free(packet); + return -1; + } + packet->payload = p; + bcopy(payload, p, len); + } + window->packet_count++; + return 0; +} + +struct socket_address daemon_addr={.addrlen=0,}; + +static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet) +{ + if (daemon_addr.addrlen == 0){ + if (make_local_sockaddr(&daemon_addr, "mdp.2.socket") == -1) + return -1; + } + + uint8_t msp_header[5]; + + msp_header[0]=packet->flags; + write_uint16(&msp_header[1], sock->rx.next_seq); + write_uint16(&msp_header[3], packet->seq); + sock->previous_ack = sock->rx.next_seq; + + DEBUGF("Sending packet flags %d, ack %d, seq %d", packet->flags, sock->rx.next_seq, packet->seq); + struct fragmented_data data={ + .fragment_count=3, + .iov={ + { + .iov_base = (void*)&sock->header, + .iov_len = sizeof(struct mdp_header) + }, + { + .iov_base = &msp_header, + .iov_len = sizeof(msp_header) + }, + { + .iov_base = (void*)packet->payload, + .iov_len = packet->len + } + } + }; + + // allow for sending an empty payload body + if (!(packet->payload && packet->len)) + data.fragment_count --; + + ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data); + if (r==-1) + return -1; + packet->sent = gettime_ms(); + if (!sock->timeout) + sock->timeout = packet->sent + 10000; + return 0; +} + +static int send_ack(struct msp_sock *sock) +{ + D; + if (daemon_addr.addrlen == 0){ + if (make_local_sockaddr(&daemon_addr, "mdp.2.socket") == -1) + return -1; + } + + uint8_t msp_header[3]; + + msp_header[0]=0; + write_uint16(&msp_header[1], sock->rx.next_seq); + sock->previous_ack = sock->rx.next_seq; + + struct fragmented_data data={ + .fragment_count=2, + .iov={ + { + .iov_base = (void*)&sock->header, + .iov_len = sizeof(struct mdp_header) + }, + { + .iov_base = &msp_header, + .iov_len = sizeof(msp_header) + } + } + }; + + ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data); + return r<0?-1:0; +} + +// add a packet to the transmit buffer +int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len) +{ + assert(sock->header.remote.port); + assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0); + + if (sock->tx.packet_count > MAX_WINDOW_SIZE) + return -1; + + if (add_packet(&sock->tx, sock->tx.next_seq, 0, payload, len)==-1) + return -1; + + sock->tx.next_seq++; + + // make sure we attempt to process packets from this sock soon + // TODO calculate based on congestion window + sock->next_action = gettime_ms(); + + return 0; +} + +int msp_shutdown(struct msp_sock *sock) +{ + if (sock->tx._tail && sock->tx._tail->sent==0){ + sock->tx._tail->flags |= FLAG_SHUTDOWN; + }else{ + if (add_packet(&sock->tx, sock->tx.next_seq, FLAG_SHUTDOWN, NULL, 0)==-1) + return -1; + sock->tx.next_seq++; + } + sock->state|=MSP_STATE_SHUTDOWN_LOCAL; + return 0; +} + +static int process_sock(struct msp_sock *sock) +{ + time_ms_t now = gettime_ms(); + + if (sock->timeout && sock->timeout < now){ + msp_close(sock); + return -1; + } + + sock->next_action = sock->timeout; + struct msp_packet *p; + + // deliver packets that have now arrived in order + p = sock->rx._head; + if (p) + DEBUGF("Seq %d vs %d", p->seq, sock->rx.next_seq); + + // TODO ... ? (sock->state & MSP_STATE_POLLIN) + while(p && p->seq == sock->rx.next_seq){ + struct msp_packet *packet=p; + + // process packet flags when we have delivered the packet + if (packet->flags & FLAG_SHUTDOWN) + sock->state|=MSP_STATE_SHUTDOWN_REMOTE; + + if (sock->handler + && sock->handler(sock, sock->state, packet->payload, packet->len, sock->context)==-1){ + // keep the packet if the handler refused to accept it. + break; + } + + p=p->_next; + sock->rx.next_seq++; + } + free_acked_packets(&sock->rx, sock->rx.next_seq -1); + + // transmit packets that can now be sent + p = sock->tx._head; + while(p){ + if (p->sent==0 || p->sent + sock->tx.rtt*2 < now){ + + if (!(sock->state & (MSP_STATE_CONNECTING|MSP_STATE_CONNECTED))){ + sock->state |= MSP_STATE_CONNECTING; + sock->header.flags |= MDP_FLAG_BIND; + + }else if (!sock->header.local.port){ + // wait until we have heard back from the daemon with our port number before sending another packet. + break; + } + if (msp_send_packet(sock, p)==-1) + return -1; + } + if (sock->next_action > p->sent + sock->tx.rtt*2) + sock->next_action = p->sent + sock->tx.rtt*2; + p=p->_next; + } + + // should we send an ack now without sending a payload? + if (sock->previous_ack != sock->rx.next_seq){ + if (send_ack(sock)) + return -1; + } + + if ((sock->state & (MSP_STATE_SHUTDOWN_LOCAL|MSP_STATE_SHUTDOWN_REMOTE)) == (MSP_STATE_SHUTDOWN_LOCAL|MSP_STATE_SHUTDOWN_REMOTE) + && sock->tx.packet_count == 0 + && sock->rx.packet_count == 0){ + msp_close(sock); + return -1; + } + + return 0; +} + +int msp_processing(time_ms_t *next_action) +{ + *next_action=0; + struct msp_sock *sock = root; + time_ms_t now = gettime_ms(); + while(sock){ + struct msp_sock *s=sock; + sock = s->_next; + + if (s->next_action && s->next_action <= now){ + // this might cause the socket to be closed. + if (process_sock(s)==0){ + // remember the time of the next thing we need to do. + if (s->next_action!=0 && s->next_action < *next_action) + *next_action=s->next_action; + } + } + } + return 0; +} + +static struct msp_sock * find_connection(int mdp_sock, struct mdp_header *header) +{ + struct msp_sock *s=root; + struct msp_sock *listen=NULL; + while(s){ + if (s->mdp_sock == mdp_sock ){ + if ((s->header.flags & MDP_FLAG_BIND) && (header->flags & MDP_FLAG_BIND)){ + // bind response from the daemon + s->header.local = header->local; + s->header.flags &= ~MDP_FLAG_BIND; + DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port); + return NULL; + } + + if (s->state & MSP_STATE_LISTENING){ + // remember any matching listen socket so we can create a connection on first use + if (s->header.local.port == header->local.port + && (is_sid_t_any(s->header.local.sid) + || memcmp(&s->header.local.sid, &header->local.sid, SID_SIZE)==0)) + listen=s; + }else if (memcmp(&s->header.remote, &header->remote, sizeof header->remote)==0 + && memcmp(&s->header.local, &header->local, sizeof header->local)==0){ + // if the addresses match, we found it. + return s; + } + } + s = s->_next; + } + + if (listen){ + // create socket for incoming connection + s = msp_socket(listen->mdp_sock); + s->header = *header; + // use the same handler initially + s->handler = listen->handler; + s->context = listen->context; + return s; + } + + WARNF("Unexpected packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port); + return NULL; +} + +static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len) +{ + DEBUGF("packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port); + // find or create mdp_sock... + struct msp_sock *sock=find_connection(mdp_sock, header); + + if (!sock) + return 0; + + if (len<3) + return WHY("Expected at least 3 bytes"); + + sock->state |= MSP_STATE_CONNECTED; + sock->last_rx = gettime_ms(); + sock->timeout = sock->last_rx + 10000; + + uint8_t flags = payload[0]; + + uint16_t ack_seq = read_uint16(&payload[1]); + + // release acknowledged packets + free_acked_packets(&sock->tx, ack_seq); + + // TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet + + + // make sure we attempt to process packets from this sock soon + // TODO calculate based on congestion window + sock->next_action = gettime_ms(); + + if (len<5) + return 0; + + uint16_t seq = read_uint16(&payload[3]); + + add_packet(&sock->rx, seq, flags, &payload[5], len - 5); + return 0; +} + +int msp_recv(int mdp_sock) +{ + + struct mdp_header header; + uint8_t payload[1200]; + + ssize_t len = mdp_recv(mdp_sock, &header, payload, sizeof(payload)); + if (len<0) + return -1; + + return process_packet(mdp_sock, &header, payload, len); +} + diff --git a/msp_client.h b/msp_client.h new file mode 100644 index 00000000..56ff6589 --- /dev/null +++ b/msp_client.h @@ -0,0 +1,63 @@ +#ifndef __SERVALD_MSP_CLIENT_H +#define __SERVALD_MSP_CLIENT_H + +#define MSP_STATE_UNINITIALISED 0 +#define MSP_STATE_LISTENING (1<<0) + +#define MSP_STATE_CONNECTING (1<<1) +#define MSP_STATE_CONNECTED (1<<2) + +#define MSP_STATE_SHUTDOWN_LOCAL (1<<3) +#define MSP_STATE_SHUTDOWN_REMOTE (1<<4) + +// this connection is about to be free'd, release any other resources or references to the state +#define MSP_STATE_CLOSED (1<<5) + +// something has gone wrong somewhere and the connection will be closed +#define MSP_STATE_ERROR (1<<6) + +// does the client want to be interupted to reading data? +#define MSP_STATE_POLLIN (1<<7) +// does the client want to know when data can be written? +#define MSP_STATE_POLLOUT (1<<8) + +// is there some data to read? +#define MSP_STATE_DATAIN (1<<9) +// is there space for sending more data? +#define MSP_STATE_DATAOUT (1<<10) + + +struct msp_sock; +typedef uint16_t msp_state_t; + +// allocate a new socket +struct msp_sock * msp_socket(int mdp_sock); +void msp_close(struct msp_sock *sock); + +int msp_set_handler(struct msp_sock *sock, + int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context), + void *context); + +// the local address is only set when calling msp_listen or msp_send +int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local); +int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote); + +int msp_listen(struct msp_sock *sock); + +int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote); +msp_state_t msp_get_state(struct msp_sock *sock); + +// which events are you interested in handling? +void msp_set_watch(struct msp_sock *sock, msp_state_t flags); + +// bind, send data, and potentially shutdown this end of the connection +int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len); +int msp_shutdown(struct msp_sock *sock); + +// receive and process an incoming packet +int msp_recv(int mdp_sock); + +// next_action indicates the next time that msp_processing should be called +int msp_processing(time_ms_t *next_action); + +#endif \ No newline at end of file diff --git a/msp_proxy.c b/msp_proxy.c new file mode 100644 index 00000000..5fc51ab9 --- /dev/null +++ b/msp_proxy.c @@ -0,0 +1,356 @@ + +#include "serval.h" +#include "str.h" +#include "dataformats.h" +#include "mdp_client.h" +#include "msp_client.h" + +struct buffer{ + size_t position; + size_t limit; + size_t capacity; + uint8_t bytes[]; +}; + +struct connection{ + struct msp_sock *sock; + struct buffer *in; + struct buffer *out; +}; + +struct connection *stdio_connection=NULL; +struct msp_sock *listener=NULL; + +static void msp_poll(struct sched_ent *alarm); +static void stdin_poll(struct sched_ent *alarm); +static void stdout_poll(struct sched_ent *alarm); + +struct profile_total mdp_sock_stats={ + .name="msp_poll" +}; +struct sched_ent mdp_sock={ + .function = msp_poll, + .stats = &mdp_sock_stats, +}; + +struct profile_total stdin_stats={ + .name="stdin_poll" +}; +struct sched_ent stdin_alarm={ + .function = stdin_poll, + .stats = &stdin_stats, +}; + +struct profile_total stdout_stats={ + .name="stdout_poll" +}; +struct sched_ent stdout_alarm={ + .function = stdout_poll, + .stats = &stdout_stats, +}; + +static struct connection *alloc_connection() +{ + struct connection *conn = emalloc_zero(sizeof(struct connection)); + if (!conn) + return NULL; + conn->in = emalloc(1024 + sizeof(struct buffer)); + if (!conn->in){ + free(conn); + return NULL; + } + conn->out = emalloc(1024 + sizeof(struct buffer)); + if (!conn->out){ + free(conn->in); + free(conn); + return NULL; + } + conn->in->position = conn->out->position = 0; + conn->in->limit = conn->out->limit = 0; + conn->in->capacity = conn->out->capacity = 1024; + return conn; +} + +static void free_connection(struct connection *conn) +{ + if (!conn) + return; + if (conn->in) + free(conn->in); + if (conn->out) + free(conn->out); + free(conn); +} + +static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context) +{ + struct connection *conn = context; + DEBUGF("Handler, state %d, payload len %zd", state, len); + + if (payload && len){ + dump("incoming payload", payload, len); + if (conn->out->capacity < len + conn->out->limit){ + DEBUGF("Insufficient space len %zd, capacity %zd, limit %zd", len, conn->out->capacity, conn->out->limit); + return -1; + } + if (conn->out->limit==0){ + watch(&stdout_alarm); + INFOF("Watching stdout"); + } + bcopy(payload, &conn->out->bytes[conn->out->limit], len); + conn->out->limit+=len; + } + + if (state & MSP_STATE_CLOSED){ + struct mdp_sockaddr remote; + msp_get_remote_adr(sock, &remote); + INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port); + + if (conn == stdio_connection){ + stdio_connection->sock=NULL; + }else{ + free_connection(conn); + } + + unschedule(&mdp_sock); + + if (mdp_sock.poll.events){ + unwatch(&mdp_sock); + mdp_sock.poll.events=0; + INFOF("Unwatching mdp socket"); + } + mdp_close(mdp_sock.poll.fd); + mdp_sock.poll.fd=-1; + return 0; + } + + if (state & MSP_STATE_SHUTDOWN_REMOTE){ + struct mdp_sockaddr remote; + msp_get_remote_adr(sock, &remote); + INFOF(" - Connection with %s:%d remote shutdown", alloca_tohex_sid_t(remote.sid), remote.port); + } + + return 0; +} + +static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *UNUSED(context)) +{ + struct mdp_sockaddr remote; + msp_get_remote_adr(sock, &remote); + INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port); + + struct connection *conn = alloc_connection(); + if (!conn) + return -1; + conn->sock = sock; + if (!stdio_connection){ + stdio_connection=conn; + watch(&stdin_alarm); + INFOF("Watching stdin"); + } + msp_set_handler(sock, msp_handler, conn); + if (payload) + return msp_handler(sock, state, payload, len, conn); + + // stop listening after an incoming connection + msp_close(listener); + + return 0; +} + +static void msp_poll(struct sched_ent *alarm) +{ + if (alarm->poll.revents & POLLIN) + // process incoming data packet + msp_recv(alarm->poll.fd); + + // do any timed actions that need to be done, either in response to receiving or due to a timed alarm. + msp_processing(&alarm->alarm); + if (alarm->alarm){ + time_ms_t now = gettime_ms(); + if (alarm->alarm < now) + alarm->alarm = now; + alarm->deadline = alarm->alarm +10; + unschedule(alarm); + schedule(alarm); + } +} + +static void stdin_poll(struct sched_ent *alarm) +{ + INFOF("Poll stdin, %d", alarm->poll.revents); + if (alarm->poll.revents & POLLIN) { + if (!stdio_connection){ + unwatch(alarm); + INFOF("Unwatching stdin"); + return; + } + + size_t remaining = stdio_connection->in->capacity - stdio_connection->in->limit; + if (remaining>0){ + ssize_t r = read(alarm->poll.fd, + stdio_connection->in->bytes + stdio_connection->in->limit, + remaining); + INFOF("Read %zd from stdin %d, %d", r, alarm->poll.revents, errno); + if (r>0){ + dump("stdin",stdio_connection->in->bytes + stdio_connection->in->limit, r); + + stdio_connection->in->limit+=r; + + if (msp_send(stdio_connection->sock, stdio_connection->in->bytes, stdio_connection->in->limit)!=-1){ + // if this packet was acceptted, clear the read buffer + stdio_connection->in->limit = stdio_connection->in->position = 0; + // attempt to process this socket asap + mdp_sock.alarm = gettime_ms(); + mdp_sock.deadline = mdp_sock.alarm+10; + unschedule(&mdp_sock); + schedule(&mdp_sock); + } + + // stop reading input when the buffer is full + if (stdio_connection->in->limit==stdio_connection->in->capacity){ + unwatch(alarm); + INFOF("Unwatching stdin"); + } + }else{ + // EOF, just trigger our error handler + alarm->poll.revents|=POLLERR; + } + } + } + + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + // input has closed? + struct mdp_sockaddr remote; + msp_get_remote_adr(stdio_connection->sock, &remote); + msp_shutdown(stdio_connection->sock); + unwatch(alarm); + INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port); + // attempt to process this socket asap + mdp_sock.alarm = gettime_ms(); + mdp_sock.deadline = mdp_sock.alarm+10; + unschedule(&mdp_sock); + schedule(&mdp_sock); + } +} + +static void stdout_poll(struct sched_ent *alarm) +{ + if (alarm->poll.revents & POLLOUT) { + if (!stdio_connection){ + unwatch(alarm); + INFOF("Unwatching stdout"); + return; + } + // try to write some data + size_t data = stdio_connection->out->limit-stdio_connection->out->position; + if (data>0){ + ssize_t r = write(alarm->poll.fd, + stdio_connection->out->bytes+stdio_connection->out->position, + data); + INFOF("Wrote %zd to stdout", r); + if (r > 0) + stdio_connection->out->position+=r; + } + + // if the buffer is empty now, reset it and unwatch the handle + if (stdio_connection->out->position==stdio_connection->out->limit){ + stdio_connection->out->limit=0; + stdio_connection->out->position=0; + unwatch(alarm); + INFOF("Unwatching stdout"); + } + + if (stdio_connection->out->limit < stdio_connection->out->capacity){ + // TODO try to get more data from the socket + } + } + + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + unwatch(alarm); + INFOF("Unwatching stdout"); + // Um, quit? + } +} + +int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context)) +{ + const char *sidhex, *port_string; + + if (cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1) + return -1; + if (cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1) + return -1; + + struct mdp_sockaddr addr; + bzero(&addr, sizeof addr); + + addr.port = atoi(port_string); + + if (sidhex && *sidhex){ + if (str_to_sid_t(&addr.sid, sidhex) == -1) + return WHY("str_to_sid_t() failed"); + } + + int ret=-1; + struct msp_sock *sock = NULL; + + mdp_sock.poll.fd = mdp_socket(); + if (mdp_sock.poll.fd==-1) + goto end; + mdp_sock.poll.events = POLLIN; + watch(&mdp_sock); + INFOF("Watching mdp socket"); + + set_nonblock(STDIN_FILENO); + set_nonblock(STDOUT_FILENO); + + stdin_alarm.poll.fd=STDIN_FILENO; + stdin_alarm.poll.events=POLLIN; + + stdout_alarm.poll.fd=STDOUT_FILENO; + stdout_alarm.poll.events=POLLOUT; + + sock = msp_socket(mdp_sock.poll.fd); + if (sidhex && *sidhex){ + stdio_connection = alloc_connection(); + if (!stdio_connection) + return -1; + stdio_connection->sock = sock; + msp_set_handler(sock, msp_handler, stdio_connection); + msp_set_remote(sock, addr); + INFOF("Set remote %s:%d", alloca_tohex_sid_t(addr.sid), addr.port); + + // note we only watch these stdio handles when we have space / bytes in our buffers + watch(&stdin_alarm); + INFOF("Watching stdin"); + }else{ + msp_set_handler(sock, msp_listener, NULL); + msp_set_local(sock, addr); + msp_listen(sock); + listener=sock; + INFOF(" - Listening on port %d", addr.port); + } + sock=NULL; + + while(fd_poll()){ + ; + } + ret=0; + +end: + if (sock) + msp_close(sock); + if (mdp_sock.poll.fd>=0){ + if (mdp_sock.poll.events){ + unwatch(&mdp_sock); + INFOF("Unwatching mdp socket"); + } + mdp_close(mdp_sock.poll.fd); + } + unschedule(&mdp_sock); + free_connection(stdio_connection); + stdio_connection=NULL; + return ret; +} + diff --git a/overlay_mdp.c b/overlay_mdp.c index b9cd1f8d..b0624e42 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -495,7 +495,7 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame */ if (config.debug.mdprequests) - DEBUGF("Received packet with listener (MDP ports: src=%s*:%"PRImdp_port_t", dst=%"PRImdp_port_t")", + DEBUGF("Received packet (MDP ports: src=%s*:%"PRImdp_port_t", dst=%"PRImdp_port_t")", alloca_tohex_sid_t_trunc(mdp->out.src.sid, 14), mdp->out.src.port, mdp->out.dst.port); @@ -530,6 +530,8 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame if (len < 0) RETURN(WHY("unsupported MDP packet type")); struct socket_address *client = &mdp_bindings[match].client; + if (config.debug.mdprequests) + DEBUGF("Forwarding packet to client %s", alloca_socket_address(client)); ssize_t r = sendto(mdp_sock.poll.fd,mdp,len,0, &client->addr, client->addrlen); if (r == -1){ WHYF_perror("sendto(fd=%d,len=%zu,addr=%s)", mdp_sock.poll.fd, (size_t)len, alloca_socket_address(client)); @@ -546,6 +548,7 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame } case 1: { + struct socket_address *client = &mdp_bindings[match].client; struct mdp_header header; header.local.sid=mdp->out.dst.sid; header.local.port=mdp->out.dst.port; @@ -562,7 +565,9 @@ static int overlay_saw_mdp_frame(struct overlay_frame *frame, overlay_mdp_frame if (mdp_bindings[match].internal) RETURN(mdp_bindings[match].internal(&header, mdp->out.payload, mdp->out.payload_length)); - RETURN(mdp_send2(&mdp_bindings[match].client, &header, mdp->out.payload, mdp->out.payload_length)); + if (config.debug.mdprequests) + DEBUGF("Forwarding packet to client v2 %s", alloca_socket_address(client)); + RETURN(mdp_send2(client, &header, mdp->out.payload, mdp->out.payload_length)); } } } else { @@ -740,6 +745,11 @@ static int overlay_send_frame( if (!source) return WHYF("No source specified"); + if (config.debug.mdprequests) + DEBUGF("Attempting to queue mdp packet from %s:%d to %s:%d", + alloca_tohex_sid_t(source->sid), src_port, + destination?alloca_tohex_sid_t(destination->sid):"broadcast", dst_port); + /* Prepare the overlay frame for dispatch */ struct overlay_frame *frame = emalloc_zero(sizeof(struct overlay_frame)); if (!frame) @@ -1406,9 +1416,6 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header overlay_saw_mdp_frame(NULL, &mdp); } - if (config.debug.mdprequests) - DEBUGF("Attempting to queue mdp packet"); - // construct, encrypt, sign and queue the packet if (overlay_send_frame( source, header->local.port, diff --git a/serval.h b/serval.h index c8465336..2a7794e5 100644 --- a/serval.h +++ b/serval.h @@ -602,6 +602,7 @@ int app_meshms_conversations(const struct cli_parsed *parsed, struct cli_context int app_meshms_send_message(const struct cli_parsed *parsed, struct cli_context *context); int app_meshms_list_messages(const struct cli_parsed *parsed, struct cli_context *context); int app_meshms_mark_read(const struct cli_parsed *parsed, struct cli_context *context); +int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *context); int monitor_get_fds(struct pollfd *fds,int *fdcount,int fdmax); diff --git a/sourcefiles.mk b/sourcefiles.mk index 4894d7cc..976771d1 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -24,6 +24,8 @@ SERVAL_SOURCES = \ $(SERVAL_BASE)meshms.c \ $(SERVAL_BASE)mdp_client.c \ $(SERVAL_BASE)mdp_net.c \ + $(SERVAL_BASE)msp_client.c \ + $(SERVAL_BASE)msp_proxy.c \ $(SERVAL_BASE)os.c \ $(SERVAL_BASE)mem.c \ $(SERVAL_BASE)instance.c \ diff --git a/tests/msp b/tests/msp new file mode 100755 index 00000000..fd106c7b --- /dev/null +++ b/tests/msp @@ -0,0 +1,60 @@ +#!/bin/bash + +# Tests for stream connections +# +# Copyright 2012 Serval Project, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + + +source "${0%/*}/../testframework.sh" +source "${0%/*}/../testdefs.sh" + +teardown() { + stop_all_servald_servers + kill_all_servald_processes + assert_no_servald_processes + report_all_servald_servers +} + +doc_hello="Simple Hello World" +setup_hello() { + setup_servald + assert_no_servald_processes + foreach_instance +A +B create_single_identity + foreach_instance +A +B \ + executeOk_servald config \ + set debug.mdprequests on \ + set log.console.level DEBUG \ + set log.console.show_time on + start_servald_instances +A +B + set_instance +A +} +test_hello() { + fork executeOk_servald --timeout=5 msp listen 512 <