diff --git a/main.cpp b/main.cpp index 075840208..7d38c4634 100644 --- a/main.cpp +++ b/main.cpp @@ -37,7 +37,7 @@ using namespace ZeroTier; int main(int argc,char **argv) { One *one = One::newInstance("/tmp/foo",12345); - one->waitForTermination(); + one->run(); printf("termination reason: %d, message: %s\n",(int)one->reasonForTermination(),one->fatalErrorMessage().c_str()); return 0; } diff --git a/node/Peer.cpp b/node/Peer.cpp index 1926f2e5b..541fe5dd4 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -111,6 +111,7 @@ void Peer::received( * paths without confirming that a bidirectional link is in * fact present, but any packet that decodes and authenticates * correctly is considered valid. */ + TRACE("got non-confirmation packet from unknown path %s(%s), pinging...",_id.address().toString().c_str(),remoteAddr.toString().c_str()); attemptToContactAt(RR,remoteAddr,linkDesperation,now); } } @@ -193,12 +194,14 @@ void Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now) if ((bestPath)&&(bestPath->active(now))) { const unsigned int desp = std::max(RR->node->coreDesperation(),bestPath->lastReceiveDesperation()); if ((now - bestPath->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) { + TRACE("PING %s(%s) desperation == %u",_id.address().toString().c_str(),bestPath->address().toString().c_str(),desp); attemptToContactAt(RR,bestPath->address(),desp,now); bestPath->sent(now); } else if ((now - bestPath->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY) { // We only do keepalive if desperation is zero right now, since higher // desperation paths involve things like tunneling that do not need it. if (desp == 0) { + TRACE("NAT keepalive %s(%s)",_id.address().toString().c_str(),bestPath->address().toString().c_str()); RR->node->putPacket(bestPath->address(),"",0,0); bestPath->sent(now); } diff --git a/osdep/OSXEthernetTap.cpp b/osdep/OSXEthernetTap.cpp index eddf787af..b333156eb 100644 --- a/osdep/OSXEthernetTap.cpp +++ b/osdep/OSXEthernetTap.cpp @@ -320,7 +320,6 @@ OSXEthernetTap::OSXEthernetTap( const char *friendlyName, void (*handler)(void *,const MAC &,const MAC &,unsigned int,const Buffer<4096> &), void *arg) : - EthernetTap("OSXEthernetTap",mac,mtu,metric), _handler(handler), _arg(arg), _mtu(mtu), diff --git a/osdep/OSXEthernetTap.hpp b/osdep/OSXEthernetTap.hpp index 1ce424e9a..24d819324 100644 --- a/osdep/OSXEthernetTap.hpp +++ b/osdep/OSXEthernetTap.hpp @@ -33,6 +33,12 @@ #include +#include "../node/Constants.hpp" +#include "../node/MAC.hpp" +#include "../node/Buffer.hpp" +#include "../node/InetAddress.hpp" +#include "../node/MulticastGroup.hpp" + #include "Thread.hpp" namespace ZeroTier { diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 1b55e0a6c..96a946d15 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -724,13 +724,16 @@ public: case ZT_PHY_SOCKET_UDP: if (FD_ISSET(s->sock,&rfds)) { - memset(&ss,0,sizeof(ss)); - socklen_t slen = sizeof(ss); - long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); - if (n > 0) { - try { - _datagramHandler((PhySocket *)&(*s),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); - } catch ( ... ) {} + for(;;) { + memset(&ss,0,sizeof(ss)); + socklen_t slen = sizeof(ss); + long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); + if (n > 0) { + try { + _datagramHandler((PhySocket *)&(*s),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); + } catch ( ... ) {} + } else if (n < 0) + break; } } break; diff --git a/service/One.cpp b/service/One.cpp index 2ebfd285e..20be70061 100644 --- a/service/One.cpp +++ b/service/One.cpp @@ -39,7 +39,6 @@ #include "../node/InetAddress.hpp" #include "../osdep/Phy.hpp" -#include "../osdep/Thread.hpp" #include "../osdep/OSUtils.hpp" #include "One.hpp" @@ -109,22 +108,73 @@ public: in6.sin6_port = in4.sin_port; _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&in6,this,131072); _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this); - - _thread = Thread::start(this); } virtual ~OneImpl() { - if (reasonForTermination() == ONE_STILL_RUNNING) { - terminate(); - waitForTermination(); - } _phy.close(_v4UdpSocket); _phy.close(_v6UdpSocket); _phy.close(_v4TcpListenSocket); _phy.close(_v6TcpListenSocket); } + virtual ReasonForTermination run() + { + try { + _node = new Node( + OSUtils::now(), + this, + SnodeDataStoreGetFunction, + SnodeDataStorePutFunction, + SnodeWirePacketSendFunction, + SnodeVirtualNetworkFrameFunction, + SnodeVirtualNetworkConfigFunction, + SnodeEventCallback, + ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0)); + + if (_master) + _node->setNetconfMaster((void *)_master); + + _nextBackgroundTaskDeadline = 0; + for(;;) { + _run_m.lock(); + if (!_run) { + _run_m.unlock(); + _termReason_m.lock(); + _termReason = ONE_NORMAL_TERMINATION; + _termReason_m.unlock(); + break; + } else _run_m.unlock(); + + uint64_t dl = _nextBackgroundTaskDeadline; + uint64_t now = OSUtils::now(); + + if (dl <= now) { + _node->processBackgroundTasks(now,const_cast(&_nextBackgroundTaskDeadline)); + dl = _nextBackgroundTaskDeadline; + now = OSUtils::now(); + } + + const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100; + printf("polling: %lums timeout\n",delay); + _phy.poll(delay); + } + } catch (std::exception &exc) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = exc.what(); + } catch ( ... ) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "unexpected exception in main thread"; + } + + delete _node; + _node = (Node *)0; + + return _termReason; + } + virtual ReasonForTermination reasonForTermination() const { Mutex::Lock _l(_termReason_m); @@ -137,12 +187,6 @@ public: return _fatalErrorMessage; } - virtual void waitForTermination() - { - if (reasonForTermination() == ONE_STILL_RUNNING) - Thread::join(_thread); - } - virtual void terminate() { _run_m.lock(); @@ -155,21 +199,23 @@ public: inline void phyOnDatagramFunction(PhySocket *sock,const struct sockaddr *from,void *data,unsigned long len) { - ZT1_ResultCode rc = _node->processWirePacket( - OSUtils::now(), - (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big - 0, - data, - len, - const_cast(&_nextBackgroundTaskDeadline)); - if (ZT1_ResultCode_isFatal(rc)) { - char tmp[256]; - Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - } + try { + ZT1_ResultCode rc = _node->processWirePacket( + OSUtils::now(), + (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big + 0, + data, + len, + const_cast(&_nextBackgroundTaskDeadline)); + if (ZT1_ResultCode_isFatal(rc)) { + char tmp[256]; + Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + } + } catch ( ... ) {} } inline void phyOnTcpConnectFunction(PhySocket *sock,bool success) @@ -309,55 +355,6 @@ public: fflush(stderr); } - void threadMain() - throw() - { - _nextBackgroundTaskDeadline = 0; - try { - _node = new Node( - OSUtils::now(), - this, - SnodeDataStoreGetFunction, - SnodeDataStorePutFunction, - SnodeWirePacketSendFunction, - SnodeVirtualNetworkFrameFunction, - SnodeVirtualNetworkConfigFunction, - SnodeEventCallback, - ((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0)); - - if (_master) - _node->setNetconfMaster((void *)_master); - - for(;;) { - _run_m.lock(); - if (!_run) { - _run_m.unlock(); - break; - } else _run_m.unlock(); - - uint64_t dl = _nextBackgroundTaskDeadline; - uint64_t now = OSUtils::now(); - - if (dl <= now) { - _node->processBackgroundTasks(now,const_cast(&_nextBackgroundTaskDeadline)); - dl = _nextBackgroundTaskDeadline; - now = OSUtils::now(); - } - - _phy.poll((dl > now) ? (unsigned long)(dl - now) : 100); - } - } catch (std::exception &exc) { - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = exc.what(); - } catch ( ... ) { - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = "unexpected exception in main thread"; - } - delete _node; - } - private: const std::string _homePath; SimpleFunctionPhy _phy; @@ -368,7 +365,6 @@ private: PhySocket *_v6UdpSocket; PhySocket *_v4TcpListenSocket; PhySocket *_v6TcpListenSocket; - Thread _thread; volatile uint64_t _nextBackgroundTaskDeadline; ReasonForTermination _termReason; diff --git a/service/One.hpp b/service/One.hpp index 260716560..1a33dc72e 100644 --- a/service/One.hpp +++ b/service/One.hpp @@ -75,7 +75,10 @@ public: static std::string platformDefaultHomePath(); /** - * Create and start a new instance of the service + * Create a new instance of the service + * + * Once created, you must call the run() method to actually start + * processing. * * @param hp Home path * @param port TCP and UDP port for packets and HTTP control @@ -88,11 +91,19 @@ public: NetworkConfigMaster *master = (NetworkConfigMaster *)0, const char *overrideRootTopology = (const char *)0); - /** - * Deletion will block until service stops if it's still running - */ virtual ~One(); + /** + * Execute the service main I/O loop until terminated + * + * The terminate() method may be called from a signal handler or another + * thread to terminate execution. Otherwise this will not return unless + * another condition terminates execution such as a fatal error. + * + * @param + */ + virtual ReasonForTermination run() = 0; + /** * @return Reason for terminating or ONE_STILL_RUNNING if running */ @@ -104,14 +115,7 @@ public: virtual std::string fatalErrorMessage() const = 0; /** - * Block until service terminates - */ - virtual void waitForTermination() = 0; - - /** - * Terminate background service - * - * Actual shutdown might take a few seconds. + * Terminate background service (can be called from other threads) */ virtual void terminate() = 0;