From 15e8c181063ecd32b2dc2eec9185d976005e0c43 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 19 Mar 2014 22:01:32 -0700 Subject: [PATCH] Bug fixes. --- node/IpcListener.cpp | 40 +++++++++++++++++++++------------------- node/SocketManager.cpp | 14 +++++++++++++- node/SocketManager.hpp | 23 ++++++++++++++++++++++- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/node/IpcListener.cpp b/node/IpcListener.cpp index 94cf065f9..306231a1c 100644 --- a/node/IpcListener.cpp +++ b/node/IpcListener.cpp @@ -38,6 +38,8 @@ #else #include #include +#include +#include #include #endif @@ -56,29 +58,29 @@ IpcListener::IpcListener(const char *ep,void (*commandHandler)(void *,IpcConnect strncpy(unaddr.sun_path,_endpoint.c_str(),sizeof(unaddr.sun_path)); unaddr.sun_path[sizeof(unaddr.sun_path) - 1] = (char)0; - for(int tries=0;tries<3;++tries) { - _sock = socket(AF_UNIX,SOCK_STREAM,0); - if (_sock <= 0) + struct stat stattmp; + if (stat(_endpoint.c_str(),&stattmp)) { + int testSock = socket(AF_UNIX,SOCK_STREAM,0); + if (testSock <= 0) throw std::runtime_error("unable to create socket of type AF_UNIX"); - if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) { - ::close(_sock); - if (errno == EADDRINUSE) { - int testSock = socket(AF_UNIX,SOCK_STREAM,0); - if (testSock <= 0) - throw std::runtime_error("unable to create socket of type AF_UNIX"); - if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) { - // error indicates nothing is listening on other end, so unlink and try again - ::close(testSock); - unlink(_endpoint.c_str()); - } else { - // success means endpoint is being actively listened to by a process - ::close(testSock); - throw std::runtime_error("IPC endpoint address in use"); - } - } else throw std::runtime_error("IPC endpoint could not be bound"); + if (connect(testSock,(struct sockaddr *)&unaddr,sizeof(unaddr))) { + // error means nothing is listening, orphaned name + ::close(testSock); + } else { + // success means endpoint is being actively listened to by a process + ::close(testSock); + throw std::runtime_error("IPC endpoint address in use"); } } + ::unlink(_endpoint.c_str()); + _sock = socket(AF_UNIX,SOCK_STREAM,0); + if (_sock <= 0) + throw std::runtime_error("unable to create socket of type AF_UNIX"); + if (bind(_sock,(struct sockaddr *)&unaddr,sizeof(unaddr))) { + ::close(_sock); + throw std::runtime_error("IPC endpoint could not be bound"); + } if (listen(_sock,8)) { ::close(_sock); throw std::runtime_error("listen() failed for bound AF_UNIX socket"); diff --git a/node/SocketManager.cpp b/node/SocketManager.cpp index 2c583f09d..3c94f22c3 100644 --- a/node/SocketManager.cpp +++ b/node/SocketManager.cpp @@ -316,6 +316,8 @@ SocketManager::SocketManager( _udpV4Socket = SharedPtr(new UdpSocket(Socket::ZT_SOCKET_TYPE_UDP_V4,s)); } } + + _updateNfds(); } SocketManager::~SocketManager() @@ -370,7 +372,7 @@ void SocketManager::poll(unsigned long timeout) tv.tv_sec = (long)(timeout / 1000); tv.tv_usec = (long)((timeout % 1000) * 1000); - select(_nfds,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0); + select(_nfds + 1,&rfds,&wfds,&efds,(timeout > 0) ? &tv : (struct timeval *)0); if (FD_ISSET(_whackReceivePipe,&rfds)) { char tmp[32]; @@ -396,6 +398,8 @@ void SocketManager::poll(unsigned long timeout) _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); + if (sockfd > _nfds) + _nfds = sockfd; } } if ((_tcpV6ListenSocket != INVALID_SOCKET)&&(FD_ISSET(_tcpV6ListenSocket,&rfds))) { @@ -413,6 +417,8 @@ void SocketManager::poll(unsigned long timeout) _fdSetLock.lock(); FD_SET(sockfd,&_readfds); _fdSetLock.unlock(); + if (sockfd > _nfds) + _nfds = sockfd; } } @@ -421,6 +427,7 @@ void SocketManager::poll(unsigned long timeout) if ((_udpV6Socket)&&(FD_ISSET(_udpV6Socket->_sock,&rfds))) _udpV6Socket->notifyAvailableForRead(_udpV6Socket,this); + bool closedSockets = false; { // grab copy of TCP sockets list because _tcpSockets[] might be changed in a handler Mutex::Lock _l2(_tcpSockets_m); if (_tcpSockets.size()) { @@ -436,6 +443,7 @@ void SocketManager::poll(unsigned long timeout) FD_CLR(s->second->_sock,&_writefds); _fdSetLock.unlock(); _tcpSockets.erase(s++); + closedSockets = true; } } } @@ -451,6 +459,7 @@ void SocketManager::poll(unsigned long timeout) FD_CLR((*s)->_sock,&_readfds); FD_CLR((*s)->_sock,&_writefds); _fdSetLock.unlock(); + closedSockets = true; continue; } } @@ -464,10 +473,13 @@ void SocketManager::poll(unsigned long timeout) FD_CLR((*s)->_sock,&_readfds); FD_CLR((*s)->_sock,&_writefds); _fdSetLock.unlock(); + closedSockets = true; continue; } } } + if (closedSockets) + _updateNfds(); } void SocketManager::whack() diff --git a/node/SocketManager.hpp b/node/SocketManager.hpp index 9f980e379..e6e87a1fe 100644 --- a/node/SocketManager.hpp +++ b/node/SocketManager.hpp @@ -169,6 +169,27 @@ private: #endif } + inline void _updateNfds() + { + int nfds = _whackSendPipe; + if (_whackReceivePipe > nfds) + nfds = _whackReceivePipe; + if (_tcpV4ListenSocket > nfds) + nfds = _tcpV4ListenSocket; + if (_tcpV6ListenSocket > nfds) + nfds = _tcpV6ListenSocket; + if ((_udpV4Socket)&&(_udpV4Socket->_sock > nfds)) + nfds = _udpV4Socket->_sock; + if ((_udpV6Socket)&&(_udpV6Socket->_sock > nfds)) + nfds = _udpV6Socket->_sock; + Mutex::Lock _l(_tcpSockets_m); + for(std::map< InetAddress,SharedPtr >::const_iterator s(_tcpSockets.begin());s!=_tcpSockets.end();++s) { + if (s->second->_sock > nfds) + nfds = s->second->_sock; + } + _nfds = nfds; + } + #ifdef __WINDOWS__ SOCKET _whackSendPipe; SOCKET _whackReceivePipe; @@ -187,7 +208,7 @@ private: fd_set _readfds; fd_set _writefds; - int _nfds; + volatile int _nfds; Mutex _fdSetLock; std::map< InetAddress,SharedPtr > _tcpSockets;