serval-dna/msp_proxy.c
2014-01-09 15:01:37 +10:30

375 lines
9.2 KiB
C

#include "serval.h"
#include "str.h"
#include "dataformats.h"
#include "mdp_client.h"
#include "msp_client.h"
#include <errno.h>
/*
 * Fixed-capacity byte queue with its storage allocated inline after the header.
 * Valid, not yet consumed data occupies bytes[position .. limit).
 */
struct buffer {
  size_t position;  // offset of the next byte to be consumed
  size_t limit;     // offset one past the last valid byte; new data is appended here
  size_t capacity;  // number of bytes available in bytes[]
  uint8_t bytes[];  // storage, sized by whoever allocates the struct
};
/*
 * State kept for one MSP connection handled by the proxy.
 */
struct connection {
  struct msp_sock *sock;  // MSP socket of this connection; set to NULL by msp_handler once it is closed
  struct buffer *in;      // bytes read from stdin that are waiting to be handed to msp_send()
  struct buffer *out;     // payload received from MSP that is waiting to be written to stdout
  int last_state;         // most recent state value passed to msp_handler
};
// The connection whose buffers are pumped to and from this process's
// stdin / stdout. NULL until a connection has been allocated.
struct connection *stdio_connection=NULL;
// The listening MSP socket when running in listen mode, otherwise NULL.
struct msp_sock *listener=NULL;
// Forward declarations; the handlers below and the scheduler entries refer to
// each other.
static int try_send();
static void msp_poll(struct sched_ent *alarm);
static void stdin_poll(struct sched_ent *alarm);
static void stdout_poll(struct sched_ent *alarm);
// Scheduler entry (and its profiling record) for the MDP socket; its callback
// is msp_poll().
struct profile_total mdp_sock_stats={
.name="msp_poll"
};
struct sched_ent mdp_sock={
.function = msp_poll,
.stats = &mdp_sock_stats,
};
// Scheduler entry (and its profiling record) for standard input; its callback
// is stdin_poll().
struct profile_total stdin_stats={
.name="stdin_poll"
};
struct sched_ent stdin_alarm={
.function = stdin_poll,
.stats = &stdin_stats,
};
// Scheduler entry (and its profiling record) for standard output; its callback
// is stdout_poll().
struct profile_total stdout_stats={
.name="stdout_poll"
};
struct sched_ent stdout_alarm={
.function = stdout_poll,
.stats = &stdout_stats,
};
/*
 * Allocate a connection together with empty input and output buffers of
 * 1024 bytes each.
 *
 * Returns the new connection (sock is NULL, last_state is 0), or NULL if any
 * allocation fails; nothing is leaked on failure. Release the result with
 * free_connection().
 */
static struct connection *alloc_connection()
{
  const size_t payload_capacity = 1024;
  const size_t buffer_bytes = payload_capacity + sizeof(struct buffer);

  struct connection *created = emalloc_zero(sizeof(struct connection));
  if (created == NULL)
    return NULL;

  created->in = emalloc(buffer_bytes);
  if (created->in == NULL) {
    free(created);
    return NULL;
  }

  created->out = emalloc(buffer_bytes);
  if (created->out == NULL) {
    free(created->in);
    free(created);
    return NULL;
  }

  // emalloc() is not relied upon to clear memory, so set every header field.
  created->in->position = 0;
  created->in->limit = 0;
  created->in->capacity = payload_capacity;

  created->out->position = 0;
  created->out->limit = 0;
  created->out->capacity = payload_capacity;

  return created;
}
/*
 * Release a connection and both of its buffers.
 * Passing NULL is allowed and does nothing. The MSP socket is not touched.
 */
static void free_connection(struct connection *conn)
{
  if (conn == NULL)
    return;
  // free(NULL) is a no-op, so the buffers need no individual checks.
  free(conn->in);
  free(conn->out);
  free(conn);
}
/*
 * MSP callback for an established connection.
 *
 * sock     the MSP socket the event belongs to
 * state    current state flags; recorded in the connection's last_state
 * payload  received data, or NULL when there is none
 * len      number of bytes in payload
 * context  the struct connection registered with msp_set_handler()
 *
 * Received payload is appended to the connection's output buffer and a write
 * to stdout is attempted straight away. Returns 1, without copying anything,
 * when the payload does not fit in the output buffer (presumably this tells
 * MSP the payload was not consumed - confirm against msp_client). Returns 0
 * in every other case.
 */
static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
{
  struct connection *conn = context;
  conn->last_state = state;

  if (payload != NULL && len != 0) {
    struct buffer *out = conn->out;

    // Try to drain already queued output first. Note that stdout_poll()
    // operates on stdio_connection's output buffer.
    if (out->limit != 0) {
      stdout_alarm.poll.revents = POLLOUT;
      stdout_poll(&stdout_alarm);
    }

    // Still no room for the whole payload: leave it with the caller.
    if (out->capacity < len + out->limit)
      return 1;

    bcopy(payload, &out->bytes[out->limit], len);
    out->limit += len;

    // Make sure stdout is being watched, then attempt to write immediately.
    if (!is_watching(&stdout_alarm))
      watch(&stdout_alarm);
    stdout_alarm.poll.revents = POLLOUT;
    stdout_poll(&stdout_alarm);
  }

  if (state & MSP_STATE_CLOSED) {
    struct mdp_sockaddr peer;
    msp_get_remote_adr(sock, &peer);
    INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(peer.sid), peer.port);

    conn->sock = NULL;
    // The stdio connection is released by app_msp_connection() instead.
    if (conn != stdio_connection)
      free_connection(conn);

    // Stop servicing the MDP socket.
    unschedule(&mdp_sock);
    if (is_watching(&mdp_sock))
      unwatch(&mdp_sock);
    return 0;
  }

  if (state & MSP_STATE_DATAOUT) {
    // Push any buffered stdin data, then resume reading stdin if there is
    // space in the input buffer and stdin is not already being watched.
    try_send();
    struct buffer *in = stdio_connection->in;
    if (in->limit < in->capacity && !is_watching(&stdin_alarm))
      watch(&stdin_alarm);
  }

  return 0;
}
/*
 * MSP callback installed on the listening socket; invoked for sockets that
 * inherit this handler when a connection arrives.
 *
 * sock     the MSP socket the event belongs to
 * state    current state flags
 * payload  data that arrived with the event, or NULL
 * len      number of bytes in payload
 *
 * Allocates a connection for the socket, makes it the stdio connection if
 * there is none yet (and starts watching stdin), switches the socket over to
 * msp_handler and stops listening. Any payload delivered with this first
 * event is passed on to msp_handler, whose result is then returned.
 *
 * Returns 0 when there is nothing further to do (including a CLOSED event),
 * -1 if the connection could not be allocated.
 */
static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *UNUSED(context))
{
  if (state & MSP_STATE_CLOSED)
    return 0;

  struct mdp_sockaddr remote;
  msp_get_remote_adr(sock, &remote);
  INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port);

  struct connection *conn = alloc_connection();
  if (!conn)
    return -1;
  conn->sock = sock;

  if (!stdio_connection){
    stdio_connection=conn;
    watch(&stdin_alarm);
  }

  msp_set_handler(sock, msp_handler, conn);

  // Stop listening after the first incoming connection. This must happen
  // before the payload is handed over: returning early for a first packet
  // that carries data used to leave the listener open.
  msp_close(listener);

  if (payload)
    return msp_handler(sock, state, payload, len, conn);
  return 0;
}
/*
 * Scheduler callback for the MDP socket.
 *
 * Receives a pending packet when the socket is readable, lets MSP perform its
 * processing, and re-schedules this entry for the time msp_processing() stored
 * in alarm->alarm. A value of zero leaves the entry unscheduled.
 */
static void msp_poll(struct sched_ent *alarm)
{
  // Process an incoming data packet, if there is one.
  if (alarm->poll.revents & POLLIN)
    msp_recv(alarm->poll.fd);

  // Run timed actions, whether triggered by the packet above or by the alarm.
  msp_processing(&alarm->alarm);

  if (!alarm->alarm)
    return;

  // Never schedule in the past.
  time_ms_t current = gettime_ms();
  if (alarm->alarm < current)
    alarm->alarm = current;
  alarm->deadline = alarm->alarm + 10;

  unschedule(alarm);
  schedule(alarm);
}
static int try_send()
{
if (!stdio_connection->in->limit)
return 0;
if (msp_send(stdio_connection->sock, stdio_connection->in->bytes, stdio_connection->in->limit)==-1)
return 0;
// if this packet was acceptted, clear the read buffer
stdio_connection->in->limit = stdio_connection->in->position = 0;
return 1;
}
static void stdin_poll(struct sched_ent *alarm)
{
if (alarm->poll.revents & POLLIN) {
if (!stdio_connection){
unwatch(alarm);
return;
}
size_t remaining = stdio_connection->in->capacity - stdio_connection->in->limit;
if (remaining>0){
ssize_t r = read(alarm->poll.fd,
stdio_connection->in->bytes + stdio_connection->in->limit,
remaining);
if (r>0){
stdio_connection->in->limit+=r;
if (try_send()){
// attempt to process this socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
// stop reading input when the buffer is full
if (stdio_connection->in->limit==stdio_connection->in->capacity){
unwatch(alarm);
}
}else{
if (stdio_connection->in->limit)
unwatch(alarm);
else
// EOF and no data in the buffer, just trigger our error handler
alarm->poll.revents|=POLLERR;
}
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
// input has closed?
struct mdp_sockaddr remote;
msp_get_remote_adr(stdio_connection->sock, &remote);
msp_shutdown(stdio_connection->sock);
if (is_watching(alarm))
unwatch(alarm);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
// attempt to process this socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
}
static void stdout_poll(struct sched_ent *alarm)
{
if (alarm->poll.revents & POLLOUT) {
if (!stdio_connection){
if (is_watching(alarm))
unwatch(alarm);
return;
}
// try to write some data
size_t data = stdio_connection->out->limit-stdio_connection->out->position;
if (data>0){
ssize_t r = write(alarm->poll.fd,
stdio_connection->out->bytes+stdio_connection->out->position,
data);
if (r > 0)
stdio_connection->out->position+=r;
}
// if the buffer is empty now, reset it and unwatch the handle
if (stdio_connection->out->position==stdio_connection->out->limit){
stdio_connection->out->limit=0;
stdio_connection->out->position=0;
if (is_watching(alarm))
unwatch(alarm);
}
if (stdio_connection->out->limit < stdio_connection->out->capacity){
// make sure we try to process this socket soon for more data
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
if (is_watching(alarm))
unwatch(alarm);
}
}
/*
 * Command-line entry point of the MSP proxy.
 *
 * Reads the "sid" and "port" arguments. With a non-empty sid it connects to
 * that sid and port; otherwise it listens on the port. Data is then relayed
 * between stdin / stdout and the MSP connection until fd_poll() reports there
 * is nothing left to do.
 *
 * Returns 0 on success, 1 if the stdio connection's last reported state had
 * MSP_STATE_ERROR set, and -1 (or the result of WHY()) when arguments are
 * invalid or setup fails.
 */
int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
  const char *sidhex, *port_string;

  if (cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1)
    return -1;
  if (cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1)
    return -1;

  struct mdp_sockaddr addr;
  bzero(&addr, sizeof addr);

  addr.port = atoi(port_string);
  if (sidhex && *sidhex){
    if (str_to_sid_t(&addr.sid, sidhex) == -1)
      return WHY("str_to_sid_t() failed");
  }

  int ret=-1;
  struct msp_sock *sock = NULL;

  mdp_sock.poll.fd = mdp_socket();
  if (mdp_sock.poll.fd==-1)
    goto end;
  mdp_sock.poll.events = POLLIN;
  watch(&mdp_sock);

  set_nonblock(STDIN_FILENO);
  set_nonblock(STDOUT_FILENO);
  stdin_alarm.poll.fd=STDIN_FILENO;
  stdin_alarm.poll.events=POLLIN;
  stdout_alarm.poll.fd=STDOUT_FILENO;
  stdout_alarm.poll.events=POLLOUT;

  sock = msp_socket(mdp_sock.poll.fd);
  // Every call below uses sock, so bail out if no socket could be created.
  if (!sock)
    goto end;

  if (sidhex && *sidhex){
    // Connect mode: the one and only connection is wired to stdio up front.
    stdio_connection = alloc_connection();
    if (!stdio_connection)
      goto end;
    stdio_connection->sock = sock;
    msp_set_handler(sock, msp_handler, stdio_connection);
    msp_set_remote(sock, addr);
    INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
    // note we only watch these stdio handles when we have space / bytes in our buffers
    watch(&stdin_alarm);
  }else{
    // Listen mode: msp_listener creates the connection when a peer arrives.
    msp_set_handler(sock, msp_listener, NULL);
    msp_set_local(sock, addr);
    // sock will be closed if listen fails
    if (msp_listen(sock)==-1)
      goto end;
    listener=sock;
    INFOF(" - Listening on port %d", addr.port);
  }

  // run msp_processing once to init alarm timer
  mdp_sock.poll.revents=0;
  msp_poll(&mdp_sock);

  // Run the event loop until fd_poll() returns zero.
  while(fd_poll()){
    ;
  }

  if (stdio_connection && stdio_connection->last_state & MSP_STATE_ERROR)
    ret = 1;
  else
    ret = 0;

end:
  listener=NULL;
  if (is_watching(&mdp_sock))
    unwatch(&mdp_sock);
  if (mdp_sock.poll.fd>=0){
    msp_close_all(mdp_sock.poll.fd);
    mdp_close(mdp_sock.poll.fd);
  }
  unschedule(&mdp_sock);
  free_connection(stdio_connection);
  stdio_connection=NULL;
  return ret;
}