Add some TRACE around pinging (for now), and refactor service/One to just run in the foreground as some platforms may not require threads at all.

This commit is contained in:
Adam Ierymenko 2015-04-10 11:40:45 -07:00
parent 08a11a6f32
commit 9e651b39e4
7 changed files with 110 additions and 99 deletions

View File

@ -37,7 +37,7 @@ using namespace ZeroTier;
int main(int argc,char **argv) int main(int argc,char **argv)
{ {
One *one = One::newInstance("/tmp/foo",12345); One *one = One::newInstance("/tmp/foo",12345);
one->waitForTermination(); one->run();
printf("termination reason: %d, message: %s\n",(int)one->reasonForTermination(),one->fatalErrorMessage().c_str()); printf("termination reason: %d, message: %s\n",(int)one->reasonForTermination(),one->fatalErrorMessage().c_str());
return 0; return 0;
} }

View File

@ -111,6 +111,7 @@ void Peer::received(
* paths without confirming that a bidirectional link is in * paths without confirming that a bidirectional link is in
* fact present, but any packet that decodes and authenticates * fact present, but any packet that decodes and authenticates
* correctly is considered valid. */ * correctly is considered valid. */
TRACE("got non-confirmation packet from unknown path %s(%s), pinging...",_id.address().toString().c_str(),remoteAddr.toString().c_str());
attemptToContactAt(RR,remoteAddr,linkDesperation,now); attemptToContactAt(RR,remoteAddr,linkDesperation,now);
} }
} }
@ -193,12 +194,14 @@ void Peer::doPingAndKeepalive(const RuntimeEnvironment *RR,uint64_t now)
if ((bestPath)&&(bestPath->active(now))) { if ((bestPath)&&(bestPath->active(now))) {
const unsigned int desp = std::max(RR->node->coreDesperation(),bestPath->lastReceiveDesperation()); const unsigned int desp = std::max(RR->node->coreDesperation(),bestPath->lastReceiveDesperation());
if ((now - bestPath->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) { if ((now - bestPath->lastReceived()) >= ZT_PEER_DIRECT_PING_DELAY) {
TRACE("PING %s(%s) desperation == %u",_id.address().toString().c_str(),bestPath->address().toString().c_str(),desp);
attemptToContactAt(RR,bestPath->address(),desp,now); attemptToContactAt(RR,bestPath->address(),desp,now);
bestPath->sent(now); bestPath->sent(now);
} else if ((now - bestPath->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY) { } else if ((now - bestPath->lastSend()) >= ZT_NAT_KEEPALIVE_DELAY) {
// We only do keepalive if desperation is zero right now, since higher // We only do keepalive if desperation is zero right now, since higher
// desperation paths involve things like tunneling that do not need it. // desperation paths involve things like tunneling that do not need it.
if (desp == 0) { if (desp == 0) {
TRACE("NAT keepalive %s(%s)",_id.address().toString().c_str(),bestPath->address().toString().c_str());
RR->node->putPacket(bestPath->address(),"",0,0); RR->node->putPacket(bestPath->address(),"",0,0);
bestPath->sent(now); bestPath->sent(now);
} }

View File

@ -320,7 +320,6 @@ OSXEthernetTap::OSXEthernetTap(
const char *friendlyName, const char *friendlyName,
void (*handler)(void *,const MAC &,const MAC &,unsigned int,const Buffer<4096> &), void (*handler)(void *,const MAC &,const MAC &,unsigned int,const Buffer<4096> &),
void *arg) : void *arg) :
EthernetTap("OSXEthernetTap",mac,mtu,metric),
_handler(handler), _handler(handler),
_arg(arg), _arg(arg),
_mtu(mtu), _mtu(mtu),

View File

@ -33,6 +33,12 @@
#include <stdexcept> #include <stdexcept>
#include "../node/Constants.hpp"
#include "../node/MAC.hpp"
#include "../node/Buffer.hpp"
#include "../node/InetAddress.hpp"
#include "../node/MulticastGroup.hpp"
#include "Thread.hpp" #include "Thread.hpp"
namespace ZeroTier { namespace ZeroTier {

View File

@ -724,13 +724,16 @@ public:
case ZT_PHY_SOCKET_UDP: case ZT_PHY_SOCKET_UDP:
if (FD_ISSET(s->sock,&rfds)) { if (FD_ISSET(s->sock,&rfds)) {
memset(&ss,0,sizeof(ss)); for(;;) {
socklen_t slen = sizeof(ss); memset(&ss,0,sizeof(ss));
long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen); socklen_t slen = sizeof(ss);
if (n > 0) { long n = (long)::recvfrom(s->sock,buf,sizeof(buf),0,(struct sockaddr *)&ss,&slen);
try { if (n > 0) {
_datagramHandler((PhySocket *)&(*s),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n); try {
} catch ( ... ) {} _datagramHandler((PhySocket *)&(*s),&(s->uptr),(const struct sockaddr *)&ss,(void *)buf,(unsigned long)n);
} catch ( ... ) {}
} else if (n < 0)
break;
} }
} }
break; break;

View File

@ -39,7 +39,6 @@
#include "../node/InetAddress.hpp" #include "../node/InetAddress.hpp"
#include "../osdep/Phy.hpp" #include "../osdep/Phy.hpp"
#include "../osdep/Thread.hpp"
#include "../osdep/OSUtils.hpp" #include "../osdep/OSUtils.hpp"
#include "One.hpp" #include "One.hpp"
@ -109,22 +108,73 @@ public:
in6.sin6_port = in4.sin_port; in6.sin6_port = in4.sin_port;
_v6UdpSocket = _phy.udpBind((const struct sockaddr *)&in6,this,131072); _v6UdpSocket = _phy.udpBind((const struct sockaddr *)&in6,this,131072);
_v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this); _v6TcpListenSocket = _phy.tcpListen((const struct sockaddr *)&in6,this);
_thread = Thread::start(this);
} }
virtual ~OneImpl() virtual ~OneImpl()
{ {
if (reasonForTermination() == ONE_STILL_RUNNING) {
terminate();
waitForTermination();
}
_phy.close(_v4UdpSocket); _phy.close(_v4UdpSocket);
_phy.close(_v6UdpSocket); _phy.close(_v6UdpSocket);
_phy.close(_v4TcpListenSocket); _phy.close(_v4TcpListenSocket);
_phy.close(_v6TcpListenSocket); _phy.close(_v6TcpListenSocket);
} }
virtual ReasonForTermination run()
{
try {
_node = new Node(
OSUtils::now(),
this,
SnodeDataStoreGetFunction,
SnodeDataStorePutFunction,
SnodeWirePacketSendFunction,
SnodeVirtualNetworkFrameFunction,
SnodeVirtualNetworkConfigFunction,
SnodeEventCallback,
((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0));
if (_master)
_node->setNetconfMaster((void *)_master);
_nextBackgroundTaskDeadline = 0;
for(;;) {
_run_m.lock();
if (!_run) {
_run_m.unlock();
_termReason_m.lock();
_termReason = ONE_NORMAL_TERMINATION;
_termReason_m.unlock();
break;
} else _run_m.unlock();
uint64_t dl = _nextBackgroundTaskDeadline;
uint64_t now = OSUtils::now();
if (dl <= now) {
_node->processBackgroundTasks(now,const_cast<uint64_t *>(&_nextBackgroundTaskDeadline));
dl = _nextBackgroundTaskDeadline;
now = OSUtils::now();
}
const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100;
printf("polling: %lums timeout\n",delay);
_phy.poll(delay);
}
} catch (std::exception &exc) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = exc.what();
} catch ( ... ) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "unexpected exception in main thread";
}
delete _node;
_node = (Node *)0;
return _termReason;
}
virtual ReasonForTermination reasonForTermination() const virtual ReasonForTermination reasonForTermination() const
{ {
Mutex::Lock _l(_termReason_m); Mutex::Lock _l(_termReason_m);
@ -137,12 +187,6 @@ public:
return _fatalErrorMessage; return _fatalErrorMessage;
} }
virtual void waitForTermination()
{
if (reasonForTermination() == ONE_STILL_RUNNING)
Thread::join(_thread);
}
virtual void terminate() virtual void terminate()
{ {
_run_m.lock(); _run_m.lock();
@ -155,21 +199,23 @@ public:
inline void phyOnDatagramFunction(PhySocket *sock,const struct sockaddr *from,void *data,unsigned long len) inline void phyOnDatagramFunction(PhySocket *sock,const struct sockaddr *from,void *data,unsigned long len)
{ {
ZT1_ResultCode rc = _node->processWirePacket( try {
OSUtils::now(), ZT1_ResultCode rc = _node->processWirePacket(
(const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big OSUtils::now(),
0, (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big
data, 0,
len, data,
const_cast<uint64_t *>(&_nextBackgroundTaskDeadline)); len,
if (ZT1_ResultCode_isFatal(rc)) { const_cast<uint64_t *>(&_nextBackgroundTaskDeadline));
char tmp[256]; if (ZT1_ResultCode_isFatal(rc)) {
Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc); char tmp[256];
Mutex::Lock _l(_termReason_m); Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc);
_termReason = ONE_UNRECOVERABLE_ERROR; Mutex::Lock _l(_termReason_m);
_fatalErrorMessage = tmp; _termReason = ONE_UNRECOVERABLE_ERROR;
this->terminate(); _fatalErrorMessage = tmp;
} this->terminate();
}
} catch ( ... ) {}
} }
inline void phyOnTcpConnectFunction(PhySocket *sock,bool success) inline void phyOnTcpConnectFunction(PhySocket *sock,bool success)
@ -309,55 +355,6 @@ public:
fflush(stderr); fflush(stderr);
} }
void threadMain()
throw()
{
_nextBackgroundTaskDeadline = 0;
try {
_node = new Node(
OSUtils::now(),
this,
SnodeDataStoreGetFunction,
SnodeDataStorePutFunction,
SnodeWirePacketSendFunction,
SnodeVirtualNetworkFrameFunction,
SnodeVirtualNetworkConfigFunction,
SnodeEventCallback,
((_overrideRootTopology.length() > 0) ? _overrideRootTopology.c_str() : (const char *)0));
if (_master)
_node->setNetconfMaster((void *)_master);
for(;;) {
_run_m.lock();
if (!_run) {
_run_m.unlock();
break;
} else _run_m.unlock();
uint64_t dl = _nextBackgroundTaskDeadline;
uint64_t now = OSUtils::now();
if (dl <= now) {
_node->processBackgroundTasks(now,const_cast<uint64_t *>(&_nextBackgroundTaskDeadline));
dl = _nextBackgroundTaskDeadline;
now = OSUtils::now();
}
_phy.poll((dl > now) ? (unsigned long)(dl - now) : 100);
}
} catch (std::exception &exc) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = exc.what();
} catch ( ... ) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "unexpected exception in main thread";
}
delete _node;
}
private: private:
const std::string _homePath; const std::string _homePath;
SimpleFunctionPhy _phy; SimpleFunctionPhy _phy;
@ -368,7 +365,6 @@ private:
PhySocket *_v6UdpSocket; PhySocket *_v6UdpSocket;
PhySocket *_v4TcpListenSocket; PhySocket *_v4TcpListenSocket;
PhySocket *_v6TcpListenSocket; PhySocket *_v6TcpListenSocket;
Thread _thread;
volatile uint64_t _nextBackgroundTaskDeadline; volatile uint64_t _nextBackgroundTaskDeadline;
ReasonForTermination _termReason; ReasonForTermination _termReason;

View File

@ -75,7 +75,10 @@ public:
static std::string platformDefaultHomePath(); static std::string platformDefaultHomePath();
/** /**
* Create and start a new instance of the service * Create a new instance of the service
*
* Once created, you must call the run() method to actually start
* processing.
* *
* @param hp Home path * @param hp Home path
* @param port TCP and UDP port for packets and HTTP control * @param port TCP and UDP port for packets and HTTP control
@ -88,11 +91,19 @@ public:
NetworkConfigMaster *master = (NetworkConfigMaster *)0, NetworkConfigMaster *master = (NetworkConfigMaster *)0,
const char *overrideRootTopology = (const char *)0); const char *overrideRootTopology = (const char *)0);
/**
* Deletion will block until service stops if it's still running
*/
virtual ~One(); virtual ~One();
/**
* Execute the service main I/O loop until terminated
*
* The terminate() method may be called from a signal handler or another
* thread to terminate execution. Otherwise this will not return unless
* another condition terminates execution such as a fatal error.
*
* @param
*/
virtual ReasonForTermination run() = 0;
/** /**
* @return Reason for terminating or ONE_STILL_RUNNING if running * @return Reason for terminating or ONE_STILL_RUNNING if running
*/ */
@ -104,14 +115,7 @@ public:
virtual std::string fatalErrorMessage() const = 0; virtual std::string fatalErrorMessage() const = 0;
/** /**
* Block until service terminates * Terminate background service (can be called from other threads)
*/
virtual void waitForTermination() = 0;
/**
* Terminate background service
*
* Actual shutdown might take a few seconds.
*/ */
virtual void terminate() = 0; virtual void terminate() = 0;