Refactor stdio handling

This commit is contained in:
Jeremy Lakeman 2013-12-18 14:37:46 +10:30
parent 6045eb10e8
commit e09387b27d
4 changed files with 150 additions and 129 deletions

View File

@ -68,6 +68,17 @@ struct msp_sock * msp_socket(int mdp_sock)
return ret;
}
unsigned msp_socket_count()
{
unsigned i=0;
struct msp_sock *p=root;
while(p){
i++;
p=p->_next;
}
return i;
}
static void free_all_packets(struct msp_window *window)
{
struct msp_packet *p = window->_head;
@ -114,11 +125,7 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
static void msp_free(struct msp_sock *sock)
{
sock->state |= MSP_STATE_CLOSED;
// last chance to free other resources
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
// remove from the list first
if (sock->_prev)
sock->_prev->_next = sock->_next;
else
@ -126,6 +133,10 @@ static void msp_free(struct msp_sock *sock)
if (sock->_next)
sock->_next->_prev = sock->_prev;
// last chance to free other resources
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
free_all_packets(&sock->tx);
free_all_packets(&sock->rx);
@ -134,6 +145,7 @@ static void msp_free(struct msp_sock *sock)
void msp_close(struct msp_sock *sock)
{
// TODO if never sent / received, just free it
sock->state |= MSP_STATE_CLOSED;
}

View File

@ -27,6 +27,7 @@ typedef uint16_t msp_state_t;
struct msp_sock * msp_socket(int mdp_sock);
void msp_close(struct msp_sock *sock);
void msp_close_all(int mdp_sock);
unsigned msp_socket_count();
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),

View File

@ -4,6 +4,7 @@
#include "dataformats.h"
#include "mdp_client.h"
#include "msp_client.h"
#include "socket.h"
struct buffer{
size_t position;
@ -13,49 +14,62 @@ struct buffer{
};
struct connection{
struct sched_ent alarm_in;
struct sched_ent alarm_out;
struct msp_sock *sock;
struct buffer *in;
struct buffer *out;
int last_state;
};
struct connection *stdio_connection=NULL;
int saw_error=0;
struct msp_sock *listener=NULL;
struct mdp_sockaddr remote_addr;
static int try_send();
static int try_send(struct connection *conn);
static void msp_poll(struct sched_ent *alarm);
static void stdin_poll(struct sched_ent *alarm);
static void stdout_poll(struct sched_ent *alarm);
static void io_poll(struct sched_ent *alarm);
struct profile_total mdp_sock_stats={
.name="msp_poll"
};
struct sched_ent mdp_sock={
.poll.revents = 0,
.poll.events = POLLIN,
.poll.fd = -1,
.function = msp_poll,
.stats = &mdp_sock_stats,
};
struct profile_total stdin_stats={
.name="stdin_poll"
};
struct sched_ent stdin_alarm={
.function = stdin_poll,
.stats = &stdin_stats,
struct profile_total io_stats={
.name="io_stats"
};
struct profile_total stdout_stats={
.name="stdout_poll"
};
struct sched_ent stdout_alarm={
.function = stdout_poll,
.stats = &stdout_stats,
};
static struct connection *alloc_connection()
static struct connection *alloc_connection(
struct msp_sock *sock,
int fd_in,
void (*func_in)(struct sched_ent *alarm),
int fd_out,
void (*func_out)(struct sched_ent *alarm))
{
struct connection *conn = emalloc_zero(sizeof(struct connection));
if (!conn)
return NULL;
conn->sock = sock;
conn->alarm_in.poll.fd = fd_in;
conn->alarm_in.poll.events = POLLIN;
conn->alarm_in.function = func_in;
conn->alarm_in.stats = &io_stats;
conn->alarm_in.context = conn;
conn->alarm_out.poll.fd = fd_out;
conn->alarm_out.poll.events = POLLOUT;
conn->alarm_out.function = func_out;
conn->alarm_out.stats = &io_stats;
conn->alarm_out.context = conn;
watch(&conn->alarm_in);
conn->in = emalloc(1024 + sizeof(struct buffer));
if (!conn->in){
free(conn);
@ -77,11 +91,28 @@ static void free_connection(struct connection *conn)
{
if (!conn)
return;
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (is_watching(&conn->alarm_out))
unwatch(&conn->alarm_out);
if (conn->in)
free(conn->in);
if (conn->out)
free(conn->out);
if (conn->alarm_in.poll.fd!=-1)
close(conn->alarm_in.poll.fd);
if (conn->alarm_out.poll.fd!=-1 && conn->alarm_out.poll.fd != conn->alarm_in.poll.fd)
close(conn->alarm_out.poll.fd);
DEBUGF("Freeing connection %p", conn);
free(conn);
if (msp_socket_count()==0){
DEBUGF("All sockets closed");
unschedule(&mdp_sock);
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
}
}
static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
@ -89,23 +120,26 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
struct connection *conn = context;
conn->last_state=state;
if (state & MSP_STATE_ERROR)
saw_error=1;
if (payload && len){
if (conn->out->limit){
// attempt to write immediately
stdout_alarm.poll.revents=POLLOUT;
stdout_poll(&stdout_alarm);
conn->alarm_out.poll.revents=POLLOUT;
conn->alarm_out.function(&conn->alarm_out);
}
if (conn->out->capacity < len + conn->out->limit)
return 1;
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
conn->out->limit+=len;
if (!is_watching(&conn->alarm_out))
watch(&conn->alarm_out);
// attempt to write immediately
if (!is_watching(&stdout_alarm))
watch(&stdout_alarm);
stdout_alarm.poll.revents=POLLOUT;
stdout_poll(&stdout_alarm);
conn->alarm_out.poll.revents=POLLOUT;
conn->alarm_out.function(&conn->alarm_out);
}
if (state & MSP_STATE_CLOSED){
@ -115,20 +149,17 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
conn->sock = NULL;
if (conn != stdio_connection)
if (!conn->out->limit){
free_connection(conn);
}
unschedule(&mdp_sock);
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
return 0;
}
if (state&MSP_STATE_DATAOUT){
try_send();
if (stdio_connection->in->limit<stdio_connection->in->capacity && !is_watching(&stdin_alarm))
watch(&stdin_alarm);
try_send(conn);
if (conn->in->limit<conn->in->capacity && !is_watching(&conn->alarm_in))
watch(&conn->alarm_in);
}
return 0;
}
@ -142,14 +173,12 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
msp_get_remote_adr(sock, &remote);
INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port);
struct connection *conn = alloc_connection();
struct connection *conn = alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
if (!conn)
return -1;
conn->sock = sock;
if (!stdio_connection){
stdio_connection=conn;
watch(&stdin_alarm);
}
watch(&conn->alarm_in);
msp_set_handler(sock, msp_handler, conn);
if (payload)
return msp_handler(sock, state, payload, len, conn);
@ -178,35 +207,32 @@ static void msp_poll(struct sched_ent *alarm)
}
}
static int try_send()
static int try_send(struct connection *conn)
{
if (!stdio_connection->in->limit)
if (!conn->in->limit)
return 0;
if (msp_send(stdio_connection->sock, stdio_connection->in->bytes, stdio_connection->in->limit)==-1)
if (msp_send(conn->sock, conn->in->bytes, conn->in->limit)==-1)
return 0;
// if this packet was acceptted, clear the read buffer
stdio_connection->in->limit = stdio_connection->in->position = 0;
conn->in->limit = conn->in->position = 0;
return 1;
}
static void stdin_poll(struct sched_ent *alarm)
static void io_poll(struct sched_ent *alarm)
{
struct connection *conn = alarm->context;
if (alarm->poll.revents & POLLIN) {
if (!stdio_connection){
unwatch(alarm);
return;
}
size_t remaining = stdio_connection->in->capacity - stdio_connection->in->limit;
size_t remaining = conn->in->capacity - conn->in->limit;
if (remaining>0){
ssize_t r = read(alarm->poll.fd,
stdio_connection->in->bytes + stdio_connection->in->limit,
conn->in->bytes + conn->in->limit,
remaining);
if (r>0){
stdio_connection->in->limit+=r;
if (try_send()){
conn->in->limit+=r;
if (try_send(conn)){
// attempt to process this socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
@ -214,11 +240,11 @@ static void stdin_poll(struct sched_ent *alarm)
schedule(&mdp_sock);
}
// stop reading input when the buffer is full
if (stdio_connection->in->limit==stdio_connection->in->capacity){
if (conn->in->limit==conn->in->capacity){
unwatch(alarm);
}
}else{
if (stdio_connection->in->limit)
if (conn->in->limit)
unwatch(alarm);
else
// EOF and no data in the buffer, just trigger our error handler
@ -227,15 +253,53 @@ static void stdin_poll(struct sched_ent *alarm)
}
}
if (alarm->poll.revents & POLLOUT) {
// try to write some data
size_t data = conn->out->limit-conn->out->position;
if (data>0){
ssize_t r = write(alarm->poll.fd,
conn->out->bytes+conn->out->position,
data);
if (r > 0)
conn->out->position+=r;
}
// if the buffer is empty now, reset it and unwatch the handle
if (conn->out->position==conn->out->limit){
conn->out->limit=0;
conn->out->position=0;
if (is_watching(alarm))
unwatch(alarm);
}
if (conn->out->limit < conn->out->capacity){
if (conn->sock){
// make sure we try to process this socket soon for more data
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}else{
free_connection(conn);
}
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
// input has closed?
struct mdp_sockaddr remote;
msp_get_remote_adr(stdio_connection->sock, &remote);
msp_shutdown(stdio_connection->sock);
if (conn->sock){
struct mdp_sockaddr remote;
msp_get_remote_adr(conn->sock, &remote);
msp_shutdown(conn->sock);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
}
if (is_watching(alarm))
unwatch(alarm);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
// attempt to process this socket asap
close(alarm->poll.fd);
alarm->poll.fd=-1;
// attempt to process this msp socket asap
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
@ -243,60 +307,19 @@ static void stdin_poll(struct sched_ent *alarm)
}
}
static void stdout_poll(struct sched_ent *alarm)
{
if (alarm->poll.revents & POLLOUT) {
if (!stdio_connection){
if (is_watching(alarm))
unwatch(alarm);
return;
}
// try to write some data
size_t data = stdio_connection->out->limit-stdio_connection->out->position;
if (data>0){
ssize_t r = write(alarm->poll.fd,
stdio_connection->out->bytes+stdio_connection->out->position,
data);
if (r > 0)
stdio_connection->out->position+=r;
}
// if the buffer is empty now, reset it and unwatch the handle
if (stdio_connection->out->position==stdio_connection->out->limit){
stdio_connection->out->limit=0;
stdio_connection->out->position=0;
if (is_watching(alarm))
unwatch(alarm);
}
if (stdio_connection->out->limit < stdio_connection->out->capacity){
// make sure we try to process this socket soon for more data
mdp_sock.alarm = gettime_ms();
mdp_sock.deadline = mdp_sock.alarm+10;
unschedule(&mdp_sock);
schedule(&mdp_sock);
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
if (is_watching(alarm))
unwatch(alarm);
}
}
int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUSED(context))
{
const char *sidhex, *port_string;
if (cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1)
return -1;
if (cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1)
if ( cli_arg(parsed, "sid", &sidhex, str_is_subscriber_id, NULL) == -1
|| cli_arg(parsed, "port", &port_string, cli_uint, NULL) == -1)
return -1;
struct mdp_sockaddr addr;
bzero(&addr, sizeof addr);
addr.port = atoi(port_string);
saw_error=0;
if (sidhex && *sidhex){
if (str_to_sid_t(&addr.sid, sidhex) == -1)
@ -309,30 +332,19 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
mdp_sock.poll.fd = mdp_socket();
if (mdp_sock.poll.fd==-1)
goto end;
mdp_sock.poll.events = POLLIN;
watch(&mdp_sock);
set_nonblock(STDIN_FILENO);
set_nonblock(STDOUT_FILENO);
stdin_alarm.poll.fd=STDIN_FILENO;
stdin_alarm.poll.events=POLLIN;
stdout_alarm.poll.fd=STDOUT_FILENO;
stdout_alarm.poll.events=POLLOUT;
sock = msp_socket(mdp_sock.poll.fd);
if (sidhex && *sidhex){
stdio_connection = alloc_connection();
if (!stdio_connection)
struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
if (!conn)
goto end;
stdio_connection->sock = sock;
msp_set_handler(sock, msp_handler, stdio_connection);
msp_set_handler(sock, msp_handler, conn);
msp_set_remote(sock, addr);
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
// note we only watch these stdio handles when we have space / bytes in our buffers
watch(&stdin_alarm);
}else{
msp_set_handler(sock, msp_listener, NULL);
msp_set_local(sock, addr);
@ -353,22 +365,17 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
;
}
if (stdio_connection && stdio_connection->last_state & MSP_STATE_ERROR)
ret = 1;
else
ret = 0;
ret = saw_error;
end:
listener=NULL;
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
if (mdp_sock.poll.fd>=0){
if (mdp_sock.poll.fd!=-1){
msp_close_all(mdp_sock.poll.fd);
mdp_close(mdp_sock.poll.fd);
}
unschedule(&mdp_sock);
free_connection(stdio_connection);
stdio_connection=NULL;
return ret;
}

View File

@ -25,6 +25,7 @@ includeTests config
includeTests keyring
includeTests server
includeTests routing
includeTests msp
includeTests dnahelper
includeTests dnaprotocol
includeTests rhizomeops