mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-01-29 15:43:56 +00:00
Pipe file contents across msp connection
This commit is contained in:
parent
3a533187b7
commit
741442c54c
183
msp_client.c
183
msp_client.c
@ -21,7 +21,7 @@ struct msp_packet{
|
||||
size_t len;
|
||||
};
|
||||
|
||||
#define MAX_WINDOW_SIZE 64
|
||||
#define MAX_WINDOW_SIZE 4
|
||||
struct msp_window{
|
||||
int packet_count;
|
||||
uint32_t base_rtt;
|
||||
@ -58,7 +58,9 @@ struct msp_sock * msp_socket(int mdp_sock)
|
||||
ret->tx.base_rtt = ret->tx.rtt = 0xFFFFFFFF;
|
||||
ret->tx.last_activity = TIME_NEVER_HAS;
|
||||
ret->rx.last_activity = TIME_NEVER_HAS;
|
||||
ret->next_action = TIME_NEVER_WILL;
|
||||
ret->timeout = gettime_ms() + 10000;
|
||||
ret->previous_ack = 0xFFFF;
|
||||
if (root)
|
||||
root->_prev=ret;
|
||||
root = ret;
|
||||
@ -71,14 +73,19 @@ static void free_all_packets(struct msp_window *window)
|
||||
while(p){
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
free((void *)free_me->payload);
|
||||
if (free_me->payload)
|
||||
free((void *)free_me->payload);
|
||||
free(free_me);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_acked_packets(struct msp_window *window, uint16_t seq)
|
||||
{
|
||||
if (!window->_head)
|
||||
return;
|
||||
|
||||
struct msp_packet *p = window->_head;
|
||||
|
||||
uint32_t rtt=0;
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
@ -87,7 +94,8 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
|
||||
rtt = now - p->sent;
|
||||
struct msp_packet *free_me=p;
|
||||
p=p->_next;
|
||||
free((void *)free_me->payload);
|
||||
if (free_me->payload)
|
||||
free((void *)free_me->payload);
|
||||
free(free_me);
|
||||
window->packet_count--;
|
||||
}
|
||||
@ -96,12 +104,13 @@ static void free_acked_packets(struct msp_window *window, uint16_t seq)
|
||||
window->rtt = rtt;
|
||||
if (window->base_rtt > rtt)
|
||||
window->base_rtt = rtt;
|
||||
DEBUGF("RTT %u, base %u", rtt, window->base_rtt);
|
||||
}
|
||||
if (!p)
|
||||
window->_tail = NULL;
|
||||
}
|
||||
|
||||
void msp_close(struct msp_sock *sock)
|
||||
static void msp_free(struct msp_sock *sock)
|
||||
{
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
|
||||
@ -115,22 +124,26 @@ void msp_close(struct msp_sock *sock)
|
||||
root=sock->_next;
|
||||
if (sock->_next)
|
||||
sock->_next->_prev = sock->_prev;
|
||||
|
||||
|
||||
free_all_packets(&sock->tx);
|
||||
free_all_packets(&sock->rx);
|
||||
|
||||
free(sock);
|
||||
}
|
||||
|
||||
void msp_close(struct msp_sock *sock)
|
||||
{
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
}
|
||||
|
||||
void msp_close_all(int mdp_sock)
|
||||
{
|
||||
struct msp_sock **p = &root;
|
||||
while(*p){
|
||||
if ((*p)->mdp_sock == mdp_sock){
|
||||
msp_close(*p);
|
||||
}else{
|
||||
p=&(*p)->_next;
|
||||
}
|
||||
struct msp_sock *p = root;
|
||||
while(p){
|
||||
struct msp_sock *sock=p;
|
||||
p=p->_next;
|
||||
if (sock->mdp_sock == mdp_sock)
|
||||
msp_free(sock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,14 +161,6 @@ msp_state_t msp_get_state(struct msp_sock *sock)
|
||||
return sock->state;
|
||||
}
|
||||
|
||||
void msp_set_watch(struct msp_sock *sock, msp_state_t flags)
|
||||
{
|
||||
assert(flags & ~(MSP_STATE_POLLIN|MSP_STATE_POLLOUT));
|
||||
// clear any existing poll bits, and set the requested ones
|
||||
sock->state &= ~(MSP_STATE_POLLIN|MSP_STATE_POLLOUT);
|
||||
sock->state |= flags;
|
||||
}
|
||||
|
||||
int msp_set_local(struct msp_sock *sock, struct mdp_sockaddr local)
|
||||
{
|
||||
assert(sock->state == MSP_STATE_UNINITIALISED);
|
||||
@ -167,6 +172,9 @@ int msp_set_remote(struct msp_sock *sock, struct mdp_sockaddr remote)
|
||||
{
|
||||
assert(sock->state == MSP_STATE_UNINITIALISED);
|
||||
sock->header.remote = remote;
|
||||
sock->state|=MSP_STATE_DATAOUT;
|
||||
// make sure we send a packet soon
|
||||
sock->next_action = gettime_ms()+10;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -179,8 +187,7 @@ int msp_listen(struct msp_sock *sock)
|
||||
sock->header.flags |= MDP_FLAG_BIND;
|
||||
|
||||
if (mdp_send(sock->mdp_sock, &sock->header, NULL, 0)==-1){
|
||||
sock->state|=MSP_STATE_ERROR;
|
||||
msp_close(sock);
|
||||
sock->state|=MSP_STATE_ERROR|MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -197,42 +204,40 @@ int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote)
|
||||
static int add_packet(struct msp_window *window, uint16_t seq, uint8_t flags,
|
||||
const uint8_t *payload, size_t len)
|
||||
{
|
||||
struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet));
|
||||
if (!packet)
|
||||
return -1;
|
||||
|
||||
struct msp_packet **insert_pos=NULL;
|
||||
|
||||
if (!window->_head){
|
||||
window->_head = window->_tail = packet;
|
||||
insert_pos = &window->_head;
|
||||
}else{
|
||||
if (window->_tail->seq == seq){
|
||||
// ignore duplicate packets
|
||||
free(packet);
|
||||
return 0;
|
||||
}else if (compare_wrapped_uint16(window->_tail->seq, seq)<0){
|
||||
if (compare_wrapped_uint16(window->_head->seq, seq)>0){
|
||||
// this is ambiguous
|
||||
free(packet);
|
||||
return WHYF("%04x is both < tail (%04x) and > head (%04x)", seq, window->_tail->seq, window->_head->seq);
|
||||
}
|
||||
|
||||
window->_tail->_next = packet;
|
||||
window->_tail = packet;
|
||||
insert_pos = &window->_tail->_next;
|
||||
}else{
|
||||
struct msp_packet **pos = &window->_head;
|
||||
while(compare_wrapped_uint16((*pos)->seq, seq)<0){
|
||||
if ((*pos)->seq == seq){
|
||||
// ignore duplicate packets
|
||||
free(packet);
|
||||
return 0;
|
||||
}
|
||||
pos = &(*pos)->_next;
|
||||
insert_pos = &window->_head;
|
||||
while(compare_wrapped_uint16((*insert_pos)->seq, seq)<0)
|
||||
insert_pos = &(*insert_pos)->_next;
|
||||
if ((*insert_pos)->seq == seq){
|
||||
// ignore duplicate packets
|
||||
return 0;
|
||||
}
|
||||
(*pos)->_next = packet;
|
||||
packet->_next = (*pos);
|
||||
*pos = packet;
|
||||
}
|
||||
}
|
||||
|
||||
struct msp_packet *packet = emalloc_zero(sizeof(struct msp_packet));
|
||||
if (!packet)
|
||||
return -1;
|
||||
|
||||
packet->_next = (*insert_pos);
|
||||
*insert_pos = packet;
|
||||
if (!packet->_next)
|
||||
window->_tail = packet;
|
||||
packet->added = gettime_ms();
|
||||
packet->seq = seq;
|
||||
packet->flags = flags;
|
||||
@ -272,7 +277,6 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
write_uint16(&msp_header[3], packet->seq);
|
||||
sock->previous_ack = sock->rx.next_seq;
|
||||
|
||||
DEBUGF("Sending packet flags %d, ack %d, seq %d", packet->flags, sock->rx.next_seq, packet->seq);
|
||||
struct fragmented_data data={
|
||||
.fragment_count=3,
|
||||
.iov={
|
||||
@ -297,9 +301,12 @@ static int msp_send_packet(struct msp_sock *sock, struct msp_packet *packet)
|
||||
|
||||
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
|
||||
if (r==-1){
|
||||
if (errno==11)
|
||||
return 1;
|
||||
msp_close_all(sock->mdp_sock);
|
||||
return -1;
|
||||
}
|
||||
DEBUGF("Sent packet seq %02x len %zd (acked %02x)", packet->seq, packet->len, sock->rx.next_seq);
|
||||
sock->tx.last_activity = packet->sent = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
@ -338,9 +345,11 @@ static int send_ack(struct msp_sock *sock)
|
||||
|
||||
ssize_t r = send_message(sock->mdp_sock, &daemon_addr, &data);
|
||||
if (r==-1){
|
||||
msp_close_all(sock->mdp_sock);
|
||||
if (errno!=11)
|
||||
msp_close_all(sock->mdp_sock);
|
||||
return -1;
|
||||
}
|
||||
DEBUGF("Sent packet (acked %02x)", sock->rx.next_seq);
|
||||
sock->tx.last_activity = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
@ -358,7 +367,8 @@ int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len)
|
||||
return -1;
|
||||
|
||||
sock->tx.next_seq++;
|
||||
|
||||
if (sock->tx.packet_count>=MAX_WINDOW_SIZE)
|
||||
sock->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
sock->next_action = gettime_ms();
|
||||
@ -376,6 +386,9 @@ int msp_shutdown(struct msp_sock *sock)
|
||||
sock->tx.next_seq++;
|
||||
}
|
||||
sock->state|=MSP_STATE_SHUTDOWN_LOCAL;
|
||||
sock->state&=~MSP_STATE_DATAOUT;
|
||||
// make sure we send a packet soon
|
||||
sock->next_action = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -384,17 +397,16 @@ static int process_sock(struct msp_sock *sock)
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
if (sock->timeout < now){
|
||||
msp_close(sock);
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
sock->next_action = sock->timeout;
|
||||
|
||||
struct msp_packet *p;
|
||||
|
||||
// deliver packets that have now arrived in order
|
||||
p = sock->rx._head;
|
||||
if (p)
|
||||
DEBUGF("Seq %d vs %d", p->seq, sock->rx.next_seq);
|
||||
|
||||
// TODO ... ? (sock->state & MSP_STATE_POLLIN)
|
||||
while(p && p->seq == sock->rx.next_seq){
|
||||
@ -404,10 +416,17 @@ static int process_sock(struct msp_sock *sock)
|
||||
if (packet->flags & FLAG_SHUTDOWN)
|
||||
sock->state|=MSP_STATE_SHUTDOWN_REMOTE;
|
||||
|
||||
if (sock->handler
|
||||
&& sock->handler(sock, sock->state, packet->payload, packet->len, sock->context)==-1){
|
||||
if (sock->handler && packet->payload){
|
||||
int r = sock->handler(sock, sock->state, packet->payload, packet->len, sock->context);
|
||||
if (r==-1){
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
// keep the packet if the handler refused to accept it.
|
||||
break;
|
||||
if (r){
|
||||
sock->next_action=gettime_ms()+1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
p=p->_next;
|
||||
@ -418,31 +437,42 @@ static int process_sock(struct msp_sock *sock)
|
||||
// transmit packets that can now be sent
|
||||
p = sock->tx._head;
|
||||
while(p){
|
||||
if (p->sent==0 || p->sent + sock->tx.rtt*2 < now){
|
||||
if (p->sent==0 || p->sent + 1500 < now){
|
||||
if (!sock->header.local.port){
|
||||
if (sock->header.flags & MDP_FLAG_BIND)
|
||||
// wait until we have heard back from the daemon with our port number before sending another packet.
|
||||
break;
|
||||
sock->header.flags |= MDP_FLAG_BIND;
|
||||
}
|
||||
if (msp_send_packet(sock, p)==-1)
|
||||
int r = msp_send_packet(sock, p);
|
||||
if (r==-1)
|
||||
return -1;
|
||||
if (r)
|
||||
break;
|
||||
}
|
||||
if (sock->next_action > p->sent + sock->tx.rtt*2)
|
||||
sock->next_action = p->sent + sock->tx.rtt*2;
|
||||
if (sock->next_action > p->sent + 1500)
|
||||
sock->next_action = p->sent + 1500;
|
||||
p=p->_next;
|
||||
}
|
||||
|
||||
time_ms_t next_packet = sock->tx.last_activity + sock->tx.rtt*2;
|
||||
|
||||
// should we send an ack now without sending a payload?
|
||||
if (sock->previous_ack != sock->rx.next_seq){
|
||||
if (send_ack(sock))
|
||||
if (sock->previous_ack != sock->rx.next_seq || now > next_packet){
|
||||
int r = send_ack(sock);
|
||||
if (r==-1)
|
||||
return -1;
|
||||
next_packet = sock->tx.last_activity + sock->tx.rtt*2;
|
||||
}
|
||||
|
||||
if ((sock->state & (MSP_STATE_SHUTDOWN_LOCAL|MSP_STATE_SHUTDOWN_REMOTE)) == (MSP_STATE_SHUTDOWN_LOCAL|MSP_STATE_SHUTDOWN_REMOTE)
|
||||
if (sock->next_action > next_packet)
|
||||
sock->next_action = next_packet;
|
||||
|
||||
if (sock->state & MSP_STATE_SHUTDOWN_LOCAL
|
||||
&& sock->state & MSP_STATE_SHUTDOWN_REMOTE
|
||||
&& sock->tx.packet_count == 0
|
||||
&& sock->rx.packet_count == 0){
|
||||
msp_close(sock);
|
||||
sock->state |= MSP_STATE_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -451,33 +481,36 @@ static int process_sock(struct msp_sock *sock)
|
||||
|
||||
int msp_processing(time_ms_t *next_action)
|
||||
{
|
||||
*next_action=0;
|
||||
*next_action=TIME_NEVER_WILL;
|
||||
struct msp_sock *sock = root;
|
||||
time_ms_t now = gettime_ms();
|
||||
while(sock){
|
||||
struct msp_sock *s=sock;
|
||||
sock = s->_next;
|
||||
|
||||
if (s->next_action && s->next_action <= now){
|
||||
if (!(sock->state & MSP_STATE_CLOSED)
|
||||
&& sock->next_action <= now){
|
||||
// this might cause the socket to be closed.
|
||||
if (process_sock(s)==0){
|
||||
if (process_sock(sock)==0){
|
||||
// remember the time of the next thing we need to do.
|
||||
if (s->next_action!=0 && s->next_action < *next_action)
|
||||
*next_action=s->next_action;
|
||||
if (sock->next_action < *next_action)
|
||||
*next_action=sock->next_action;
|
||||
}
|
||||
}
|
||||
if (sock->state & MSP_STATE_CLOSED){
|
||||
struct msp_sock *s = sock->_next;
|
||||
msp_free(sock);
|
||||
sock=s;
|
||||
}else{
|
||||
sock = sock->_next;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t *payload, size_t len)
|
||||
{
|
||||
DEBUGF("packet from %s:%d", alloca_tohex_sid_t(header->remote.sid), header->remote.port);
|
||||
|
||||
// any kind of error reported by the daemon, close all related msp connections
|
||||
if (header->flags & MDP_FLAG_ERROR){
|
||||
msp_close_all(mdp_sock);
|
||||
return 0;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// find or create mdp_sock...
|
||||
@ -493,6 +526,7 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
s->header.local = header->local;
|
||||
s->header.flags &= ~MDP_FLAG_BIND;
|
||||
DEBUGF("Bound to %s:%d", alloca_tohex_sid_t(header->local.sid), header->local.port);
|
||||
s->next_action = gettime_ms();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -540,9 +574,18 @@ static int process_packet(int mdp_sock, struct mdp_header *header, const uint8_t
|
||||
uint16_t ack_seq = read_uint16(&payload[1]);
|
||||
// release acknowledged packets
|
||||
free_acked_packets(&sock->tx, ack_seq);
|
||||
|
||||
// TODO if their ack seq has not advanced, we may need to hurry up and retransmit a packet
|
||||
}
|
||||
|
||||
if (sock->tx.packet_count < MAX_WINDOW_SIZE
|
||||
&& !(sock->state & MSP_STATE_DATAOUT)
|
||||
&& !(sock->state & MSP_STATE_SHUTDOWN_LOCAL)){
|
||||
sock->state|=MSP_STATE_DATAOUT;
|
||||
if (sock->handler)
|
||||
sock->handler(sock, sock->state, NULL, 0, sock->context);
|
||||
}
|
||||
|
||||
// make sure we attempt to process packets from this sock soon
|
||||
// TODO calculate based on congestion window
|
||||
sock->state |= MSP_STATE_RECEIVED_PACKET;
|
||||
|
12
msp_client.h
12
msp_client.h
@ -16,15 +16,8 @@
|
||||
// something has gone wrong somewhere
|
||||
#define MSP_STATE_ERROR (1<<6)
|
||||
|
||||
// does the client want to be interupted to reading data?
|
||||
#define MSP_STATE_POLLIN (1<<7)
|
||||
// does the client want to know when data can be written?
|
||||
#define MSP_STATE_POLLOUT (1<<8)
|
||||
|
||||
// is there some data to read?
|
||||
#define MSP_STATE_DATAIN (1<<9)
|
||||
// is there space for sending more data?
|
||||
#define MSP_STATE_DATAOUT (1<<10)
|
||||
#define MSP_STATE_DATAOUT (1<<7)
|
||||
|
||||
|
||||
struct msp_sock;
|
||||
@ -48,9 +41,6 @@ int msp_listen(struct msp_sock *sock);
|
||||
int msp_get_remote_adr(struct msp_sock *sock, struct mdp_sockaddr *remote);
|
||||
msp_state_t msp_get_state(struct msp_sock *sock);
|
||||
|
||||
// which events are you interested in handling?
|
||||
void msp_set_watch(struct msp_sock *sock, msp_state_t flags);
|
||||
|
||||
// bind, send data, and potentially shutdown this end of the connection
|
||||
int msp_send(struct msp_sock *sock, const uint8_t *payload, size_t len);
|
||||
int msp_shutdown(struct msp_sock *sock);
|
||||
|
119
msp_proxy.c
119
msp_proxy.c
@ -21,6 +21,7 @@ struct connection{
|
||||
struct connection *stdio_connection=NULL;
|
||||
struct msp_sock *listener=NULL;
|
||||
|
||||
static int try_send();
|
||||
static void msp_poll(struct sched_ent *alarm);
|
||||
static void stdin_poll(struct sched_ent *alarm);
|
||||
static void stdout_poll(struct sched_ent *alarm);
|
||||
@ -85,20 +86,24 @@ static void free_connection(struct connection *conn)
|
||||
static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *context)
|
||||
{
|
||||
struct connection *conn = context;
|
||||
DEBUGF("Handler, state %d, payload len %zd", state, len);
|
||||
|
||||
if (payload && len){
|
||||
dump("incoming payload", payload, len);
|
||||
if (conn->out->capacity < len + conn->out->limit){
|
||||
DEBUGF("Insufficient space len %zd, capacity %zd, limit %zd", len, conn->out->capacity, conn->out->limit);
|
||||
return -1;
|
||||
}
|
||||
if (conn->out->limit==0){
|
||||
watch(&stdout_alarm);
|
||||
INFOF("Watching stdout");
|
||||
if (conn->out->limit){
|
||||
// attempt to write immediately
|
||||
stdout_alarm.poll.revents=POLLOUT;
|
||||
stdout_poll(&stdout_alarm);
|
||||
}
|
||||
if (conn->out->capacity < len + conn->out->limit)
|
||||
return 1;
|
||||
|
||||
bcopy(payload, &conn->out->bytes[conn->out->limit], len);
|
||||
conn->out->limit+=len;
|
||||
// attempt to write immediately
|
||||
if (!is_watching(&stdout_alarm))
|
||||
watch(&stdout_alarm);
|
||||
|
||||
stdout_alarm.poll.revents=POLLOUT;
|
||||
stdout_poll(&stdout_alarm);
|
||||
}
|
||||
|
||||
if (state & MSP_STATE_CLOSED){
|
||||
@ -106,35 +111,31 @@ static int msp_handler(struct msp_sock *sock, msp_state_t state, const uint8_t *
|
||||
msp_get_remote_adr(sock, &remote);
|
||||
INFOF(" - Connection with %s:%d closed", alloca_tohex_sid_t(remote.sid), remote.port);
|
||||
|
||||
if (conn == stdio_connection){
|
||||
stdio_connection->sock=NULL;
|
||||
}else{
|
||||
conn->sock = NULL;
|
||||
|
||||
if (conn != stdio_connection)
|
||||
free_connection(conn);
|
||||
}
|
||||
|
||||
unschedule(&mdp_sock);
|
||||
|
||||
if (mdp_sock.poll.events){
|
||||
if (is_watching(&mdp_sock))
|
||||
unwatch(&mdp_sock);
|
||||
mdp_sock.poll.events=0;
|
||||
INFOF("Unwatching mdp socket");
|
||||
}
|
||||
mdp_close(mdp_sock.poll.fd);
|
||||
mdp_sock.poll.fd=-1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (state & MSP_STATE_SHUTDOWN_REMOTE){
|
||||
struct mdp_sockaddr remote;
|
||||
msp_get_remote_adr(sock, &remote);
|
||||
INFOF(" - Connection with %s:%d remote shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
|
||||
if (state&MSP_STATE_DATAOUT){
|
||||
try_send();
|
||||
if (stdio_connection->in->limit<stdio_connection->in->capacity && !is_watching(&stdin_alarm))
|
||||
watch(&stdin_alarm);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t *payload, size_t len, void *UNUSED(context))
|
||||
{
|
||||
if (state & MSP_STATE_CLOSED)
|
||||
return 0;
|
||||
|
||||
struct mdp_sockaddr remote;
|
||||
msp_get_remote_adr(sock, &remote);
|
||||
INFOF(" - New connection from %s:%d", alloca_tohex_sid_t(remote.sid), remote.port);
|
||||
@ -146,7 +147,6 @@ static int msp_listener(struct msp_sock *sock, msp_state_t state, const uint8_t
|
||||
if (!stdio_connection){
|
||||
stdio_connection=conn;
|
||||
watch(&stdin_alarm);
|
||||
INFOF("Watching stdin");
|
||||
}
|
||||
msp_set_handler(sock, msp_handler, conn);
|
||||
if (payload)
|
||||
@ -176,13 +176,24 @@ static void msp_poll(struct sched_ent *alarm)
|
||||
}
|
||||
}
|
||||
|
||||
static int try_send()
|
||||
{
|
||||
if (!stdio_connection->in->limit)
|
||||
return 0;
|
||||
if (msp_send(stdio_connection->sock, stdio_connection->in->bytes, stdio_connection->in->limit)==-1)
|
||||
return 0;
|
||||
|
||||
// if this packet was acceptted, clear the read buffer
|
||||
stdio_connection->in->limit = stdio_connection->in->position = 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void stdin_poll(struct sched_ent *alarm)
|
||||
{
|
||||
INFOF("Poll stdin, %d", alarm->poll.revents);
|
||||
if (alarm->poll.revents & POLLIN) {
|
||||
if (!stdio_connection){
|
||||
unwatch(alarm);
|
||||
INFOF("Unwatching stdin");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -191,30 +202,25 @@ static void stdin_poll(struct sched_ent *alarm)
|
||||
ssize_t r = read(alarm->poll.fd,
|
||||
stdio_connection->in->bytes + stdio_connection->in->limit,
|
||||
remaining);
|
||||
INFOF("Read %zd from stdin %d, %d", r, alarm->poll.revents, errno);
|
||||
if (r>0){
|
||||
dump("stdin",stdio_connection->in->bytes + stdio_connection->in->limit, r);
|
||||
|
||||
stdio_connection->in->limit+=r;
|
||||
|
||||
if (msp_send(stdio_connection->sock, stdio_connection->in->bytes, stdio_connection->in->limit)!=-1){
|
||||
// if this packet was acceptted, clear the read buffer
|
||||
stdio_connection->in->limit = stdio_connection->in->position = 0;
|
||||
if (try_send()){
|
||||
// attempt to process this socket asap
|
||||
mdp_sock.alarm = gettime_ms();
|
||||
mdp_sock.deadline = mdp_sock.alarm+10;
|
||||
unschedule(&mdp_sock);
|
||||
schedule(&mdp_sock);
|
||||
}
|
||||
|
||||
// stop reading input when the buffer is full
|
||||
if (stdio_connection->in->limit==stdio_connection->in->capacity){
|
||||
unwatch(alarm);
|
||||
INFOF("Unwatching stdin");
|
||||
}
|
||||
}else{
|
||||
// EOF, just trigger our error handler
|
||||
alarm->poll.revents|=POLLERR;
|
||||
if (stdio_connection->in->limit)
|
||||
unwatch(alarm);
|
||||
else
|
||||
// EOF and no data in the buffer, just trigger our error handler
|
||||
alarm->poll.revents|=POLLERR;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -224,7 +230,8 @@ static void stdin_poll(struct sched_ent *alarm)
|
||||
struct mdp_sockaddr remote;
|
||||
msp_get_remote_adr(stdio_connection->sock, &remote);
|
||||
msp_shutdown(stdio_connection->sock);
|
||||
unwatch(alarm);
|
||||
if (is_watching(alarm))
|
||||
unwatch(alarm);
|
||||
INFOF(" - Connection with %s:%d local shutdown", alloca_tohex_sid_t(remote.sid), remote.port);
|
||||
// attempt to process this socket asap
|
||||
mdp_sock.alarm = gettime_ms();
|
||||
@ -238,8 +245,8 @@ static void stdout_poll(struct sched_ent *alarm)
|
||||
{
|
||||
if (alarm->poll.revents & POLLOUT) {
|
||||
if (!stdio_connection){
|
||||
unwatch(alarm);
|
||||
INFOF("Unwatching stdout");
|
||||
if (is_watching(alarm))
|
||||
unwatch(alarm);
|
||||
return;
|
||||
}
|
||||
// try to write some data
|
||||
@ -248,7 +255,6 @@ static void stdout_poll(struct sched_ent *alarm)
|
||||
ssize_t r = write(alarm->poll.fd,
|
||||
stdio_connection->out->bytes+stdio_connection->out->position,
|
||||
data);
|
||||
INFOF("Wrote %zd to stdout", r);
|
||||
if (r > 0)
|
||||
stdio_connection->out->position+=r;
|
||||
}
|
||||
@ -257,19 +263,22 @@ static void stdout_poll(struct sched_ent *alarm)
|
||||
if (stdio_connection->out->position==stdio_connection->out->limit){
|
||||
stdio_connection->out->limit=0;
|
||||
stdio_connection->out->position=0;
|
||||
unwatch(alarm);
|
||||
INFOF("Unwatching stdout");
|
||||
if (is_watching(alarm))
|
||||
unwatch(alarm);
|
||||
}
|
||||
|
||||
if (stdio_connection->out->limit < stdio_connection->out->capacity){
|
||||
// TODO try to get more data from the socket
|
||||
// make sure we try to process this socket soon for more data
|
||||
mdp_sock.alarm = gettime_ms();
|
||||
mdp_sock.deadline = mdp_sock.alarm+10;
|
||||
unschedule(&mdp_sock);
|
||||
schedule(&mdp_sock);
|
||||
}
|
||||
}
|
||||
|
||||
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
|
||||
unwatch(alarm);
|
||||
INFOF("Unwatching stdout");
|
||||
// Um, quit?
|
||||
if (is_watching(alarm))
|
||||
unwatch(alarm);
|
||||
}
|
||||
}
|
||||
|
||||
@ -300,7 +309,6 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
|
||||
goto end;
|
||||
mdp_sock.poll.events = POLLIN;
|
||||
watch(&mdp_sock);
|
||||
INFOF("Watching mdp socket");
|
||||
|
||||
set_nonblock(STDIN_FILENO);
|
||||
set_nonblock(STDOUT_FILENO);
|
||||
@ -319,11 +327,10 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
|
||||
stdio_connection->sock = sock;
|
||||
msp_set_handler(sock, msp_handler, stdio_connection);
|
||||
msp_set_remote(sock, addr);
|
||||
INFOF("Set remote %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
|
||||
INFOF("- Connecting to %s:%d", alloca_tohex_sid_t(addr.sid), addr.port);
|
||||
|
||||
// note we only watch these stdio handles when we have space / bytes in our buffers
|
||||
watch(&stdin_alarm);
|
||||
INFOF("Watching stdin");
|
||||
}else{
|
||||
msp_set_handler(sock, msp_listener, NULL);
|
||||
msp_set_local(sock, addr);
|
||||
@ -336,6 +343,10 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
|
||||
INFOF(" - Listening on port %d", addr.port);
|
||||
}
|
||||
|
||||
// run msp_processing once to init alarm timer
|
||||
mdp_sock.poll.revents=0;
|
||||
msp_poll(&mdp_sock);
|
||||
|
||||
while(fd_poll()){
|
||||
;
|
||||
}
|
||||
@ -343,12 +354,10 @@ int app_msp_connection(const struct cli_parsed *parsed, struct cli_context *UNUS
|
||||
|
||||
end:
|
||||
listener=NULL;
|
||||
if (is_watching(&mdp_sock))
|
||||
unwatch(&mdp_sock);
|
||||
if (mdp_sock.poll.fd>=0){
|
||||
msp_close_all(mdp_sock.poll.fd);
|
||||
if (mdp_sock.poll.events){
|
||||
unwatch(&mdp_sock);
|
||||
INFOF("Unwatching mdp socket");
|
||||
}
|
||||
mdp_close(mdp_sock.poll.fd);
|
||||
}
|
||||
unschedule(&mdp_sock);
|
||||
|
@ -784,7 +784,8 @@ static int overlay_send_frame(
|
||||
}
|
||||
if (config.debug.mdprequests) {
|
||||
DEBUGF("Send frame %zu bytes", ob_position(plaintext));
|
||||
dump("Frame plaintext", ob_ptr(plaintext), ob_position(plaintext));
|
||||
if (config.debug.verbose)
|
||||
dump("Frame plaintext", ob_ptr(plaintext), ob_position(plaintext));
|
||||
}
|
||||
|
||||
/* Work out the disposition of the frame-> For now we are only worried
|
||||
|
62
tests/msp
62
tests/msp
@ -42,7 +42,7 @@ setup_hello() {
|
||||
start_servald_instances +A +B
|
||||
}
|
||||
server_hello() {
|
||||
executeOk_servald --timeout=5 msp listen 512 <<EOF
|
||||
executeOk_servald --timeout=20 msp listen 512 <<EOF
|
||||
Hello from the server
|
||||
EOF
|
||||
assertStdoutGrep --matches=1 "^Hello from the client$"
|
||||
@ -52,11 +52,69 @@ test_hello() {
|
||||
fork server_hello
|
||||
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
|
||||
set_instance +B
|
||||
executeOk_servald --timeout=7 msp connect $SIDA 512 <<EOF
|
||||
executeOk_servald --timeout=20 msp connect $SIDA 512 <<EOF
|
||||
Hello from the client
|
||||
EOF
|
||||
assertStdoutGrep --matches=1 "^Hello from the server$"
|
||||
fork_wait_all
|
||||
}
|
||||
|
||||
doc_client_no_data="Client connection with no data"
|
||||
setup_client_no_data() {
|
||||
setup_servald
|
||||
assert_no_servald_processes
|
||||
create_file file1 64000
|
||||
foreach_instance +A +B create_single_identity
|
||||
foreach_instance +A +B \
|
||||
executeOk_servald config \
|
||||
set debug.mdprequests on \
|
||||
set log.console.level DEBUG \
|
||||
set log.console.show_time on
|
||||
start_servald_instances +A +B
|
||||
}
|
||||
server_client_no_data() {
|
||||
executeOk_servald --timeout=20 msp listen 512 <file1
|
||||
tfw_cat --stderr
|
||||
}
|
||||
test_client_no_data() {
|
||||
set_instance +A
|
||||
fork server_client_no_data
|
||||
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
|
||||
set_instance +B
|
||||
executeOk_servald --timeout=20 msp connect $SIDA 512 <<EOF
|
||||
EOF
|
||||
tfw_cat --stderr
|
||||
assert diff file1 "$TFWSTDOUT"
|
||||
fork_wait_all
|
||||
}
|
||||
|
||||
doc_server_no_data="Server sends no data"
|
||||
setup_server_no_data() {
|
||||
setup_servald
|
||||
assert_no_servald_processes
|
||||
create_file file1 64000
|
||||
foreach_instance +A +B create_single_identity
|
||||
foreach_instance +A +B \
|
||||
executeOk_servald config \
|
||||
set debug.mdprequests on \
|
||||
set log.console.level DEBUG \
|
||||
set log.console.show_time on
|
||||
start_servald_instances +A +B
|
||||
}
|
||||
server_server_no_data() {
|
||||
executeOk_servald --timeout=20 msp listen 512 <<EOF
|
||||
EOF
|
||||
tfw_cat --stderr
|
||||
assert diff file1 "$TFWSTDOUT"
|
||||
}
|
||||
test_server_no_data() {
|
||||
set_instance +A
|
||||
fork server_server_no_data
|
||||
wait_until grep "Bind MDP $SIDA:512" "$instance_servald_log"
|
||||
set_instance +B
|
||||
executeOk_servald --timeout=20 msp connect $SIDA 512 <file1
|
||||
tfw_cat --stderr
|
||||
fork_wait_all
|
||||
}
|
||||
|
||||
runTests "$@"
|
||||
|
Loading…
x
Reference in New Issue
Block a user