Watch mdp socket on new tunnelled connections

This commit is contained in:
Jeremy Lakeman 2014-05-15 16:30:41 +09:30
parent 9e267ece21
commit 5f9ffefcfc

View File

@ -39,6 +39,7 @@ struct connection{
MSP_SOCKET sock;
struct buffer *in;
struct buffer *out;
char eof;
int last_state;
};
@ -53,6 +54,7 @@ static void msp_poll(struct sched_ent *alarm);
static void service_poll(struct sched_ent *alarm);
static void listen_poll(struct sched_ent *alarm);
static void io_poll(struct sched_ent *alarm);
static MSP_HANDLER msp_handler;
struct profile_total mdp_sock_stats={
.name="msp_poll"
@ -141,6 +143,10 @@ static void free_connection(struct connection *conn)
{
if (!conn)
return;
if (!msp_socket_is_closed(conn->sock)){
msp_set_handler(conn->sock, msp_handler, NULL);
msp_close(conn->sock);
}
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (is_watching(&conn->alarm_out))
@ -159,12 +165,8 @@ static void free_connection(struct connection *conn)
conn->alarm_out.poll.fd=-1;
free(conn);
if (msp_socket_count()==0){
unschedule(&mdp_sock);
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
}
if (msp_socket_count()==0 && is_watching(&mdp_sock))
unwatch(&mdp_sock);
}
static void process_msp_asap()
@ -195,6 +197,9 @@ static void local_shutdown(struct connection *conn)
static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
{
struct connection *conn = context;
if (!conn)
return 0;
if (state & MSP_STATE_ERROR)
saw_error=1;
@ -209,8 +214,8 @@ static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *pay
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
conn->out->limit+=len;
if (!is_watching(&conn->alarm_out))
watch(&conn->alarm_out);
conn->alarm_out.poll.events|=POLLOUT;
watch(&conn->alarm_out);
// attempt to write immediately
conn->alarm_out.poll.revents=POLLOUT;
@ -281,7 +286,6 @@ static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *pa
return 0;
conn->sock = sock;
watch(&conn->alarm_in);
msp_set_handler(sock, msp_handler, conn);
if (payload)
return msp_handler(sock, state, payload, len, conn);
@ -339,11 +343,11 @@ static int try_send(struct connection *conn)
// if this packet was acceptted, clear the read buffer
conn->in->limit = conn->in->position = 0;
// hit end of data?
if (!(conn->alarm_in.poll.events&POLLIN)){
if (conn->eof){
local_shutdown(conn);
}else{
if (!is_watching(&conn->alarm_in))
watch(&conn->alarm_in);
conn->alarm_in.poll.events|=POLLIN;
watch(&conn->alarm_in);
}
return 1;
}
@ -358,19 +362,30 @@ static void io_poll(struct sched_ent *alarm)
ssize_t r = read(alarm->poll.fd,
conn->in->bytes + conn->in->limit,
remaining);
if (r<0){
WARNF_perror("read(%d)", alarm->poll.fd);
alarm->poll.revents = POLLERR;
}
if (r==0){
// EOF
r=-1;
alarm->poll.revents = POLLHUP;
}
if (r>0){
conn->in->limit+=r;
if (try_send(conn))
process_msp_asap();
// stop reading input when the buffer is full
if (conn->in->limit==conn->in->capacity){
unwatch(alarm);
}
}else{
// EOF? trigger a graceful shutdown
alarm->poll.revents = POLLHUP;
}
}
// stop reading input when the buffer is full
if (conn->in->limit==conn->in->capacity){
alarm->poll.events &= ~POLLIN;
if (alarm->poll.events)
watch(alarm);
else if (is_watching(alarm))
unwatch(alarm);
}
}
if (alarm->poll.revents & POLLOUT) {
@ -380,6 +395,8 @@ static void io_poll(struct sched_ent *alarm)
ssize_t r = write(alarm->poll.fd,
conn->out->bytes+conn->out->position,
data);
if (r < 0)
WARNF_perror("write(%d)", alarm->poll.fd);
if (r > 0)
conn->out->position+=r;
}
@ -387,7 +404,10 @@ static void io_poll(struct sched_ent *alarm)
// if the buffer is empty now, reset it and unwatch the handle
if (conn->out->position==conn->out->limit){
conn->out->limit = conn->out->position = 0;
if (is_watching(alarm))
alarm->poll.events &= ~POLLOUT;
if (alarm->poll.events)
watch(alarm);
else if (is_watching(alarm))
unwatch(alarm);
if (conn->last_state & MSP_STATE_SHUTDOWN_REMOTE)
remote_shutdown(conn);
@ -404,8 +424,12 @@ static void io_poll(struct sched_ent *alarm)
if (alarm->poll.revents & POLLHUP) {
// EOF? trigger a graceful shutdown
unwatch(alarm);
conn->eof=1;
alarm->poll.events &= ~POLLIN;
if (alarm->poll.events)
watch(alarm);
else if (is_watching(alarm))
unwatch(alarm);
if (!conn->in->limit){
local_shutdown(conn);
process_msp_asap();
@ -429,6 +453,7 @@ static void listen_poll(struct sched_ent *alarm)
return;
}
INFOF("- Incoming TCP connection from %s", alloca_socket_address(&addr));
watch(&mdp_sock);
MSP_SOCKET sock = msp_socket(mdp_sock.poll.fd, 0);
if (msp_socket_is_null(sock))
return;
@ -502,7 +527,6 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
mdp_sock.poll.fd = mdp_socket();
if (mdp_sock.poll.fd==-1)
goto end;
watch(&mdp_sock);
set_nonblock(STDIN_FILENO);
set_nonblock(STDOUT_FILENO);
@ -528,6 +552,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
watch(&listen_alarm);
INFOF("- Forwarding from %s to %s:%d", alloca_socket_address(&ip_addr), alloca_tohex_sid_t(addr.sid), addr.port);
}else{
watch(&mdp_sock);
sock = msp_socket(mdp_sock.poll.fd, 0);
once = 1;
struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
@ -538,6 +563,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
}
}else{
watch(&mdp_sock);
sock = msp_socket(mdp_sock.poll.fd, 0);
msp_set_handler(sock, msp_listener, NULL);
msp_set_local(sock, &addr);
@ -558,6 +584,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
process_msp_asap();
sigIntFlag = 0;
signal(SIGINT, sigIntHandler);
signal(SIGTERM, sigIntHandler);
while(sigIntFlag==0 && fd_poll()){
;