Improve MSP API

Replace struct msp_sock * with typedef MSP_SOCKET
Add new 'flags' arg to msp_socket(), unused for now
msp_send() returns ssize_t number of bytes sent; -1 for error
MSP handler returns size_t; number of bytes consumed
Add MSP_MESSAGE_SIZE definition
Rename msp_set_remote() -> msp_connect()
Rename msp_get_local_adr() -> msp_get_local()
Rename msp_get_remote_adr() -> msp_get_remote()
msp_set_local() takes (const struct *) arg2
msp_connect() takes (const struct *) arg2
void msp_set_local(), msp_connect(), msp_listen(), msp_set_handler()
This commit is contained in:
Andrew Bettison 2014-05-07 15:03:44 +09:30
parent c9e2ed3038
commit 12a0ca4e8f
3 changed files with 282 additions and 147 deletions

View File

@ -40,6 +40,7 @@ struct msp_packet{
time_ms_t sent;
const uint8_t *payload;
size_t len;
size_t offset;
};
#define MAX_WINDOW_SIZE 4
@ -55,40 +56,132 @@ struct msp_window{
struct msp_sock{
struct msp_sock *_next;
struct msp_sock *_prev;
unsigned salt;
int mdp_sock;
msp_state_t state;
struct msp_window tx;
struct msp_window rx;
uint16_t previous_ack;
time_ms_t next_ack;
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context);
MSP_HANDLER *handler;
void *context;
struct mdp_header header;
time_ms_t timeout;
time_ms_t next_action;
};
struct msp_sock *root=NULL;
#define SALT_INVALID 0xdeadbeef
struct msp_sock * msp_socket(int mdp_sock)
int msp_socket_is_valid(MSP_SOCKET handle)
{
struct msp_sock *ret = emalloc_zero(sizeof(struct msp_sock));
ret->mdp_sock = mdp_sock;
ret->state = MSP_STATE_UNINITIALISED;
ret->_next = root;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
ret->tx.base_rtt = ret->tx.rtt = 0xFFFFFFFF;
ret->tx.last_activity = TIME_MS_NEVER_HAS;
ret->rx.last_activity = TIME_MS_NEVER_HAS;
ret->next_action = TIME_MS_NEVER_WILL;
ret->timeout = gettime_ms() + 10000;
ret->previous_ack = 0x7FFF;
if (root)
root->_prev=ret;
root = ret;
return ret;
// TODO Set up temporary SIGSEGV and SIGBUS handlers in case handle.ptr points to unmapped memory
// or is misaligned, which could happen if the handle has never been initialised or free() calls
// munmap(2) on unused areas. That is an O(1) solution that involves a couple of system calls.
// An alternative O(n) solution without system calls would be to scan the socket linked list to
// see if handle.ptr is in it. A third, O(1) solution but O(n) in memory and involving more
// malloc() calls would be to add a new layer of pointer indirection between handles and msp_sock
// structs, and zero the indirect pointer on free().
//
// TODO also perform consistency checks on the _next and _prev pointers (requires SIGSEGV
// and SIGBUS handler in place).
return handle.ptr != NULL && handle.salt == handle.ptr->salt;
}
static inline struct msp_sock * handle_to_sock(const struct msp_handle *handle)
{
assert(handle != NULL);
assert(handle->ptr != NULL);
assert(handle->salt == handle->ptr->salt); // could SEGV is handle has not been initialised
return handle->ptr;
}
static inline struct msp_handle sock_to_handle(struct msp_sock *sock)
{
return (struct msp_handle){ .ptr = sock, .salt = sock->salt };
}
static struct msp_sock *root=NULL;
static unsigned salt_counter = 0;
MSP_SOCKET msp_socket(int mdp_sock, int flags)
{
if (flags != 0) {
WHYF("unsupported flags = %#x", flags);
return MSP_SOCKET_NULL;
}
struct msp_sock *sock = emalloc_zero(sizeof(struct msp_sock));
if (sock == NULL)
return MSP_SOCKET_NULL;
if (++salt_counter == SALT_INVALID)
++salt_counter;
sock->salt = salt_counter;
sock->mdp_sock = mdp_sock;
sock->state = MSP_STATE_UNINITIALISED;
// TODO set base rtt to ensure that we send the first packet a few times before giving up
sock->tx.base_rtt = sock->tx.rtt = 0xFFFFFFFF;
sock->tx.last_activity = TIME_MS_NEVER_HAS;
sock->rx.last_activity = TIME_MS_NEVER_HAS;
sock->next_action = TIME_MS_NEVER_WILL;
sock->timeout = gettime_ms() + 10000;
sock->previous_ack = 0x7FFF;
sock->_next = root;
if (root)
root->_prev = sock;
root = sock;
return sock_to_handle(sock);
}
msp_state_t msp_get_state(MSP_SOCKET handle)
{
return handle_to_sock(&handle)->state;
}
int msp_socket_is_initialising(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle) && msp_get_state(handle) == MSP_STATE_UNINITIALISED;
}
int msp_socket_is_open(MSP_SOCKET handle)
{
if (!msp_socket_is_valid(handle))
return 0;
msp_state_t state = msp_get_state(handle);
return (state != MSP_STATE_UNINITIALISED || handle_to_sock(&handle)->tx.packet_count != 0)
&& !(state & MSP_STATE_CLOSED);
}
int msp_socket_is_closed(MSP_SOCKET handle)
{
return !msp_socket_is_valid(handle) || (msp_get_state(handle) & MSP_STATE_CLOSED) != 0;
}
int msp_socket_is_listening(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle) && (msp_get_state(handle) & MSP_STATE_LISTENING);
}
int msp_socket_is_data(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle)
&& ((msp_get_state(handle) & MSP_STATE_DATAOUT) || handle_to_sock(&handle)->tx.packet_count != 0);
}
int msp_socket_is_connected(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle) && (msp_get_state(handle) & MSP_STATE_RECEIVED_PACKET);
}
int msp_socket_is_shutdown_local(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle) && (msp_get_state(handle) & MSP_STATE_SHUTDOWN_LOCAL) != 0;
}
int msp_socket_is_shutdown_remote(MSP_SOCKET handle)
{
return msp_socket_is_valid(handle) && (msp_get_state(handle) & MSP_STATE_SHUTDOWN_REMOTE) != 0;
}
unsigned msp_socket_count()
{
unsigned i=0;
@ -175,17 +268,21 @@ static void msp_free(struct msp_sock *sock)
sock->_next->_prev = sock->_prev;
// last chance to free other resources
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
if (sock->handler) {
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
assert(nconsumed == 0);
}
free_all_packets(&sock->tx);
free_all_packets(&sock->rx);
sock->salt = SALT_INVALID; // invalidate all handles that point here
free(sock);
}
void msp_close(struct msp_sock *sock)
void msp_close(MSP_SOCKET handle)
{
struct msp_sock *sock = handle_to_sock(&handle);
// TODO if never sent / received, just free it
sock->state |= MSP_STATE_CLOSED;
}
@ -201,40 +298,34 @@ void msp_close_all(int mdp_sock)
}
}
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),
void *context)
void msp_set_handler(MSP_SOCKET handle, MSP_HANDLER *handler, void *context)
{
struct msp_sock *sock = handle_to_sock(&handle);
sock->handler = handler;
sock->context = context;
return 0;
}
msp_state_t msp_get_state(struct msp_sock *sock)
{
return sock->state;
}
int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local)
void msp_set_local(MSP_SOCKET handle, const struct mdp_sockaddr *local)
{
struct msp_sock *sock = handle_to_sock(&handle);
assert(sock->state == MSP_STATE_UNINITIALISED);
sock->header.local = local;
return 0;
sock->header.local = *local;
}
int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote)
void msp_connect(MSP_SOCKET handle, const struct mdp_sockaddr *remote)
{
struct msp_sock *sock = handle_to_sock(&handle);
assert(sock->state == MSP_STATE_UNINITIALISED);
sock->header.remote = remote;
sock->header.remote = *remote;
sock->state|=MSP_STATE_DATAOUT;
// make sure we send a packet soon
sock->next_ack = gettime_ms()+10;
sock->next_action = sock->next_ack;
return 0;
}
int msp_listen(struct msp_sock *sock)
int msp_listen(MSP_SOCKET handle)
{
struct msp_sock *sock = handle_to_sock(&handle);
assert(sock->state == MSP_STATE_UNINITIALISED);
assert(sock->header.local.port);
@ -251,14 +342,17 @@ int msp_listen(struct msp_sock *sock)
return 0;
}
int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote)
void msp_get_local(MSP_SOCKET handle, struct mdp_sockaddr *local)
{
*remote = sock->header.remote;
return 0;
*local = handle_to_sock(&handle)->header.local;
}
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags,
const uint8_t *payload, size_t len)
void msp_get_remote(MSP_SOCKET handle, struct mdp_sockaddr *remote)
{
*remote = handle_to_sock(&handle)->header.remote;
}
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags, const uint8_t *payload, size_t len)
{
struct msp_packet **insert_pos=NULL;
@ -298,6 +392,7 @@ static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags,
packet->seq = seq;
packet->flags = flags;
packet->len = len;
packet->offset = 0;
if (payload && len){
uint8_t *p = emalloc(len);
@ -322,7 +417,7 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
return -1;
}
uint8_t msp_header[5];
uint8_t msp_header[MSP_PAYLOAD_PREAMBLE_SIZE];
msp_header[0]=packet->flags;
@ -417,8 +512,10 @@ static int send_ack(struct msp_sock *sock)
}
// add a packet to the transmit buffer
int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len)
ssize_t msp_send(MSP_SOCKET handle, const uint8_t *payload, size_t len)
{
struct msp_sock *sock = handle_to_sock(&handle);
assert(!(sock->state&MSP_STATE_LISTENING));
assert(sock->header.remote.port);
assert((sock->state & MSP_STATE_SHUTDOWN_LOCAL)==0);
@ -435,11 +532,13 @@ int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len)
// TODO calculate based on congestion window
sock->next_action = gettime_ms();
return 0;
return len;
}
int msp_shutdown(struct msp_sock *sock)
int msp_shutdown(MSP_SOCKET handle)
{
struct msp_sock *sock = handle_to_sock(&handle);
assert(!(sock->state&MSP_STATE_LISTENING));
assert(!(sock->state&MSP_STATE_SHUTDOWN_LOCAL));
if (sock->tx._tail && sock->tx._tail->sent==0){
sock->tx._tail->flags |= FLAG_SHUTDOWN;
@ -484,18 +583,23 @@ static int process_sock(struct msp_sock *sock)
sock->state|=MSP_STATE_SHUTDOWN_REMOTE;
if (sock->handler){
int r = sock->handler(sock, sock->state, packet->payload, packet->len, sock->context);
if (r==-1){
sock->state |= MSP_STATE_CLOSED;
return -1;
}
// keep the packet if the handler refused to accept it.
if (r){
sock->next_action=gettime_ms()+1;
if (packet->len)
assert(packet->offset < packet->len);
else
assert(packet->offset == 0);
size_t len = packet->len - packet->offset;
size_t nconsumed = sock->handler(sock_to_handle(sock), sock->state, packet->payload, len, sock->context);
assert(nconsumed <= len);
// stop calling the handler if nothing was consumed
if (nconsumed == 0 && len != 0)
break;
}
packet->offset += nconsumed;
// keep the packet if the handler has not consumed it all
if (packet->offset < packet->len)
continue;
assert(packet->offset == packet->len);
}
p=p->_next;
sock->rx.next_seq++;
}
@ -538,11 +642,12 @@ static int process_sock(struct msp_sock *sock)
if (sock->next_action > sock->next_ack)
sock->next_action = sock->next_ack;
if (sock->state & MSP_STATE_SHUTDOWN_LOCAL
&& sock->state & MSP_STATE_SHUTDOWN_REMOTE
&& sock->tx.packet_count == 0
&& sock->rx.packet_count == 0
&& sock->previous_ack == sock->rx.next_seq){
if ( (sock->state & MSP_STATE_SHUTDOWN_LOCAL)
&& (sock->state & MSP_STATE_SHUTDOWN_REMOTE)
&& sock->tx.packet_count == 0
&& sock->rx.packet_count == 0
&& sock->previous_ack == sock->rx.next_seq
){
sock->state |= MSP_STATE_CLOSED;
return -1;
}
@ -554,16 +659,12 @@ int msp_processing(time_ms_t *next_action)
{
*next_action=TIME_MS_NEVER_WILL;
struct msp_sock *sock = root;
time_ms_t now = gettime_ms();
while(sock){
if (!(sock->state & MSP_STATE_CLOSED)
&& sock->next_action <= now){
// this might cause the socket to be closed.
if (process_sock(sock)==0){
// remember the time of the next thing we need to do.
if (sock->next_action < *next_action)
*next_action=sock->next_action;
}
if (!(sock->state & MSP_STATE_CLOSED)) {
// this might cause the socket to be closed
// remember the time of the next thing we need to do.
if (process_sock(sock)==0 && sock->next_action < *next_action)
*next_action=sock->next_action;
}else if (sock->next_action < *next_action)
*next_action=sock->next_action;
if (sock->state & MSP_STATE_CLOSED){
@ -624,11 +725,14 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
if (listen && !sock){
// create a new socket for the incoming connection
sock = msp_socket(listen->mdp_sock);
sock->header = *header;
// use the same handler initially
sock->handler = listen->handler;
sock->context = listen->context;
MSP_SOCKET handle = msp_socket(listen->mdp_sock, 0);
sock = handle.ptr;
if (sock) {
sock->header = *header;
// use the same handler initially
sock->handler = listen->handler;
sock->context = listen->context;
}
}
if (!sock){
@ -659,7 +763,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){
sock->state|=MSP_STATE_DATAOUT;
if (sock->handler)
sock->handler(sock, sock->state, NULL, 0, sock->context);
sock->handler(sock_to_handle(sock), sock->state, NULL, 0, sock->context);
}
// make sure we attempt to process packets from this sock soon
@ -668,27 +772,23 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
sock->next_action = gettime_ms();
if (len<5)
if (len<MSP_PAYLOAD_PREAMBLE_SIZE)
return 0;
sock->state |= MSP_STATE_RECEIVED_DATA;
uint16_t seq = read_uint16(&payload[3]);
if (add_packet(&sock->rx, seq, flags, &payload[5], len - 5)==1)
if (add_packet(&sock->rx, seq, flags, &payload[MSP_PAYLOAD_PREAMBLE_SIZE], len - MSP_PAYLOAD_PREAMBLE_SIZE)==1)
sock->next_ack = gettime_ms();
return 0;
}
int msp_recv(int mdp_sock)
{
struct mdp_header header;
uint8_t payload[1200];
ssize_t len = mdp_recv(mdp_sock, &header, payload, sizeof(payload));
if (len<0)
if (len == -1)
return -1;
return process_packet(mdp_sock, &header, payload, len);
}

View File

@ -1,5 +1,5 @@
/*
Mesh Streaming Protocol (MSP)
Mesh Streaming Protocol (MSP) API
Copyright (C) 2013-2014 Serval Project Inc.
This program is free software; you can redistribute it and/or
@ -20,56 +20,89 @@
#ifndef __SERVAL_DNA__MSP_CLIENT_H
#define __SERVAL_DNA__MSP_CLIENT_H
#define MSP_STATE_UNINITIALISED 0
#define MSP_STATE_LISTENING (1<<0)
#include "constants.h"
#define MSP_STATE_RECEIVED_DATA (1<<1)
#define MSP_STATE_RECEIVED_PACKET (1<<2)
#ifndef __MSP_CLIENT_INLINE
# if __GNUC__ && !__GNUC_STDC_INLINE__
# define __MSP_CLIENT_INLINE extern inline
# else
# define __MSP_CLIENT_INLINE inline
# endif
#endif
#define MSP_STATE_SHUTDOWN_LOCAL (1<<3)
#define MSP_STATE_SHUTDOWN_REMOTE (1<<4)
#define MSP_PAYLOAD_PREAMBLE_SIZE 5
#define MSP_MESSAGE_SIZE (MDP_MTU - MSP_MESSAGE_PREAMBLE_SIZE)
// this connection is about to be free'd, release any other resources or references to the state
#define MSP_STATE_CLOSED (1<<5)
// something has gone wrong somewhere
#define MSP_STATE_ERROR (1<<6)
// is there space for sending more data?
#define MSP_STATE_DATAOUT (1<<7)
struct msp_sock;
typedef uint16_t msp_state_t;
struct msp_sock;
struct msp_handle {
struct msp_sock *ptr;
unsigned salt;
};
typedef struct msp_handle MSP_SOCKET;
#define MSP_SOCKET_NULL ((MSP_SOCKET){.ptr=NULL,.salt=0})
__MSP_CLIENT_INLINE int msp_socket_is_null(MSP_SOCKET sock) {
return sock.ptr == NULL;
}
int msp_socket_is_valid(MSP_SOCKET);
// socket lifecycle
msp_state_t msp_get_state(MSP_SOCKET sock);
#define MSP_STATE_UNINITIALISED ((msp_state_t) 0)
#define MSP_STATE_LISTENING ((msp_state_t) (1<<0))
#define MSP_STATE_RECEIVED_DATA ((msp_state_t) (1<<1))
#define MSP_STATE_RECEIVED_PACKET ((msp_state_t) (1<<2))
#define MSP_STATE_SHUTDOWN_LOCAL ((msp_state_t) (1<<3))
#define MSP_STATE_SHUTDOWN_REMOTE ((msp_state_t) (1<<4))
// this connection is about to be free'd, release any other resources or references to the state
#define MSP_STATE_CLOSED ((msp_state_t) (1<<5))
// something has gone wrong somewhere
#define MSP_STATE_ERROR ((msp_state_t) (1<<6))
// is there space for sending more data?
#define MSP_STATE_DATAOUT ((msp_state_t) (1<<7))
int msp_socket_is_initialising(MSP_SOCKET);
int msp_socket_is_open(MSP_SOCKET);
int msp_socket_is_closed(MSP_SOCKET);
int msp_socket_is_listening(MSP_SOCKET);
int msp_socket_is_data(MSP_SOCKET);
int msp_socket_is_connected(MSP_SOCKET);
int msp_socket_is_shutdown_local(MSP_SOCKET);
int msp_socket_is_shutdown_remote(MSP_SOCKET);
unsigned msp_socket_count(void);
// allocate a new socket
struct msp_sock * msp_socket(int mdp_sock);
void msp_close(struct msp_sock *sock);
MSP_SOCKET msp_socket(int mdp_sock, int flags);
// initialise a socket
void msp_set_local(MSP_SOCKET sock, const struct mdp_sockaddr *local);
void msp_connect(MSP_SOCKET sock, const struct mdp_sockaddr *remote);
int msp_listen(MSP_SOCKET sock);
// close socket(s)
int msp_shutdown(MSP_SOCKET sock);
void msp_close(MSP_SOCKET);
void msp_close_all(int mdp_sock);
unsigned msp_socket_count();
void msp_debug();
void msp_debug(void);
int msp_set_handler(struct msp_sock *sock,
int (*handler)(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context),
void *context);
typedef size_t MSP_HANDLER(MSP_SOCKET sock, msp_state_t state, const uint8_t *payload, size_t len, void *context);
void msp_set_handler(MSP_SOCKET sock, MSP_HANDLER *handler, void *context);
// the local address is only set when calling msp_listen or msp_send
int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local);
int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote);
int msp_listen(struct msp_sock *sock);
int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote);
msp_state_t msp_get_state(struct msp_sock *sock);
int msp_get_mdp_socket(MSP_SOCKET); // returns arg passed to msp_socket() if MSP_SOCKET is valid
void msp_get_local(MSP_SOCKET sock, struct mdp_sockaddr *addr);
void msp_get_remote(MSP_SOCKET sock, struct mdp_sockaddr *addr);
// bind, send data, and potentially shutdown this end of the connection
int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len);
int msp_shutdown(struct msp_sock *sock);
ssize_t msp_send(MSP_SOCKET sock, const uint8_t *payload, size_t len);
// receive and process an incoming packet
int msp_recv(int mdp_sock);
// next_action indicates the next time that msp_processing should be called
int msp_processing(time_ms_t *next_action);

View File

@ -36,7 +36,7 @@ struct buffer{
struct connection{
struct sched_ent alarm_in;
struct sched_ent alarm_out;
struct msp_sock *sock;
MSP_SOCKET sock;
struct buffer *in;
struct buffer *out;
int last_state;
@ -44,7 +44,7 @@ struct connection{
int saw_error=0;
int once =0;
struct msp_sock *listener=NULL;
MSP_SOCKET listener = MSP_SOCKET_NULL;
struct mdp_sockaddr remote_addr;
struct socket_address ip_addr;
@ -98,7 +98,7 @@ const char *service_name=NULL;
mdp_port_t service_port;
static struct connection *alloc_connection(
struct msp_sock *sock,
MSP_SOCKET sock,
int fd_in,
void (*func_in)(struct sched_ent *alarm),
int fd_out,
@ -180,19 +180,19 @@ static void remote_shutdown(struct connection *conn)
struct mdp_sockaddr remote;
if (shutdown(conn->alarm_out.poll.fd, SHUT_WR))
WARNF_perror("shutdown(%d)", conn->alarm_out.poll.fd);
msp_get_remote_adr(conn->sock, &remote);
msp_get_remote(conn->sock, &remote);
INFOF(" - Connection with %s:%d remote shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
}
static void local_shutdown(struct connection *conn)
{
struct mdp_sockaddr remote;
msp_get_remote_adr(conn->sock, &remote);
msp_get_remote(conn->sock, &remote);
msp_shutdown(conn->sock);
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
}
static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
static size_t msp_handler(MSP_SOCKET sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
{
struct connection *conn = context;
if (state & MSP_STATE_ERROR)
@ -205,7 +205,7 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
conn->alarm_out.function(&conn->alarm_out);
}
if (conn->out->capacity < len + conn->out->limit)
return 1;
return 0;
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
conn->out->limit+=len;
@ -224,24 +224,25 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
if (state & MSP_STATE_CLOSED){
struct mdp_sockaddr remote;
msp_get_remote_adr(sock, &remote);
msp_get_remote(sock, &remote);
INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port);
conn->sock = NULL;
conn->sock = MSP_SOCKET_NULL;
if (is_watching(&conn->alarm_in))
unwatch(&conn->alarm_in);
if (!is_watching(&conn->alarm_out))
free_connection(conn);
assert(len == 0);
return 0;
}
if (state&MSP_STATE_DATAOUT)
try_send(conn);
return 0;
return len;
}
static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *UNUSED(context))
static size_t msp_listener(MSP_SOCKET sock, msp_state_t state, const uint8_t *payload, size_t len, void *UNUSED(context))
{
if (state & MSP_STATE_ERROR){
WHY("Error listening for incoming connections");
@ -253,11 +254,11 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
}
return 0;
return len;
}
struct mdp_sockaddr remote;
msp_get_remote_adr(sock, &remote);
msp_get_remote(sock, &remote);
INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port);
int fd_in = STDIN_FILENO;
int fd_out = STDOUT_FILENO;
@ -266,18 +267,18 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
int fd = esocket(PF_INET, SOCK_STREAM, 0);
if (fd==-1){
msp_close(sock);
return -1;
return 0;
}
if (socket_connect(fd, &ip_addr)==-1){
msp_close(sock);
close(fd);
return -1;
return 0;
}
fd_in = fd_out = fd;
}
struct connection *conn = alloc_connection(sock, fd_in, io_poll, fd_out, io_poll);
if (!conn)
return -1;
return 0;
conn->sock = sock;
watch(&conn->alarm_in);
@ -289,6 +290,7 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
// stop listening after the first incoming connection
msp_close(listener);
}
assert(len == 0);
return 0;
}
@ -390,9 +392,9 @@ static void io_poll(struct sched_ent *alarm)
if (conn->last_state & MSP_STATE_SHUTDOWN_REMOTE)
remote_shutdown(conn);
}
if (conn->out->limit < conn->out->capacity){
if (conn->sock){
if (!msp_socket_is_null(conn->sock)){
process_msp_asap();
}else{
free_connection(conn);
@ -427,8 +429,8 @@ static void listen_poll(struct sched_ent *alarm)
return;
}
INFOF("- Incoming TCP connection from %s", alloca_socket_address(&addr));
struct msp_sock *sock = msp_socket(mdp_sock.poll.fd);
if (!sock)
MSP_SOCKET sock = msp_socket(mdp_sock.poll.fd, 0);
if (msp_socket_is_null(sock))
return;
struct connection *connection = alloc_connection(sock, fd, io_poll, fd, io_poll);
@ -438,7 +440,7 @@ static void listen_poll(struct sched_ent *alarm)
}
msp_set_handler(sock, msp_handler, connection);
msp_set_remote(sock, remote_addr);
msp_connect(sock, &remote_addr);
process_msp_asap();
if (once){
@ -472,7 +474,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
}
int ret=-1;
struct msp_sock *sock = NULL;
MSP_SOCKET sock = MSP_SOCKET_NULL;
if (service_name){
// listen for service discovery messages
@ -526,19 +528,19 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
watch(&listen_alarm);
INFOF("- Forwarding from %s to %s:%d", alloca_socket_address(&ip_addr), alloca_tohex_sid_t(addr.sid), addr.port);
}else{
sock = msp_socket(mdp_sock.poll.fd);
sock = msp_socket(mdp_sock.poll.fd, 0);
once = 1;
struct connection *conn=alloc_connection(sock, STDIN_FILENO, io_poll, STDOUT_FILENO, io_poll);
if (!conn)
goto end;
msp_set_handler(sock, msp_handler, conn);
msp_set_remote(sock, addr);
msp_connect(sock, &addr);
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
}
}else{
sock = msp_socket(mdp_sock.poll.fd);
sock = msp_socket(mdp_sock.poll.fd, 0);
msp_set_handler(sock, msp_listener, NULL);
msp_set_local(sock, addr);
msp_set_local(sock, &addr);
// sock will be closed if listen fails
if (msp_listen(sock)==-1)
@ -565,7 +567,7 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
sigIntFlag = 0;
end:
listener=NULL;
listener = MSP_SOCKET_NULL;
if (is_watching(&mdp_sock))
unwatch(&mdp_sock);
if (mdp_sock.poll.fd!=-1){