From 50aedda54d79f059cb38447d4c38f6fcf15ed6a0 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 21 May 2015 18:33:52 -0700 Subject: [PATCH 1/7] Basic TCP fallback -- now to test. --- service/OneService.cpp | 128 ++++++++++++++++++++++++++++++++--------- 1 file changed, 100 insertions(+), 28 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index 3f45cd1e6..c0b47af12 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -103,12 +103,16 @@ namespace ZeroTier { typedef BSDEthernetTap EthernetTap; } // Path under ZT1 home for controller database if controller is enabled #define ZT1_CONTROLLER_DB_PATH "controller.db" -// TCP fallback relay host +// TCP fallback relay host -- geo-distributed using Amazon Route53 geo-aware DNS #define ZT1_TCP_FALLBACK_RELAY "tcp-fallback.zerotier.com" +#define ZT1_TCP_FALLBACK_RELAY_PORT 443 // Frequency at which we re-resolve the TCP fallback relay #define ZT1_TCP_FALLBACK_RERESOLVE_DELAY 86400000 +// Attempt to engage TCP fallback after this many ms of no reply to packets sent to global-scope IPs +#define ZT1_TCP_FALLBACK_AFTER 60000 + namespace ZeroTier { namespace { @@ -380,7 +384,10 @@ public: _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), _node((Node *)0), _controlPlane((ControlPlane *)0), + _lastDirectReceiveFromGlobal(0), + _lastSendToGlobal(0), _nextBackgroundTaskDeadline(0), + _tcpFallbackTunnel((TcpConnection *)0), _termReason(ONE_STILL_RUNNING), _run(true) { @@ -508,6 +515,9 @@ public: _tcpFallbackResolver.resolveNow(); } + if ((_tcpFallbackTunnel)&&((now - _lastDirectReceiveFromGlobal) < (ZT1_TCP_FALLBACK_AFTER / 2))) + _phy.close(_tcpFallbackTunnel->sock); + if ((now - lastTapMulticastGroupCheck) >= ZT_TAP_CHECK_MULTICAST_INTERVAL) { lastTapMulticastGroupCheck = now; Mutex::Lock _l(_taps_m); @@ -535,8 +545,8 @@ public: } try { - while (!_tcpConections.empty()) - _phy.close(_tcpConections.begin()->first); + while (!_tcpConnections.empty()) + _phy.close(_tcpConnections.begin()->first); } catch ( ... ) {} { @@ -587,6 +597,8 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { + if (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) + _lastDirectReceiveFromGlobal = OSUtils::now(); ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big @@ -610,7 +622,7 @@ public: // Outgoing TCP connections are always TCP fallback tunnel connections. - TcpConnection *tc = &(_tcpConections[sock]); + TcpConnection *tc = &(_tcpConnections[sock]); tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; tc->shouldKeepAlive = true; // unused tc->parent = this; @@ -622,6 +634,8 @@ public: tc->writeBuf = ""; *uptr = (void *)tc; + _tcpFallbackTunnel = tc; + // Send "hello" message tc->writeBuf.push_back((char)0x17); tc->writeBuf.push_back((char)0x03); @@ -639,7 +653,7 @@ public: { // Incoming TCP connections are HTTP JSON API requests. - TcpConnection *tc = &(_tcpConections[sockN]); + TcpConnection *tc = &(_tcpConnections[sockN]); tc->type = TcpConnection::TCP_HTTP_INCOMING; tc->shouldKeepAlive = true; tc->parent = this; @@ -661,7 +675,12 @@ public: inline void phyOnTcpClose(PhySocket *sock,void **uptr) { - _tcpConections.erase(sock); + std::map< PhySocket *,TcpConnection >::iterator tc(_tcpConnections.find(sock)); + if (tc != _tcpConnections.end()) { + if (&(tc->second) == _tcpFallbackTunnel) + _tcpFallbackTunnel = (TcpConnection *)0; + _tcpConnections.erase(tc); + } } inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) @@ -726,21 +745,23 @@ public: return; } - ZT1_ResultCode rc = _node->processWirePacket( - OSUtils::now(), - (const struct sockaddr_storage *)&from, // Phy<> uses sockaddr_storage, so it'll always be that big - data, - plen, - &_nextBackgroundTaskDeadline); - if (ZT1_ResultCode_isFatal(rc)) { - char tmp[256]; - Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); - Mutex::Lock _l(_termReason_m); - _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = tmp; - this->terminate(); - _phy.close(sock); - return; + if (from) { + ZT1_ResultCode rc = _node->processWirePacket( + OSUtils::now(), + reinterpret_cast(&from), + data, + plen, + &_nextBackgroundTaskDeadline); + if (ZT1_ResultCode_isFatal(rc)) { + char tmp[256]; + Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + _phy.close(sock); + return; + } } } @@ -765,7 +786,7 @@ public: tc->writeBuf = ""; _phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive) - _phy.close(sock); // will call close handler to delete from _tcpConections + _phy.close(sock); // will call close handler to delete from _tcpConnections } else tc->writeBuf = tc->writeBuf.substr(sent); } } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen @@ -915,17 +936,65 @@ public: inline int nodeWirePacketSendFunction(const struct sockaddr_storage *addr,const void *data,unsigned int len) { + int result = -1; switch(addr->ss_family) { case AF_INET: - if (_v4UdpSocket) - return (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + //if (_v4UdpSocket) + //result = (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + + if (reinterpret_cast(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) { + uint64_t now = OSUtils::now(); + + /* Engage TCP fallback if we've sent something more than PING_CHECK_INTERVAL ago, + * less than twice the fallback period ago, and haven't heard anything in the fallback + * timeout period. In that case our packets to the global IP scope are probably + * being blocked, so start trying to tunnel them out. + * + * Note that we *always* send them as plain UDP above. This way if we ever get unblocked + * or if our connectivity changes for the better, we will instantly "fail forward" back + * to plain vanilla UDP and abandon the tunnel. + * + * TCP fallback is currently only supported for tunneling to IPv4 addresses. */ + if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > ZT_PING_CHECK_INVERVAL)&&((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)) { + if (_tcpFallbackTunnel) { + if (!_tcpFallbackTunnel->writeBuf.length()) + _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); + unsigned long mlen = len + 7; + _tcpFallbackTunnel->writeBuf.push_back((char)0x17); + _tcpFallbackTunnel->writeBuf.push_back((char)0x03); + _tcpFallbackTunnel->writeBuf.push_back((char)0x03); // fake TLS 1.2 header + _tcpFallbackTunnel->writeBuf.push_back((char)((mlen >> 8) & 0xff)); + _tcpFallbackTunnel->writeBuf.push_back((char)(mlen & 0xff)); + _tcpFallbackTunnel->writeBuf.push_back((char)4); // IPv4 + _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_addr.s_addr))),4); + _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_port))),2); + _tcpFallbackTunnel->writeBuf.append((const char *)data,len); + } else { + std::vector tunnelIps(_tcpFallbackResolver.get()); + if (tunnelIps.empty()) { + if (!_tcpFallbackResolver.running()) + _tcpFallbackResolver.resolveNow(); + } else { + bool connected = false; + InetAddress addr(tunnelIps[(unsigned long)now % tunnelIps.size()]); + addr.setPort(ZT1_TCP_FALLBACK_RELAY_PORT); + _phy.tcpConnect(reinterpret_cast(&addr),connected); + } + } + } + + _lastSendToGlobal = now; + } + break; case AF_INET6: if (_v6UdpSocket) - return (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); break; + default: + return -1; } - return -1; + return result; } inline void nodeVirtualNetworkFrameFunction(uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) @@ -987,7 +1056,7 @@ public: inline void onHttpResponseFromClient(TcpConnection *tc) { if (!tc->shouldKeepAlive) - _phy.close(tc->sock); // will call close handler, which deletes from _tcpConections + _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } private: @@ -1021,13 +1090,16 @@ private: PhySocket *_v4TcpListenSocket; PhySocket *_v6TcpListenSocket; ControlPlane *_controlPlane; + uint64_t _lastDirectReceiveFromGlobal; + uint64_t _lastSendToGlobal; volatile uint64_t _nextBackgroundTaskDeadline; std::map< uint64_t,EthernetTap * > _taps; std::map< uint64_t,std::vector > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned Mutex _taps_m; - std::map< PhySocket *,TcpConnection > _tcpConections; // no mutex for this since it's done in the main loop thread only + std::map< PhySocket *,TcpConnection > _tcpConnections; // no mutex for this since it's done in the main loop thread only + TcpConnection *_tcpFallbackTunnel; ReasonForTermination _termReason; std::string _fatalErrorMessage; From b388d9fdc9ff161f56e03e224eb72baa620ed28d Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 10:14:44 -0700 Subject: [PATCH 2/7] TCP proxy should actually bind to its TCP port. Seems like a good idea. --- tcp-proxy/tcp-proxy.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tcp-proxy/tcp-proxy.cpp b/tcp-proxy/tcp-proxy.cpp index 9e3f5d079..cbc3d0cca 100644 --- a/tcp-proxy/tcp-proxy.cpp +++ b/tcp-proxy/tcp-proxy.cpp @@ -45,6 +45,8 @@ #define ZT_TCP_PROXY_UDP_POOL_START_PORT 10000 #define ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS 300 +#define ZT_TCP_PROXY_TCP_PORT 443 + using namespace ZeroTier; /* @@ -317,6 +319,17 @@ int main(int argc,char **argv) } } + { + struct sockaddr_in laddr; + memset(&laddr,0,sizeof(laddr)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons(ZT_TCP_PROXY_TCP_PORT); + if (!phy.tcpListen((const struct sockaddr *)&laddr)) { + fprintf(stderr,"%s: fatal error: unable to bind TCP port %d\n",argv[0],ZT_TCP_PROXY_TCP_PORT); + return 1; + } + } + time_t lastDidHousekeeping = time((time_t *)0); for(;;) { phy.poll(120000); @@ -326,4 +339,6 @@ int main(int argc,char **argv) svc.doHousekeeping(); } } + + return 0; } From 196f27f1f0b0e4162c9e57d9dfe07509d8e28370 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 13:11:55 -0700 Subject: [PATCH 3/7] Add delay to NAT-t escalation stuff to try to address GitHub issue #167 --- node/Switch.cpp | 59 ++++++++++++++++++++----------------------------- node/Switch.hpp | 2 +- 2 files changed, 25 insertions(+), 36 deletions(-) diff --git a/node/Switch.cpp b/node/Switch.cpp index 19a77db74..156878fcd 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -444,42 +444,31 @@ unsigned long Switch::doTimerTasks(uint64_t now) continue; } else { // Nope, nothing yet. Time to kill some kittens. - - switch(qi->strategyIteration++) { - - case 0: { - // First strategy: rifle method: direct packet to known port - qi->peer->attemptToContactAt(RR,qi->inaddr,now); - } break; - - case 1: { - // Second strategy: shotgun method up: try a few ports above - InetAddress tmpaddr(qi->inaddr); - int p = (int)qi->inaddr.port(); - for(int i=0;i<9;++i) { - if (++p > 0xffff) break; - tmpaddr.setPort((unsigned int)p); - qi->peer->attemptToContactAt(RR,tmpaddr,now); - } - } break; - - case 2: { - // Third strategy: shotgun method down: try a few ports below - InetAddress tmpaddr(qi->inaddr); - int p = (int)qi->inaddr.port(); - for(int i=0;i<3;++i) { - if (--p < 1024) break; - tmpaddr.setPort((unsigned int)p); - qi->peer->attemptToContactAt(RR,tmpaddr,now); - } - - // We've tried all strategies - _contactQueue.erase(qi++); - continue; - } break; - + if (qi->strategyIteration == 0) { + // First stragegy: send packet directly (we already tried this but try again) + qi->peer->attemptToContactAt(RR,qi->inaddr,now); + } else if (qi->strategyIteration <= 9) { + // Strategies 1-9: try escalating ports + InetAddress tmpaddr(qi->inaddr); + int p = (int)qi->inaddr.port() + qi->strategyIteration; + if (p < 0xffff) { + tmpaddr.setPort((unsigned int)p); + qi->peer->attemptToContactAt(RR,tmpaddr,now); + } + } else if (qi->strategyIteration <= 18) { + // Strategies 10-18: try ports below + InetAddress tmpaddr(qi->inaddr); + int p = (int)qi->inaddr.port() - (qi->strategyIteration - 9); + if (p >= 1024) { + tmpaddr.setPort((unsigned int)p); + qi->peer->attemptToContactAt(RR,tmpaddr,now); + } + } else { + // All strategies tried, expire entry + _contactQueue.erase(qi++); + continue; } - + ++qi->strategyIteration; qi->fireAtTime = now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY; nextDelay = std::min(nextDelay,(unsigned long)ZT_NAT_T_TACTICAL_ESCALATION_DELAY); } diff --git a/node/Switch.hpp b/node/Switch.hpp index 0b748247e..e944c843d 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -250,7 +250,7 @@ private: peer(p), fireAtTime(ft), inaddr(a), - strategyIteration(1) {} // start with 2nd strategy, since first was tried at inception + strategyIteration(0) {} SharedPtr peer; uint64_t fireAtTime; From cdec05af24f129ed12cf2b43eaa876521c358a0b Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 13:12:14 -0700 Subject: [PATCH 4/7] One second delay between NAT-t events. --- node/Constants.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/Constants.hpp b/node/Constants.hpp index 1da10d11b..ed227f9f7 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -297,7 +297,7 @@ /** * Delay between initial direct NAT-t packet and more aggressive techniques */ -#define ZT_NAT_T_TACTICAL_ESCALATION_DELAY 2000 +#define ZT_NAT_T_TACTICAL_ESCALATION_DELAY 1000 /** * Size of anti-recursion history (see AntiRecursion.hpp) From 9774f789f21c05267ef35cc35684b0a7ce2efb2a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 14:52:23 -0700 Subject: [PATCH 5/7] TCP fallback tunneling is now working. That hurt more than expected. --- node/Node.cpp | 8 +- node/Node.hpp | 1 - service/ControlPlane.cpp | 16 ++-- service/OneService.cpp | 128 ++++++++++++++++++---------- service/OneService.hpp | 5 ++ service/README.md | 1 + tcp-proxy/tcp-proxy.cpp | 176 ++++++++++++++++++--------------------- 7 files changed, 181 insertions(+), 154 deletions(-) diff --git a/node/Node.cpp b/node/Node.cpp index 6b3f1f2c9..c5c9873cd 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -77,7 +77,6 @@ Node::Node( _networks(), _networks_m(), _now(now), - _startTimeAfterInactivity(0), _lastPingCheck(0), _lastHousekeepingRun(0), _lastBeacon(0) @@ -217,17 +216,12 @@ ZT1_ResultCode Node::processBackgroundTasks(uint64_t now,volatile uint64_t *next if ((now - _lastPingCheck) >= ZT_PING_CHECK_INVERVAL) { _lastPingCheck = now; - // This is used to compute whether we appear to be "online" or not - if ((now - _startTimeAfterInactivity) > (ZT_PING_CHECK_INVERVAL * 3)) - _startTimeAfterInactivity = now; - try { _PingPeersThatNeedPing pfunc(RR,now); RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc); - const uint64_t lastActivityAgo = now - std::max(_startTimeAfterInactivity,pfunc.lastReceiveFromUpstream); bool oldOnline = _online; - _online = (lastActivityAgo < ZT_PEER_ACTIVITY_TIMEOUT); + _online = ((now - pfunc.lastReceiveFromUpstream) < ZT_PEER_ACTIVITY_TIMEOUT); if (oldOnline != _online) postEvent(_online ? ZT1_EVENT_ONLINE : ZT1_EVENT_OFFLINE); } catch ( ... ) { diff --git a/node/Node.hpp b/node/Node.hpp index f8678115d..1d9372e46 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -229,7 +229,6 @@ private: Mutex _backgroundTasksLock; uint64_t _now; - uint64_t _startTimeAfterInactivity; uint64_t _lastPingCheck; uint64_t _lastHousekeepingRun; uint64_t _lastBeacon; diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 2e8290edf..71b3fd3fb 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -360,18 +360,20 @@ unsigned int ControlPlane::handleRequest( _node->status(&status); Utils::snprintf(json,sizeof(json), "{\n" - "\t\"address\":\"%.10llx\",\n" - "\t\"publicIdentity\":\"%s\",\n" - "\t\"online\":%s,\n" - "\t\"versionMajor\":%d,\n" - "\t\"versionMinor\":%d,\n" - "\t\"versionRev\":%d,\n" - "\t\"version\":\"%d.%d.%d\",\n" + "\t\"address\": \"%.10llx\",\n" + "\t\"publicIdentity\": \"%s\",\n" + "\t\"online\": %s,\n" + "\t\"tcpFallbackActive\": %s,\n" + "\t\"versionMajor\": %d,\n" + "\t\"versionMinor\": %d,\n" + "\t\"versionRev\": %d,\n" + "\t\"version\": \"%d.%d.%d\",\n" "\t\"clock\": %llu\n" "}\n", status.address, status.publicIdentity, (status.online) ? "true" : "false", + (_svc->tcpFallbackActive()) ? "true" : "false", ZEROTIER_ONE_VERSION_MAJOR, ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, diff --git a/service/OneService.cpp b/service/OneService.cpp index c0b47af12..797825a7b 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,16 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +/** + * Uncomment to enable UDP breakage switch + * + * If this is defined, the presence of a file called /tmp/ZT_BREAK_UDP + * will cause direct UDP TX/RX to stop working. This can be used to + * test TCP tunneling fallback and other robustness features. Deleting + * this file will cause it to start working again. + */ +//#define ZT_BREAK_UDP + #ifdef ZT_ENABLE_NETWORK_CONTROLLER #include "../controller/SqliteNetworkController.hpp" #else @@ -369,6 +379,7 @@ struct TcpConnection std::string body; std::string writeBuf; + Mutex writeBuf_m; }; class OneServiceImpl : public OneService @@ -380,12 +391,13 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()), #endif - _phy(this,true), + _phy(this,false), _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), _node((Node *)0), _controlPlane((ControlPlane *)0), _lastDirectReceiveFromGlobal(0), _lastSendToGlobal(0), + _lastRestart(0), _nextBackgroundTaskDeadline(0), _tcpFallbackTunnel((TcpConnection *)0), _termReason(ONE_STILL_RUNNING), @@ -481,6 +493,8 @@ public: } _nextBackgroundTaskDeadline = 0; + uint64_t clockShouldBe = OSUtils::now(); + _lastRestart = clockShouldBe; uint64_t lastTapMulticastGroupCheck = 0; uint64_t lastTcpFallbackResolve = 0; #ifdef ZT_AUTO_UPDATE @@ -496,13 +510,18 @@ public: break; } else _run_m.unlock(); - uint64_t dl = _nextBackgroundTaskDeadline; uint64_t now = OSUtils::now(); + + uint64_t dl = _nextBackgroundTaskDeadline; if (dl <= now) { _node->processBackgroundTasks(now,&_nextBackgroundTaskDeadline); dl = _nextBackgroundTaskDeadline; } + // Attempt to detect sleep/wake events by detecting delay overruns + if ((now > clockShouldBe)&&((now - clockShouldBe) > 2000)) + _lastRestart = now; + #ifdef ZT_AUTO_UPDATE if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) { lastSoftwareUpdateCheck = OSUtils::now(); @@ -532,6 +551,7 @@ public: } const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100; + clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } } catch (std::exception &exc) { @@ -585,6 +605,11 @@ public: return std::string(); } + virtual bool tcpFallbackActive() const + { + return (_tcpFallbackTunnel != (TcpConnection *)0); + } + virtual void terminate() { _run_m.lock(); @@ -597,7 +622,11 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { - if (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) +#ifdef ZT_BREAK_UDP + if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) + return; +#endif + if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), @@ -624,7 +653,7 @@ public: TcpConnection *tc = &(_tcpConnections[sock]); tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; - tc->shouldKeepAlive = true; // unused + tc->shouldKeepAlive = true; tc->parent = this; tc->sock = sock; // from and parser are not used @@ -634,8 +663,6 @@ public: tc->writeBuf = ""; *uptr = (void *)tc; - _tcpFallbackTunnel = tc; - // Send "hello" message tc->writeBuf.push_back((char)0x17); tc->writeBuf.push_back((char)0x03); @@ -647,6 +674,8 @@ public: tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); _phy.tcpSetNotifyWritable(sock,true); + + _tcpFallbackTunnel = tc; } inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) @@ -699,11 +728,7 @@ public: case TcpConnection::TCP_TUNNEL_OUTGOING: tc->body.append((const char *)data,len); - if (tc->body.length() > 65535) { - // sanity limit -- a message will never be this big since mlen is 16-bit - _phy.close(sock); - return; - } else if (tc->body.length() >= 5) { + while (tc->body.length() >= 5) { const char *data = tc->body.data(); const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) ); if (tc->body.length() >= (mlen + 5)) { @@ -768,7 +793,7 @@ public: if (tc->body.length() > (mlen + 5)) tc->body = tc->body.substr(mlen + 5); else tc->body = ""; - } + } else break; } break; @@ -778,18 +803,23 @@ public: inline void phyOnTcpWritable(PhySocket *sock,void **uptr) { TcpConnection *tc = reinterpret_cast(*uptr); - if (tc->writeBuf.length()) { + Mutex::Lock _l(tc->writeBuf_m); + if (tc->writeBuf.length() > 0) { long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); - if ((unsigned long)sent == (unsigned long)tc->writeBuf.length()) { + if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; _phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive) _phy.close(sock); // will call close handler to delete from _tcpConnections - } else tc->writeBuf = tc->writeBuf.substr(sent); + } else { + tc->writeBuf = tc->writeBuf.substr(sent); + } } - } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen + } else { + _phy.tcpSetNotifyWritable(sock,false); + } } inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) @@ -939,24 +969,26 @@ public: int result = -1; switch(addr->ss_family) { case AF_INET: - //if (_v4UdpSocket) - //result = (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif + if (_v4UdpSocket) + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif - if (reinterpret_cast(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) { +#ifdef ZT1_TCP_FALLBACK_RELAY + // TCP fallback tunnel support + if ((len >= 16)&&(reinterpret_cast(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { uint64_t now = OSUtils::now(); - /* Engage TCP fallback if we've sent something more than PING_CHECK_INTERVAL ago, - * less than twice the fallback period ago, and haven't heard anything in the fallback - * timeout period. In that case our packets to the global IP scope are probably - * being blocked, so start trying to tunnel them out. - * - * Note that we *always* send them as plain UDP above. This way if we ever get unblocked - * or if our connectivity changes for the better, we will instantly "fail forward" back - * to plain vanilla UDP and abandon the tunnel. - * - * TCP fallback is currently only supported for tunneling to IPv4 addresses. */ - if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > ZT_PING_CHECK_INVERVAL)&&((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)) { + // Engage TCP tunnel fallback if we haven't received anything valid from a global + // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting + // valid direct traffic we'll stop using it and close the socket after a while. + if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) { if (_tcpFallbackTunnel) { + Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); if (!_tcpFallbackTunnel->writeBuf.length()) _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); unsigned long mlen = len + 7; @@ -969,7 +1001,8 @@ public: _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_addr.s_addr))),4); _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_port))),2); _tcpFallbackTunnel->writeBuf.append((const char *)data,len); - } else { + result = 0; + } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { std::vector tunnelIps(_tcpFallbackResolver.get()); if (tunnelIps.empty()) { if (!_tcpFallbackResolver.running()) @@ -985,11 +1018,18 @@ public: _lastSendToGlobal = now; } +#endif // ZT1_TCP_FALLBACK_RELAY break; case AF_INET6: +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif if (_v6UdpSocket) - result = (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = ((_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif break; default: return -1; @@ -1039,16 +1079,19 @@ public: } Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); - tc->writeBuf.assign(tmpn); - tc->writeBuf.append("Content-Type: "); - tc->writeBuf.append(contentType); - Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); - tc->writeBuf.append(tmpn); - if (!tc->shouldKeepAlive) - tc->writeBuf.append("Connection: close\r\n"); - tc->writeBuf.append("\r\n"); - if (tc->parser.method != HTTP_HEAD) - tc->writeBuf.append(data); + { + Mutex::Lock _l(tc->writeBuf_m); + tc->writeBuf.assign(tmpn); + tc->writeBuf.append("Content-Type: "); + tc->writeBuf.append(contentType); + Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); + tc->writeBuf.append(tmpn); + if (!tc->shouldKeepAlive) + tc->writeBuf.append("Connection: close\r\n"); + tc->writeBuf.append("\r\n"); + if (tc->parser.method != HTTP_HEAD) + tc->writeBuf.append(data); + } _phy.tcpSetNotifyWritable(tc->sock,true); } @@ -1092,6 +1135,7 @@ private: ControlPlane *_controlPlane; uint64_t _lastDirectReceiveFromGlobal; uint64_t _lastSendToGlobal; + uint64_t _lastRestart; volatile uint64_t _nextBackgroundTaskDeadline; std::map< uint64_t,EthernetTap * > _taps; diff --git a/service/OneService.hpp b/service/OneService.hpp index aea314f59..7964958c7 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -126,6 +126,11 @@ public: */ virtual std::string portDeviceName(uint64_t nwid) const = 0; + /** + * @return True if TCP fallback is currently active + */ + virtual bool tcpFallbackActive() const = 0; + /** * Terminate background service (can be called from other threads) */ diff --git a/service/README.md b/service/README.md index 1900a6127..acad97a14 100644 --- a/service/README.md +++ b/service/README.md @@ -26,6 +26,7 @@ A *jsonp* URL argument may be supplied to request JSONP encapsulation. A JSONP r addressstring10-digit hexadecimal ZeroTier address of this nodeno publicIdentitystringFull public ZeroTier identity of this nodeno onlinebooleanDoes this node appear to have upstream network access?no +tcpFallbackActivebooleanIs TCP fallback mode active?no versionMajorintegerZeroTier major versionno versionMinorintegerZeroTier minor versionno versionRevintegerZeroTier revisionno diff --git a/tcp-proxy/tcp-proxy.cpp b/tcp-proxy/tcp-proxy.cpp index cbc3d0cca..f7ba2c2f6 100644 --- a/tcp-proxy/tcp-proxy.cpp +++ b/tcp-proxy/tcp-proxy.cpp @@ -25,6 +25,17 @@ * LLC. Start here: http://www.zerotier.com/ */ +// HACK! Will eventually use epoll() or something in Phy<> instead of select(). +// Also be sure to change ulimit -n and fs.file-max in /etc/sysctl.conf on relays. +#if defined(__linux__) || defined(__LINUX__) || defined(__LINUX) || defined(LINUX) +#include +#include +#undef __FD_SETSIZE +#define __FD_SETSIZE 1048576 +#undef FD_SETSIZE +#define FD_SETSIZE 1048576 +#endif + #include #include #include @@ -41,10 +52,7 @@ #include "../osdep/Phy.hpp" -#define ZT_TCP_PROXY_UDP_POOL_SIZE 1024 -#define ZT_TCP_PROXY_UDP_POOL_START_PORT 10000 #define ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS 300 - #define ZT_TCP_PROXY_TCP_PORT 443 using namespace ZeroTier; @@ -90,8 +98,7 @@ struct TcpProxyService; struct TcpProxyService { Phy *phy; - PhySocket *udpPool[ZT_TCP_PROXY_UDP_POOL_SIZE]; - + int udpPortCounter; struct Client { char tcpReadBuf[131072]; @@ -99,99 +106,101 @@ struct TcpProxyService unsigned long tcpWritePtr; unsigned long tcpReadPtr; PhySocket *tcp; - PhySocket *assignedUdp; + PhySocket *udp; time_t lastActivity; bool newVersion; }; - std::map< PhySocket *,Client > clients; - struct ReverseMappingKey + PhySocket *getUnusedUdp(void *uptr) { - uint64_t sourceZTAddress; - PhySocket *sendingUdpSocket; - uint32_t destIp; - unsigned int destPort; - - ReverseMappingKey() {} - ReverseMappingKey(uint64_t zt,PhySocket *s,uint32_t ip,unsigned int port) : sourceZTAddress(zt),sendingUdpSocket(s),destIp(ip),destPort(port) {} - inline bool operator<(const ReverseMappingKey &k) const throw() { return (memcmp((const void *)this,(const void *)&k,sizeof(ReverseMappingKey)) < 0); } - inline bool operator==(const ReverseMappingKey &k) const throw() { return (memcmp((const void *)this,(const void *)&k,sizeof(ReverseMappingKey)) == 0); } - }; - - std::map< ReverseMappingKey,Client * > reverseMappings; + for(int i=0;i<65535;++i) { + ++udpPortCounter; + if (udpPortCounter > 0xfffe) + udpPortCounter = 1024; + struct sockaddr_in laddr; + memset(&laddr,0,sizeof(struct sockaddr_in)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons((uint16_t)udpPortCounter); + PhySocket *udp = phy->udpBind(reinterpret_cast(&laddr),uptr); + if (udp) + return udp; + } + return (PhySocket *)0; + } void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { - if ((from->sa_family == AF_INET)&&(len > 16)&&(len < 2048)) { - const uint64_t destZt = ( - (((uint64_t)(((const unsigned char *)data)[8])) << 32) | - (((uint64_t)(((const unsigned char *)data)[9])) << 24) | - (((uint64_t)(((const unsigned char *)data)[10])) << 16) | - (((uint64_t)(((const unsigned char *)data)[11])) << 8) | - ((uint64_t)(((const unsigned char *)data)[12])) ); - const uint32_t fromIp = ((const struct sockaddr_in *)from)->sin_addr.s_addr; - const unsigned int fromPort = ntohs(((const struct sockaddr_in *)from)->sin_port); + if (!*uptr) + return; + if ((from->sa_family == AF_INET)&&(len >= 16)&&(len < 2048)) { + Client &c = *((Client *)*uptr); + c.lastActivity = time((time_t *)0); - std::map< ReverseMappingKey,Client * >::iterator rm(reverseMappings.find(ReverseMappingKey(destZt,sock,fromIp,fromPort))); - if (rm != reverseMappings.end()) { - Client &c = *(rm->second); + unsigned long mlen = len; + if (c.newVersion) + mlen += 7; // new clients get IP info - unsigned long mlen = len; - if (c.newVersion) - mlen += 7; // new clients get IP info + if ((c.tcpWritePtr + 5 + mlen) <= sizeof(c.tcpWriteBuf)) { + if (!c.tcpWritePtr) + phy->tcpSetNotifyWritable(c.tcp,true); - if ((c.tcpWritePtr + 5 + mlen) <= sizeof(c.tcpWriteBuf)) { - if (!c.tcpWritePtr) - phy->tcpSetNotifyWritable(c.tcp,true); + c.tcpWriteBuf[c.tcpWritePtr++] = 0x17; // look like TLS data + c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 + c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 - c.tcpWriteBuf[c.tcpWritePtr++] = 0x17; // look like TLS data - c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 - c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 + c.tcpWriteBuf[c.tcpWritePtr++] = (char)((mlen >> 8) & 0xff); + c.tcpWriteBuf[c.tcpWritePtr++] = (char)(mlen & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)((mlen >> 8) & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)(mlen & 0xff); - - if (c.newVersion) { - c.tcpWriteBuf[c.tcpWritePtr++] = (char)4; // IPv4 - *((uint32_t *)(c.tcpWriteBuf + c.tcpWritePtr)) = fromIp; - c.tcpWritePtr += 4; - c.tcpWriteBuf[c.tcpWritePtr++] = (char)((fromPort >> 8) & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)(fromPort & 0xff); - } - - for(unsigned long i=0;isin_addr.s_addr; + c.tcpWritePtr += 4; + *((uint16_t *)(c.tcpWriteBuf + c.tcpWritePtr)) = ((const struct sockaddr_in *)from)->sin_port; + c.tcpWritePtr += 2; } + + for(unsigned long i=0;i %.16llx\n",inet_ntoa(reinterpret_cast(from)->sin_addr),(int)ntohs(reinterpret_cast(from)->sin_port),(unsigned long long)&c); } } void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { - // unused, we don't initiate + // unused, we don't initiate outbound connections } void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) { Client &c = clients[sockN]; + PhySocket *udp = getUnusedUdp((void *)&c); + if (!udp) { + phy->close(sockN); + clients.erase(sockN); + //printf("** TCP rejected, no more UDP ports to assign\n"); + return; + } c.tcpWritePtr = 0; c.tcpReadPtr = 0; c.tcp = sockN; - c.assignedUdp = udpPool[rand() % ZT_TCP_PROXY_UDP_POOL_SIZE]; + c.udp = udp; c.lastActivity = time((time_t *)0); c.newVersion = false; *uptrN = (void *)&c; + //printf("<< TCP from %s -> %.16llx\n",inet_ntoa(reinterpret_cast(from)->sin_addr),(unsigned long long)&c); } void phyOnTcpClose(PhySocket *sock,void **uptr) { - for(std::map< ReverseMappingKey,Client * >::iterator rm(reverseMappings.begin());rm!=reverseMappings.end();) { - if (rm->second == (Client *)*uptr) - reverseMappings.erase(rm++); - else ++rm; - } + if (!*uptr) + return; + Client &c = *((Client *)*uptr); + phy->close(c.udp); clients.erase(sock); + //printf("** TCP %.16llx closed\n",(unsigned long long)*uptr); } void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) @@ -212,6 +221,7 @@ struct TcpProxyService if (mlen == 4) { // Right now just sending this means the client is 'new enough' for the IP header c.newVersion = true; + //printf("<< TCP %.16llx HELLO\n",(unsigned long long)*uptr); } else if (mlen >= 7) { char *payload = c.tcpReadBuf + 5; unsigned long payloadLen = mlen; @@ -241,22 +251,8 @@ struct TcpProxyService // Note: we do not relay to privileged ports... just an abuse prevention rule. if ((ntohs(dest.sin_port) > 1024)&&(payloadLen >= 16)) { - if ((payloadLen >= 28)&&(payload[13] != (char)0xff)) { - // Learn reverse mappings -- we will route replies to these packets - // back to their sending TCP socket. They're on a first come first - // served basis. - const uint64_t sourceZt = ( - (((uint64_t)(((const unsigned char *)payload)[13])) << 32) | - (((uint64_t)(((const unsigned char *)payload)[14])) << 24) | - (((uint64_t)(((const unsigned char *)payload)[15])) << 16) | - (((uint64_t)(((const unsigned char *)payload)[16])) << 8) | - ((uint64_t)(((const unsigned char *)payload)[17])) ); - ReverseMappingKey k(sourceZt,c.assignedUdp,dest.sin_addr.s_addr,ntohl(dest.sin_port)); - if (reverseMappings.count(k) == 0) - reverseMappings[k] = &c; - } - - phy->udpSend(c.assignedUdp,(const struct sockaddr *)&dest,payload,payloadLen); + phy->udpSend(c.udp,(const struct sockaddr *)&dest,payload,payloadLen); + //printf(">> TCP %.16llx to %s:%d\n",(unsigned long long)*uptr,inet_ntoa(dest.sin_addr),(int)ntohs(dest.sin_port)); } } @@ -284,11 +280,13 @@ struct TcpProxyService std::vector toClose; time_t now = time((time_t *)0); for(std::map< PhySocket *,Client >::iterator c(clients.begin());c!=clients.end();++c) { - if ((now - c->second.lastActivity) >= ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS) + if ((now - c->second.lastActivity) >= ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS) { toClose.push_back(c->first); + toClose.push_back(c->second.udp); + } } for(std::vector::iterator s(toClose.begin());s!=toClose.end();++s) - phy->close(*s); // will call phyOnTcpClose() which does cleanup + phy->close(*s); } }; @@ -299,25 +297,9 @@ int main(int argc,char **argv) srand(time((time_t *)0)); TcpProxyService svc; - Phy phy(&svc,true); + Phy phy(&svc,false); svc.phy = &phy; - - { - int poolSize = 0; - for(unsigned int p=ZT_TCP_PROXY_UDP_POOL_START_PORT;((poolSize Date: Fri, 22 May 2015 15:33:33 -0700 Subject: [PATCH 6/7] typo --- node/Switch.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/Switch.cpp b/node/Switch.cpp index 156878fcd..aadbafa00 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -445,7 +445,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) } else { // Nope, nothing yet. Time to kill some kittens. if (qi->strategyIteration == 0) { - // First stragegy: send packet directly (we already tried this but try again) + // First strategy: send packet directly (we already tried this but try again) qi->peer->attemptToContactAt(RR,qi->inaddr,now); } else if (qi->strategyIteration <= 9) { // Strategies 1-9: try escalating ports From d8783b14eb465cd97950afd726e940bbe3708c8a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 15:46:06 -0700 Subject: [PATCH 7/7] Build fix. --- node/Switch.cpp | 4 ++-- service/OneService.cpp | 19 ++++++++++++------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/node/Switch.cpp b/node/Switch.cpp index aadbafa00..4bdf2d84f 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -454,7 +454,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) if (p < 0xffff) { tmpaddr.setPort((unsigned int)p); qi->peer->attemptToContactAt(RR,tmpaddr,now); - } + } else qi->strategyIteration = 9; } else if (qi->strategyIteration <= 18) { // Strategies 10-18: try ports below InetAddress tmpaddr(qi->inaddr); @@ -462,7 +462,7 @@ unsigned long Switch::doTimerTasks(uint64_t now) if (p >= 1024) { tmpaddr.setPort((unsigned int)p); qi->peer->attemptToContactAt(RR,tmpaddr,now); - } + } else qi->strategyIteration = 18; } else { // All strategies tried, expire entry _contactQueue.erase(qi++); diff --git a/service/OneService.cpp b/service/OneService.cpp index 797825a7b..9a72c2408 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -566,7 +566,7 @@ public: try { while (!_tcpConnections.empty()) - _phy.close(_tcpConnections.begin()->first); + _phy.close((*_tcpConnections.begin())->sock); } catch ( ... ) {} { @@ -651,7 +651,9 @@ public: // Outgoing TCP connections are always TCP fallback tunnel connections. - TcpConnection *tc = &(_tcpConnections[sock]); + TcpConnection *tc = new TcpConnection(); + _tcpConnections.insert(tc); + tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; tc->shouldKeepAlive = true; tc->parent = this; @@ -682,7 +684,9 @@ public: { // Incoming TCP connections are HTTP JSON API requests. - TcpConnection *tc = &(_tcpConnections[sockN]); + TcpConnection *tc = new TcpConnection(); + _tcpConnections.insert(tc); + tc->type = TcpConnection::TCP_HTTP_INCOMING; tc->shouldKeepAlive = true; tc->parent = this; @@ -704,11 +708,12 @@ public: inline void phyOnTcpClose(PhySocket *sock,void **uptr) { - std::map< PhySocket *,TcpConnection >::iterator tc(_tcpConnections.find(sock)); - if (tc != _tcpConnections.end()) { - if (&(tc->second) == _tcpFallbackTunnel) + TcpConnection *tc = (TcpConnection *)*uptr; + if (tc) { + if (tc == _tcpFallbackTunnel) _tcpFallbackTunnel = (TcpConnection *)0; _tcpConnections.erase(tc); + delete tc; } } @@ -1142,7 +1147,7 @@ private: std::map< uint64_t,std::vector > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned Mutex _taps_m; - std::map< PhySocket *,TcpConnection > _tcpConnections; // no mutex for this since it's done in the main loop thread only + std::set< TcpConnection * > _tcpConnections; // no mutex for this since it's done in the main loop thread only TcpConnection *_tcpFallbackTunnel; ReasonForTermination _termReason;