diff --git a/commandline.c b/commandline.c index 820ab451..2c786051 100644 --- a/commandline.c +++ b/commandline.c @@ -3058,7 +3058,7 @@ struct cli_schema command_line_options[]={ "Run serial encapsulation test"}, {app_msp_connection,{"msp", "listen", "", NULL}, 0, "Listen for incoming connections"}, - {app_msp_connection,{"msp", "connect", "", "", NULL}, 0, + {app_msp_connection,{"msp", "connect", "[--once]", "[--forward=]", "", "", NULL}, 0, "Connect to a remote party"}, #ifdef HAVE_VOIPTEST {app_pa_phone,{"phone",NULL}, 0, diff --git a/msp_client.c b/msp_client.c index 88e94956..a75e7984 100644 --- a/msp_client.c +++ b/msp_client.c @@ -79,6 +79,18 @@ unsigned msp_socket_count() return i; } +void msp_debug() +{ + struct msp_sock *p=root; + DEBUGF("Msp sockets;"); + while(p){ + DEBUGF("State %d, from %s:%d to %s:%d", p->state, + alloca_tohex_sid_t(p->header.local.sid), p->header.local.port, + alloca_tohex_sid_t(p->header.remote.sid), p->header.remote.port); + p=p->_next; + } +} + static void free_all_packets(struct msp_window *window) { struct msp_packet *p = window->_head; @@ -113,6 +125,8 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq) } window->_head = p; if (rtt){ + if (rtt < 10) + rtt=10; window->rtt = rtt; if (window->base_rtt > rtt) window->base_rtt = rtt; diff --git a/msp_client.h b/msp_client.h index e2d2717a..6a6e1dcc 100644 --- a/msp_client.h +++ b/msp_client.h @@ -29,6 +29,8 @@ void msp_close(struct msp_sock *sock); void msp_close_all(int mdp_sock); unsigned msp_socket_count(); +void msp_debug(); + int msp_set_handler(struct msp_sock *sock, int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context), void *context); diff --git a/msp_proxy.c b/msp_proxy.c index 47fcc3e6..a12528eb 100644 --- a/msp_proxy.c +++ b/msp_proxy.c @@ -1,6 +1,8 @@ #include "serval.h" #include "str.h" +#include "strbuf.h" +#include "strbuf_helpers.h" #include "dataformats.h" #include "mdp_client.h" #include "msp_client.h" @@ -23,11 +25,13 @@ struct connection{ }; int saw_error=0; +int once =0; struct msp_sock *listener=NULL; struct mdp_sockaddr remote_addr; static int try_send(struct connection *conn); static void msp_poll(struct sched_ent *alarm); +static void listen_poll(struct sched_ent *alarm); static void io_poll(struct sched_ent *alarm); struct profile_total mdp_sock_stats={ @@ -46,6 +50,17 @@ struct profile_total io_stats={ .name="io_stats" }; +struct profile_total listen_stats={ + .name="listen_poll" +}; + +struct sched_ent listen_alarm={ + .poll.revents = 0, + .poll.events = POLLIN, + .poll.fd = -1, + .function = listen_poll, + .stats = &listen_stats, +}; static struct connection *alloc_connection( struct msp_sock *sock, @@ -103,6 +118,10 @@ static void free_connection(struct connection *conn) close(conn->alarm_in.poll.fd); if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd) close(conn->alarm_out.poll.fd); + conn->in=NULL; + conn->out=NULL; + conn->alarm_in.poll.fd=-1; + conn->alarm_out.poll.fd=-1; DEBUGF("Freeing connection %p", conn); free(conn); @@ -115,6 +134,14 @@ static void free_connection(struct connection *conn) } } +static void process_msp_asap() +{ + mdp_sock.alarm = gettime_ms(); + mdp_sock.deadline = mdp_sock.alarm+10; + unschedule(&mdp_sock); + schedule(&mdp_sock); +} + static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context) { struct connection *conn = context; @@ -148,19 +175,16 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t * INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port); conn->sock = NULL; - - if (!conn->out->limit){ + if (is_watching(&conn->alarm_in)) + unwatch(&conn->alarm_in); + if (!is_watching(&conn->alarm_out)) free_connection(conn); - } return 0; } - if (state&MSP_STATE_DATAOUT){ + if (state&MSP_STATE_DATAOUT) try_send(conn); - if (conn->in->limitin->capacity && !is_watching(&conn->alarm_in)) - watch(&conn->alarm_in); - } return 0; } @@ -183,9 +207,10 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t if (payload) return msp_handler(sock, state, payload, len, conn); - // stop listening after the first incoming connection - msp_close(listener); - + if (once){ + // stop listening after the first incoming connection + msp_close(listener); + } return 0; } @@ -207,6 +232,14 @@ static void msp_poll(struct sched_ent *alarm) } } +static void local_shutdown(struct connection *conn) +{ + struct mdp_sockaddr remote; + msp_get_remote_adr(conn->sock, &remote); + msp_shutdown(conn->sock); + INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port); +} + static int try_send(struct connection *conn) { if (!conn->in->limit) @@ -216,7 +249,13 @@ static int try_send(struct connection *conn) // if this packet was acceptted, clear the read buffer conn->in->limit = conn->in->position = 0; - + // hit end of data? + if (conn->alarm_in.poll.events==0){ + local_shutdown(conn); + }else{ + if (!is_watching(&conn->alarm_in)) + watch(&conn->alarm_in); + } return 1; } @@ -232,23 +271,15 @@ static void io_poll(struct sched_ent *alarm) remaining); if (r>0){ conn->in->limit+=r; - if (try_send(conn)){ - // attempt to process this socket asap - mdp_sock.alarm = gettime_ms(); - mdp_sock.deadline = mdp_sock.alarm+10; - unschedule(&mdp_sock); - schedule(&mdp_sock); - } + if (try_send(conn)) + process_msp_asap(); // stop reading input when the buffer is full if (conn->in->limit==conn->in->capacity){ unwatch(alarm); } }else{ - if (conn->in->limit) - unwatch(alarm); - else - // EOF and no data in the buffer, just trigger our error handler - alarm->poll.revents|=POLLERR; + // EOF? trigger a graceful shutdown + alarm->poll.revents = POLLHUP; } } } @@ -266,52 +297,75 @@ static void io_poll(struct sched_ent *alarm) // if the buffer is empty now, reset it and unwatch the handle if (conn->out->position==conn->out->limit){ - conn->out->limit=0; - conn->out->position=0; + conn->out->limit = conn->out->position = 0; if (is_watching(alarm)) unwatch(alarm); } if (conn->out->limit < conn->out->capacity){ if (conn->sock){ - // make sure we try to process this socket soon for more data - mdp_sock.alarm = gettime_ms(); - mdp_sock.deadline = mdp_sock.alarm+10; - unschedule(&mdp_sock); - schedule(&mdp_sock); + process_msp_asap(); }else{ free_connection(conn); } } } - if (alarm->poll.revents & (POLLHUP | POLLERR)) { - // input has closed? - if (conn->sock){ - struct mdp_sockaddr remote; - msp_get_remote_adr(conn->sock, &remote); - msp_shutdown(conn->sock); - INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port); + if (alarm->poll.revents & POLLHUP) { + // EOF? trigger a graceful shutdown + unwatch(alarm); + alarm->poll.events = 0; + if (!conn->in->limit){ + local_shutdown(conn); + process_msp_asap(); } + } + + if (alarm->poll.revents & POLLERR) { + free_connection(conn); + process_msp_asap(); + } +} + +static void listen_poll(struct sched_ent *alarm) +{ + if (alarm->poll.revents & POLLIN) { + struct socket_address addr; + addr.addrlen = sizeof addr.store; + int fd = accept(alarm->poll.fd, &addr.addr, &addr.addrlen); + if (fd==-1){ + WHYF_perror("accept(%d)", alarm->poll.fd); + return; + } + INFOF("- Incoming TCP connection from %s", alloca_socket_address(&addr)); + struct msp_sock *sock = msp_socket(mdp_sock.poll.fd); + if (!sock) + return; - if (is_watching(alarm)) + struct connection *connection = alloc_connection(sock, fd, io_poll, fd, io_poll); + if (!connection){ + msp_close(sock); + return; + } + + msp_set_handler(sock, msp_handler, connection); + msp_set_remote(sock, remote_addr); + + if (once){ unwatch(alarm); - close(alarm->poll.fd); - alarm->poll.fd=-1; - - // attempt to process this msp socket asap - mdp_sock.alarm = gettime_ms(); - mdp_sock.deadline = mdp_sock.alarm+10; - unschedule(&mdp_sock); - schedule(&mdp_sock); + close(alarm->poll.fd); + alarm->poll.fd=-1; + } } } int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context)) { - const char *sidhex, *port_string; - - if ( cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1 + const char *sidhex, *port_string, *local_port_string; + once = cli_arg(parsed, "--once", NULL, NULL, NULL) == 0; + + if ( cli_arg(parsed, "--forward", &local_port_string, cli_uint, NULL) == -1 + || cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1 || cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1) return -1; @@ -337,15 +391,36 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS set_nonblock(STDIN_FILENO); set_nonblock(STDOUT_FILENO); - sock = msp_socket(mdp_sock.poll.fd); if (sidhex && *sidhex){ - struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll); - if (!conn) - goto end; - msp_set_handler(sock, msp_handler, conn); - msp_set_remote(sock, addr); - INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port); + if (local_port_string){ + remote_addr = addr; + listen_alarm.poll.fd = esocket(PF_INET, SOCK_STREAM, 0); + if (listen_alarm.poll.fd==-1) + goto end; + struct socket_address ip_addr; + ip_addr.addrlen = sizeof(ip_addr.inet); + ip_addr.inet.sin_family = AF_INET; + ip_addr.inet.sin_port = htons(atoi(local_port_string)); + ip_addr.inet.sin_addr.s_addr = INADDR_ANY; + if (socket_bind(listen_alarm.poll.fd, &ip_addr.addr, ip_addr.addrlen)==-1) + goto end; + if (socket_listen(listen_alarm.poll.fd, 0)==-1) + goto end; + watch(&listen_alarm); + INFOF("- Forwarding from port %d to %s:%d", ntohs(ip_addr.inet.sin_port), alloca_tohex_sid_t(addr.sid), addr.port); + }else{ + sock = msp_socket(mdp_sock.poll.fd); + once = 1; + struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll); + if (!conn) + goto end; + msp_set_handler(sock, msp_handler, conn); + msp_set_remote(sock, addr); + INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port); + } }else{ + sock = msp_socket(mdp_sock.poll.fd); + once = 1; msp_set_handler(sock, msp_listener, NULL); msp_set_local(sock, addr); @@ -357,14 +432,11 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS INFOF(" - Listening on port %d", addr.port); } - // run msp_processing once to init alarm timer - mdp_sock.poll.revents=0; - msp_poll(&mdp_sock); + process_msp_asap(); while(fd_poll()){ ; } - ret = saw_error; end: @@ -375,6 +447,10 @@ end: msp_close_all(mdp_sock.poll.fd); mdp_close(mdp_sock.poll.fd); } + if (listen_alarm.poll.fd !=-1 && is_watching(&listen_alarm)) + unwatch(&listen_alarm); + if (listen_alarm.poll.fd!=-1) + close(listen_alarm.poll.fd); unschedule(&mdp_sock); return ret; } diff --git a/tests/msp b/tests/msp index efecd154..50c2ec24 100755 --- a/tests/msp +++ b/tests/msp @@ -29,16 +29,21 @@ teardown() { report_all_servald_servers } +configure_servald_server() { + create_single_identity + add_servald_interface + executeOk_servald config \ + set debug.mdprequests on \ + set log.console.level DEBUG \ + set log.console.show_time on +} + doc_connect_fail="Timeout when the connection isn't reachable" setup_connect_fail() { setup_servald assert_no_servald_processes - foreach_instance +A +B create_single_identity - foreach_instance +A +B \ - executeOk_servald config \ - set debug.mdprequests on \ - set log.console.level DEBUG \ - set log.console.show_time on + set_instance +B + create_single_identity start_servald_instances +A } test_connect_fail() { @@ -53,12 +58,6 @@ doc_hello="Simple Hello World" setup_hello() { setup_servald assert_no_servald_processes - foreach_instance +A +B create_single_identity - foreach_instance +A +B \ - executeOk_servald config \ - set debug.mdprequests on \ - set log.console.level DEBUG \ - set log.console.show_time on start_servald_instances +A +B } server_hello() { @@ -66,6 +65,7 @@ server_hello() { Hello from the server EOF assertStdoutGrep --matches=1 "^Hello from the client$" + tfw_cat --stderr } test_hello() { set_instance +A @@ -84,12 +84,6 @@ setup_client_no_data() { setup_servald assert_no_servald_processes create_file file1 64000 - foreach_instance +A +B create_single_identity - foreach_instance +A +B \ - executeOk_servald config \ - set debug.mdprequests on \ - set log.console.level DEBUG \ - set log.console.show_time on start_servald_instances +A +B } server_client_no_data() { @@ -113,12 +107,6 @@ setup_server_no_data() { setup_servald assert_no_servald_processes create_file file1 64000 - foreach_instance +A +B create_single_identity - foreach_instance +A +B \ - executeOk_servald config \ - set debug.mdprequests on \ - set log.console.level DEBUG \ - set log.console.show_time on start_servald_instances +A +B } server_server_no_data() { @@ -141,12 +129,6 @@ doc_keep_alive="Keep the connection alive with no data" setup_keep_alive() { setup_servald assert_no_servald_processes - foreach_instance +A +B create_single_identity - foreach_instance +A +B \ - executeOk_servald config \ - set debug.mdprequests on \ - set log.console.level DEBUG \ - set log.console.show_time on start_servald_instances +A +B } listen_pipe() { @@ -166,4 +148,26 @@ test_keep_alive() { fork_wait_all } +doc_forward="Forward TCP connections to a remote server" +setup_forward() { + setup_servald + assert_no_servald_processes + start_servald_instances +A +B +} +client_forward() { + executeOk --timeout=20 $servald msp connect --once --forward=2048 $1 512 + tfw_cat --stdout --stderr +} +test_forward() { + set_instance +A + fork server_hello + wait_until --timeout=10 grep "Bind MDP $SIDA:512" "$instance_servald_log" + set_instance +B + fork client_forward $SIDA + sleep 1 + executeOk nc -v 127.0.0.1 2048 < <(echo "Hello from the client") + assertStdoutGrep --matches=1 "^Hello from the server$" + fork_wait_all +} + runTests "$@"