Added some safety + unregister callbacks on close

This commit is contained in:
Joseph Henry 2016-01-18 16:04:29 -08:00
parent f2cc144811
commit 31ed86740c
2 changed files with 81 additions and 75 deletions

View File

@ -292,6 +292,9 @@ void NetconEthernetTap::threadMain()
{
uint64_t prev_tcp_time = 0, prev_status_time = 0, prev_etharp_time = 0;
Mutex::Lock _l(_tcpconns_m);
_tcpconns_m.unlock();
// Main timer loop
while (_run) {
uint64_t now = OSUtils::now();
@ -307,13 +310,16 @@ void NetconEthernetTap::threadMain()
prev_status_time = now;
status_remaining = STATUS_TMR_INTERVAL - since_status;
_tcpconns_m.lock();
for(size_t i=0;i<_TcpConnections.size();++i) {
if(!_TcpConnections[i]->sock)
continue; // Skip, this is a pending connection
int fd = _phy.getDescriptor(_TcpConnections[i]->sock);
dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size());
dwr(MSG_DEBUG," tap_thread(): sock=%x, pcb->state=%d\n", _TcpConnections[i]->sock, _TcpConnections[i]->pcb->state);
fcntl(fd, F_SETFL, O_NONBLOCK);
unsigned char tmpbuf[BUF_SZ];
@ -329,12 +335,14 @@ void NetconEthernetTap::threadMain()
phyOnUnixData(_TcpConnections[i]->sock,_phy.getuptr(_TcpConnections[i]->sock),&tmpbuf,BUF_SZ);
}
}
_tcpconns_m.unlock();
}
// Main TCP/ETHARP timer section
if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
prev_tcp_time = now;
lwipstack->tcp_tmr();
// Makeshift poll
_tcpconns_m.lock();
for(size_t i=0;i<_TcpConnections.size();++i) {
if(_TcpConnections[i]->txsz > 0){
lwipstack->_lock.lock();
@ -342,6 +350,7 @@ void NetconEthernetTap::threadMain()
lwipstack->_lock.unlock();
}
}
_tcpconns_m.unlock();
} else {
tcp_remaining = ZT_LWIP_TCP_TIMER_INTERVAL - since_tcp;
}
@ -367,7 +376,6 @@ void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
{
Mutex::Lock _l(_tcpconns_m);
for(size_t i=0;i<_TcpConnections.size();++i) {
if(_TcpConnections[i]->sock == sock)
return _TcpConnections[i];
@ -375,27 +383,10 @@ TcpConnection *NetconEthernetTap::getConnection(PhySocket *sock)
return NULL;
}
TcpConnection *NetconEthernetTap::addConnection(TcpConnection *conn)
{
Mutex::Lock _l(_tcpconns_m);
_TcpConnections.push_back(conn);
return conn;
}
void NetconEthernetTap::removeConnection(TcpConnection *conn)
{
Mutex::Lock _l(_tcpconns_m);
for(size_t i=0;i<_TcpConnections.size();++i) {
if(_TcpConnections[i] == conn){
_TcpConnections.erase(_TcpConnections.begin() + i);
delete conn;
return;
}
}
}
void NetconEthernetTap::closeConnection(PhySocket *sock)
{
// Here we assume _tcpconns_m is already locked by caller
dwr(MSG_DEBUG,"closeConnection(%x)\n",sock);
if(!sock) {
dwr(MSG_DEBUG," closeConnection(): invalid PhySocket\n");
@ -405,16 +396,30 @@ void NetconEthernetTap::closeConnection(PhySocket *sock)
if(!conn)
return;
if(conn->pcb) {
if(conn->pcb->state == SYN_SENT || conn->pcb->state == CLOSE_WAIT) {
dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state);
if(conn->pcb->state == SYN_SENT) {
dwr(MSG_DEBUG," closeConnection(): invalid PCB state for this operation. ignoring.\n");
return;
}
dwr(MSG_DEBUG," closeConnection(): PCB->state = %d\n", conn->pcb->state);
if(lwipstack->_tcp_close(conn->pcb) != ERR_OK) {
if(lwipstack->_tcp_close(conn->pcb) == ERR_OK) {
// Unregister callbacks for this PCB
lwipstack->_tcp_arg(conn->pcb, NULL);
lwipstack->_tcp_recv(conn->pcb, NULL);
lwipstack->_tcp_err(conn->pcb, NULL);
lwipstack->_tcp_sent(conn->pcb, NULL);
lwipstack->_tcp_poll(conn->pcb, NULL, 1);
}
else {
dwr(MSG_ERROR," closeConnection(): error while calling tcp_close()\n");
}
}
removeConnection(conn);
for(size_t i=0;i<_TcpConnections.size();++i) {
if(_TcpConnections[i] == conn){
_TcpConnections.erase(_TcpConnections.begin() + i);
delete conn;
break;
}
}
if(!sock)
return;
close(_phy.getDescriptor(sock));
@ -422,13 +427,15 @@ void NetconEthernetTap::closeConnection(PhySocket *sock)
}
void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
Mutex::Lock _l(_tcpconns_m);
closeConnection(sock);
}
void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr)
{
Mutex::Lock _l(_tcpconns_m);
Mutex::Lock _l2(_rx_buf_m);
TcpConnection *conn = getConnection(sock);
Mutex::Lock _l(_rx_buf_m);
int len = conn->rxsz;
int n = _phy.streamSend(conn->sock, conn->rxbuf, len);
if(n > 0) {
@ -596,6 +603,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
default:
break;
}
Mutex::Lock _l(_tcpconns_m);
closeConnection(sockdata.first); // close RPC after sending retval, no longer needed
jobmap.erase(CANARY_num);
return;
@ -636,6 +644,8 @@ void NetconEthernetTap::unloadRPC(void *data, pid_t &pid, pid_t &tid,
err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err)
{
Larg *l = (Larg*)arg;
Mutex::Lock _l(l->tap->_tcpconns_m);
TcpConnection *conn = l->conn;
NetconEthernetTap *tap = l->tap;
@ -655,15 +665,12 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newPCB, err_t err)
}
// create and populate new TcpConnection
TcpConnection *newTcpConn = new TcpConnection();
tap->addConnection(newTcpConn);
l->tap->_TcpConnections.push_back(newTcpConn);
newTcpConn->pcb = newPCB;
newTcpConn->sock = tap->_phy.wrapSocket(fds[0], newTcpConn);
if(sock_fd_write(fd, fds[1]) < 0)
return -1;
else {
//close(fds[1]); // close other end of socketpair
}
tap->lwipstack->_tcp_arg(newPCB, new Larg(tap, newTcpConn));
tap->lwipstack->_tcp_recv(newPCB, nc_recved);
tap->lwipstack->_tcp_err(newPCB, nc_err);
@ -686,21 +693,20 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *
int tot = 0;
struct pbuf* q = p;
Mutex::Lock _l(l->tap->_tcpconns_m);
if(!l->conn) {
dwr(MSG_ERROR," nc_recved(): no connection\n");
return ERR_OK;
}
if(p == NULL) {
if(l->conn && !l->conn->listening) {
dwr(MSG_INFO," nc_recved(): closing connection\n");
if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK)
dwr(MSG_ERROR," nc_recved(): error while calling tcp_close()\n");
if(l->conn->pcb->state == CLOSE_WAIT){
l->tap->closeConnection(l->conn->sock);
return ERR_ABRT;
}
return err;
}
Mutex::Lock _l(l->tap->_rx_buf_m);
Mutex::Lock _l2(l->tap->_rx_buf_m);
// Cycle through pbufs and write them to the RX buffer
// The RX buffer will be emptied via phyOnUnixWritable()
while(p != NULL) {
@ -721,10 +727,39 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *
return ERR_OK;
}
err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
{
Larg *l = (Larg*)arg;
Mutex::Lock _l(l->tap->_tcpconns_m);
if(l && l->conn && len) {
float max = (float)DEFAULT_BUF_SZ;
if(l->conn->txsz < max / 2) {
l->tap->_phy.setNotifyReadable(l->conn->sock, true);
l->tap->_phy.whack();
}
}
return ERR_OK;
}
err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err)
{
Larg *l = (Larg*)arg;
if(l && l->conn)
l->tap->sendReturnValue(l->tap->_phy.getDescriptor(l->conn->rpcSock), ERR_OK);
return ERR_OK;
}
err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *PCB)
{
return ERR_OK;
}
void NetconEthernetTap::nc_err(void *arg, err_t err)
{
dwr(MSG_DEBUG,"nc_err() = %d\n", err);
Larg *l = (Larg*)arg;
Mutex::Lock _l(l->tap->_tcpconns_m);
if(!l->conn)
dwr(MSG_ERROR,"nc_err(): connection is NULL!\n");
int fd = l->tap->_phy.getDescriptor(l->conn->sock);
@ -796,37 +831,13 @@ void NetconEthernetTap::nc_err(void *arg, err_t err)
l->tap->closeConnection(l->conn);
}
err_t NetconEthernetTap::nc_poll(void* arg, struct tcp_pcb *PCB)
{
return ERR_OK;
}
err_t NetconEthernetTap::nc_sent(void* arg, struct tcp_pcb *PCB, u16_t len)
{
Larg *l = (Larg*)arg;
if(l->conn && len) {
float max = (float)DEFAULT_BUF_SZ;
if(l->conn->txsz < max / 2) {
l->tap->_phy.setNotifyReadable(l->conn->sock, true);
l->tap->_phy.whack();
}
}
return ERR_OK;
}
err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *PCB, err_t err)
{
Larg *l = (Larg*)arg;
l->tap->sendReturnValue(l->tap->_phy.getDescriptor(l->conn->rpcSock), ERR_OK);
return ERR_OK;
}
/*------------------------------------------------------------------------------
----------------------------- RPC Handler functions ----------------------------
------------------------------------------------------------------------------*/
void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct getsockname_st *getsockname_rpc)
{
Mutex::Lock _l(_tcpconns_m);
TcpConnection *conn = getConnection(sock);
char retmsg[sizeof(struct sockaddr_storage)];
memset(&retmsg, 0, sizeof(retmsg));
@ -837,6 +848,7 @@ void NetconEthernetTap::handleGetsockname(PhySocket *sock, PhySocket *rpcSock, v
void NetconEthernetTap::handleBind(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct bind_st *bind_rpc)
{
Mutex::Lock _l(_tcpconns_m);
struct sockaddr_in *rawAddr = (struct sockaddr_in *) &bind_rpc->addr;
int port = lwipstack->ntohs(rawAddr->sin_port);
ip_addr_t connAddr;
@ -878,6 +890,7 @@ void NetconEthernetTap::handleBind(PhySocket *sock, PhySocket *rpcSock, void **u
void NetconEthernetTap::handleListen(PhySocket *sock, PhySocket *rpcSock, void **uptr, struct listen_st *listen_rpc)
{
Mutex::Lock _l(_tcpconns_m);
TcpConnection *conn = getConnection(sock);
if(!conn){
dwr(MSG_ERROR," handleListen(): unable to locate TcpConnection.\n");
@ -913,13 +926,15 @@ void NetconEthernetTap::handleListen(PhySocket *sock, PhySocket *rpcSock, void *
TcpConnection * NetconEthernetTap::handleSocket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc)
{
Mutex::Lock _l(_tcpconns_m);
struct tcp_pcb *newPCB = lwipstack->tcp_new();
if(newPCB != NULL) {
TcpConnection *newConn = new TcpConnection();
*uptr = newConn;
newConn->sock = sock;
newConn->pcb = newPCB;
return addConnection(newConn);;
_TcpConnections.push_back(newConn);
return newConn;
}
dwr(MSG_ERROR," handleSocket(): Memory not available for new PCB\n");
sendReturnValue(_phy.getDescriptor(sock), -1, ENOMEM);
@ -928,9 +943,10 @@ TcpConnection * NetconEthernetTap::handleSocket(PhySocket *sock, void **uptr, st
void NetconEthernetTap::handleConnect(PhySocket *sock, PhySocket *rpcSock, TcpConnection *conn, struct connect_st* connect_rpc)
{
Mutex::Lock _l(_tcpconns_m);
struct sockaddr_in *rawAddr = (struct sockaddr_in *) &connect_rpc->__addr;
int port = lwipstack->ntohs(rawAddr->sin_port);
ip_addr_t connAddr = convert_ip((struct sockaddr_in *)&connect_rpc->__addr);
ip_addr_t connAddr = convert_ip(rawAddr);
if(conn != NULL) {
lwipstack->tcp_sent(conn->pcb, nc_sent);

View File

@ -56,9 +56,9 @@ struct connect_st;
struct getsockname_st;
struct accept_st;
#define APPLICATION_POLL_FREQ 50
#define APPLICATION_POLL_FREQ 2
#define ZT_LWIP_TCP_TIMER_INTERVAL 5
#define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms)
#define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms)
#define DEFAULT_BUF_SZ 1024 * 1024 * 2
namespace ZeroTier {
@ -71,7 +71,7 @@ class LWIPStack;
*/
struct TcpConnection
{
bool listening;
bool listening, closing;
int pid, txsz, rxsz;
PhySocket *rpcSock, *sock;
struct tcp_pcb *pcb;
@ -410,16 +410,6 @@ private:
* Returns a pointer to a TcpConnection associated with a given PhySocket
*/
TcpConnection *getConnection(PhySocket *sock);
/*
* Safely adds a new TcpConnection to _TcpConnections
*/
TcpConnection *addConnection(TcpConnection *conn);
/*
* Safely removes a TcpConnection from _TcpConnections
*/
void removeConnection(TcpConnection *conn);
/*
* Closes a TcpConnection, associated LWIP PCB strcuture,