Add command to support forwarding TCP connections to an MSP service

This commit is contained in:
Jeremy Lakeman 2013-12-19 14:45:58 +10:30
parent e09387b27d
commit 99e08d1516
5 changed files with 187 additions and 91 deletions

View File

@ -3058,7 +3058,7 @@ struct cli_schema command_line_options[]={
"Run serial encapsulation test"},
{app_msp_connection,{"msp", "listen", "<port>", NULL}, 0,
"Listen for incoming connections"},
{app_msp_connection,{"msp", "connect", "<sid>", "<port>", NULL}, 0,
{app_msp_connection,{"msp", "connect", "[--once]", "[--forward=<local_port>]", "<sid>", "<port>", NULL}, 0,
"Connect to a remote party"},
#ifdef HAVE_VOIPTEST
{app_pa_phone,{"phone",NULL}, 0,

View File

@ -79,6 +79,18 @@ unsigned msp_socket_count()
return i;
}
void msp_debug()
{
struct msp_sock *p=root;
DEBUGF("Msp sockets;");
while(p){
DEBUGF("State %d, from %s:%d to %s:%d", p->state,
alloca_tohex_sid_t(p->header.local.sid), p->header.local.port,
alloca_tohex_sid_t(p->header.remote.sid), p->header.remote.port);
p=p->_next;
}
}
static void free_all_packets(struct msp_window *window)
{
struct msp_packet *p = window->_head;
@ -113,6 +125,8 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
}
window->_head = p;
if (rtt){
if (rtt < 10)
rtt=10;
window->rtt = rtt;
if (window->base_rtt > rtt)
window->base_rtt = rtt;

View File

@ -29,6 +29,8 @@ void msp_close(struct msp_sock *sock);
void msp_close_all(int mdp_sock);
unsigned msp_socket_count();
void msp_debug();
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),
void *context);

View File

@ -1,6 +1,8 @@
#include "serval.h"
#include "str.h"
#include "strbuf.h"
#include "strbuf_helpers.h"
#include "dataformats.h"
#include "mdp_client.h"
#include "msp_client.h"
@ -23,11 +25,13 @@ struct connection{
};
int saw_error=0;
int once =0;
struct msp_sock *listener=NULL;
struct mdp_sockaddr remote_addr;
static int try_send(struct connection *conn);
static void msp_poll(struct sched_ent *alarm);
static void listen_poll(struct sched_ent *alarm);
static void io_poll(struct sched_ent *alarm);
struct profile_total mdp_sock_stats={
@ -46,6 +50,17 @@ struct profile_total io_stats={
.name="io_stats"
};
struct profile_total listen_stats={
.name="listen_poll"
};
struct sched_ent listen_alarm={
.poll.revents = 0,
.poll.events = POLLIN,
.poll.fd = -1,
.function = listen_poll,
.stats = &listen_stats,
};
static struct connection *alloc_connection(
struct msp_sock *sock,
@ -103,6 +118,10 @@ static void free_connection(struct connection *conn)
close(conn->alarm_in.poll.fd);
if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd)
close(conn->alarm_out.poll.fd);
conn->in=NULL;
conn->out=NULL;
conn->alarm_in.poll.fd=-1;
conn->alarm_out.poll.fd=-1;
DEBUGF("Freeing connection %p", conn);
free(conn);
@ -115,6 +134,14 @@ static void free_connection(struct connection *conn)
}
}
static void process_msp_asap()
{
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
{
struct connection *conn = context;
@ -148,19 +175,16 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port);
conn->sock = NULL;
if (!conn->out->limit){
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (!is_watching(&conn->alarm_out))
free_connection(conn);
}
return 0;
}
if (state&MSP_STATE_DATAOUT){
if (state&MSP_STATE_DATAOUT)
try_send(conn);
if (conn->in->limit<conn->in->capacity && !is_watching(&conn->alarm_in))
watch(&conn->alarm_in);
}
return 0;
}
@ -183,9 +207,10 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
if (payload)
return msp_handler(sock, state, payload, len, conn);
// stop listening after the first incoming connection
msp_close(listener);
if (once){
// stop listening after the first incoming connection
msp_close(listener);
}
return 0;
}
@ -207,6 +232,14 @@ static void msp_poll(struct sched_ent *alarm)
}
}
static void local_shutdown(struct connection *conn)
{
struct mdp_sockaddr remote;
msp_get_remote_adr(conn->sock, &remote);
msp_shutdown(conn->sock);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
}
static int try_send(struct connection *conn)
{
if (!conn->in->limit)
@ -216,7 +249,13 @@ static int try_send(struct connection *conn)
// if this packet was acceptted, clear the read buffer
conn->in->limit = conn->in->position = 0;
// hit end of data?
if (conn->alarm_in.poll.events==0){
local_shutdown(conn);
}else{
if (!is_watching(&conn->alarm_in))
watch(&conn->alarm_in);
}
return 1;
}
@ -232,23 +271,15 @@ static void io_poll(struct sched_ent *alarm)
remaining);
if (r>0){
conn->in->limit+=r;
if (try_send(conn)){
// attempt to process this socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
if (try_send(conn))
process_msp_asap();
// stop reading input when the buffer is full
if (conn->in->limit==conn->in->capacity){
unwatch(alarm);
}
}else{
if (conn->in->limit)
unwatch(alarm);
else
// EOF and no data in the buffer, just trigger our error handler
alarm->poll.revents|=POLLERR;
// EOF? trigger a graceful shutdown
alarm->poll.revents = POLLHUP;
}
}
}
@ -266,52 +297,75 @@ static void io_poll(struct sched_ent *alarm)
// if the buffer is empty now, reset it and unwatch the handle
if (conn->out->position==conn->out->limit){
conn->out->limit=0;
conn->out->position=0;
conn->out->limit = conn->out->position = 0;
if (is_watching(alarm))
unwatch(alarm);
}
if (conn->out->limit < conn->out->capacity){
if (conn->sock){
// make sure we try to process this socket soon for more data
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
process_msp_asap();
}else{
free_connection(conn);
}
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
// input has closed?
if (conn->sock){
struct mdp_sockaddr remote;
msp_get_remote_adr(conn->sock, &remote);
msp_shutdown(conn->sock);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
if (alarm->poll.revents & POLLHUP) {
// EOF? trigger a graceful shutdown
unwatch(alarm);
alarm->poll.events = 0;
if (!conn->in->limit){
local_shutdown(conn);
process_msp_asap();
}
}
if (alarm->poll.revents & POLLERR) {
free_connection(conn);
process_msp_asap();
}
}
static void listen_poll(struct sched_ent *alarm)
{
if (alarm->poll.revents & POLLIN) {
struct socket_address addr;
addr.addrlen = sizeof addr.store;
int fd = accept(alarm->poll.fd, &addr.addr, &addr.addrlen);
if (fd==-1){
WHYF_perror("accept(%d)", alarm->poll.fd);
return;
}
INFOF("- Incoming TCP connection from %s", alloca_socket_address(&addr));
struct msp_sock *sock = msp_socket(mdp_sock.poll.fd);
if (!sock)
return;
if (is_watching(alarm))
struct connection *connection = alloc_connection(sock, fd, io_poll, fd, io_poll);
if (!connection){
msp_close(sock);
return;
}
msp_set_handler(sock, msp_handler, connection);
msp_set_remote(sock, remote_addr);
if (once){
unwatch(alarm);
close(alarm->poll.fd);
alarm->poll.fd=-1;
// attempt to process this msp socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
close(alarm->poll.fd);
alarm->poll.fd=-1;
}
}
}
int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *sidhex, *port_string;
if ( cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1
const char *sidhex, *port_string, *local_port_string;
once = cli_arg(parsed, "--once", NULL, NULL, NULL) == 0;
if ( cli_arg(parsed, "--forward", &local_port_string, cli_uint, NULL) == -1
|| cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1
|| cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1)
return -1;
@ -337,15 +391,36 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
set_nonblock(STDIN_FILENO);
set_nonblock(STDOUT_FILENO);
sock = msp_socket(mdp_sock.poll.fd);
if (sidhex && *sidhex){
struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
if (!conn)
goto end;
msp_set_handler(sock, msp_handler, conn);
msp_set_remote(sock, addr);
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
if (local_port_string){
remote_addr = addr;
listen_alarm.poll.fd = esocket(PF_INET, SOCK_STREAM, 0);
if (listen_alarm.poll.fd==-1)
goto end;
struct socket_address ip_addr;
ip_addr.addrlen = sizeof(ip_addr.inet);
ip_addr.inet.sin_family = AF_INET;
ip_addr.inet.sin_port = htons(atoi(local_port_string));
ip_addr.inet.sin_addr.s_addr = INADDR_ANY;
if (socket_bind(listen_alarm.poll.fd, &ip_addr.addr, ip_addr.addrlen)==-1)
goto end;
if (socket_listen(listen_alarm.poll.fd, 0)==-1)
goto end;
watch(&listen_alarm);
INFOF("- Forwarding from port %d to %s:%d", ntohs(ip_addr.inet.sin_port), alloca_tohex_sid_t(addr.sid), addr.port);
}else{
sock = msp_socket(mdp_sock.poll.fd);
once = 1;
struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
if (!conn)
goto end;
msp_set_handler(sock, msp_handler, conn);
msp_set_remote(sock, addr);
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
}
}else{
sock = msp_socket(mdp_sock.poll.fd);
once = 1;
msp_set_handler(sock, msp_listener, NULL);
msp_set_local(sock, addr);
@ -357,14 +432,11 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
INFOF(" - Listening on port %d", addr.port);
}
// run msp_processing once to init alarm timer
mdp_sock.poll.revents=0;
msp_poll(&mdp_sock);
process_msp_asap();
while(fd_poll()){
;
}
ret = saw_error;
end:
@ -375,6 +447,10 @@ end:
msp_close_all(mdp_sock.poll.fd);
mdp_close(mdp_sock.poll.fd);
}
if (listen_alarm.poll.fd !=-1 && is_watching(&listen_alarm))
unwatch(&listen_alarm);
if (listen_alarm.poll.fd!=-1)
close(listen_alarm.poll.fd);
unschedule(&mdp_sock);
return ret;
}

View File

@ -29,16 +29,21 @@ teardown() {
report_all_servald_servers
}
configure_servald_server() {
create_single_identity
add_servald_interface
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
}
doc_connect_fail="Timeout when the connection isn't reachable"
setup_connect_fail() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B \
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
set_instance +B
create_single_identity
start_servald_instances +A
}
test_connect_fail() {
@ -53,12 +58,6 @@ doc_hello="Simple Hello World"
setup_hello() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B \
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
start_servald_instances +A +B
}
server_hello() {
@ -66,6 +65,7 @@ server_hello() {
Hello from the server
EOF
assertStdoutGrep --matches=1 "^Hello from the client$"
tfw_cat --stderr
}
test_hello() {
set_instance +A
@ -84,12 +84,6 @@ setup_client_no_data() {
setup_servald
assert_no_servald_processes
create_file file1 64000
foreach_instance +A +B create_single_identity
foreach_instance +A +B \
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
start_servald_instances +A +B
}
server_client_no_data() {
@ -113,12 +107,6 @@ setup_server_no_data() {
setup_servald
assert_no_servald_processes
create_file file1 64000
foreach_instance +A +B create_single_identity
foreach_instance +A +B \
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
start_servald_instances +A +B
}
server_server_no_data() {
@ -141,12 +129,6 @@ doc_keep_alive="Keep the connection alive with no data"
setup_keep_alive() {
setup_servald
assert_no_servald_processes
foreach_instance +A +B create_single_identity
foreach_instance +A +B \
executeOk_servald config \
set debug.mdprequests on \
set log.console.level DEBUG \
set log.console.show_time on
start_servald_instances +A +B
}
listen_pipe() {
@ -166,4 +148,26 @@ test_keep_alive() {
fork_wait_all
}
doc_forward="Forward TCP connections to a remote server"
setup_forward() {
setup_servald
assert_no_servald_processes
start_servald_instances +A +B
}
client_forward() {
executeOk --timeout=20 $servald msp connect --once --forward=2048 $1 512
tfw_cat --stdout --stderr
}
test_forward() {
set_instance +A
fork server_hello
wait_until --timeout=10 grep "Bind MDP $SIDA:512" "$instance_servald_log"
set_instance +B
fork client_forward $SIDA
sleep 1
executeOk nc -v 127.0.0.1 2048 < <(echo "Hello from the client")
assertStdoutGrep --matches=1 "^Hello from the server$"
fork_wait_all
}
runTests "$@"