From 9774f789f21c05267ef35cc35684b0a7ce2efb2a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 22 May 2015 14:52:23 -0700 Subject: [PATCH] TCP fallback tunneling is now working. That hurt more than expected. --- node/Node.cpp | 8 +- node/Node.hpp | 1 - service/ControlPlane.cpp | 16 ++-- service/OneService.cpp | 128 ++++++++++++++++++---------- service/OneService.hpp | 5 ++ service/README.md | 1 + tcp-proxy/tcp-proxy.cpp | 176 ++++++++++++++++++--------------------- 7 files changed, 181 insertions(+), 154 deletions(-) diff --git a/node/Node.cpp b/node/Node.cpp index 6b3f1f2c9..c5c9873cd 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -77,7 +77,6 @@ Node::Node( _networks(), _networks_m(), _now(now), - _startTimeAfterInactivity(0), _lastPingCheck(0), _lastHousekeepingRun(0), _lastBeacon(0) @@ -217,17 +216,12 @@ ZT1_ResultCode Node::processBackgroundTasks(uint64_t now,volatile uint64_t *next if ((now - _lastPingCheck) >= ZT_PING_CHECK_INVERVAL) { _lastPingCheck = now; - // This is used to compute whether we appear to be "online" or not - if ((now - _startTimeAfterInactivity) > (ZT_PING_CHECK_INVERVAL * 3)) - _startTimeAfterInactivity = now; - try { _PingPeersThatNeedPing pfunc(RR,now); RR->topology->eachPeer<_PingPeersThatNeedPing &>(pfunc); - const uint64_t lastActivityAgo = now - std::max(_startTimeAfterInactivity,pfunc.lastReceiveFromUpstream); bool oldOnline = _online; - _online = (lastActivityAgo < ZT_PEER_ACTIVITY_TIMEOUT); + _online = ((now - pfunc.lastReceiveFromUpstream) < ZT_PEER_ACTIVITY_TIMEOUT); if (oldOnline != _online) postEvent(_online ? ZT1_EVENT_ONLINE : ZT1_EVENT_OFFLINE); } catch ( ... ) { diff --git a/node/Node.hpp b/node/Node.hpp index f8678115d..1d9372e46 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -229,7 +229,6 @@ private: Mutex _backgroundTasksLock; uint64_t _now; - uint64_t _startTimeAfterInactivity; uint64_t _lastPingCheck; uint64_t _lastHousekeepingRun; uint64_t _lastBeacon; diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 2e8290edf..71b3fd3fb 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -360,18 +360,20 @@ unsigned int ControlPlane::handleRequest( _node->status(&status); Utils::snprintf(json,sizeof(json), "{\n" - "\t\"address\":\"%.10llx\",\n" - "\t\"publicIdentity\":\"%s\",\n" - "\t\"online\":%s,\n" - "\t\"versionMajor\":%d,\n" - "\t\"versionMinor\":%d,\n" - "\t\"versionRev\":%d,\n" - "\t\"version\":\"%d.%d.%d\",\n" + "\t\"address\": \"%.10llx\",\n" + "\t\"publicIdentity\": \"%s\",\n" + "\t\"online\": %s,\n" + "\t\"tcpFallbackActive\": %s,\n" + "\t\"versionMajor\": %d,\n" + "\t\"versionMinor\": %d,\n" + "\t\"versionRev\": %d,\n" + "\t\"version\": \"%d.%d.%d\",\n" "\t\"clock\": %llu\n" "}\n", status.address, status.publicIdentity, (status.online) ? "true" : "false", + (_svc->tcpFallbackActive()) ? "true" : "false", ZEROTIER_ONE_VERSION_MAJOR, ZEROTIER_ONE_VERSION_MINOR, ZEROTIER_ONE_VERSION_REVISION, diff --git a/service/OneService.cpp b/service/OneService.cpp index c0b47af12..797825a7b 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,16 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +/** + * Uncomment to enable UDP breakage switch + * + * If this is defined, the presence of a file called /tmp/ZT_BREAK_UDP + * will cause direct UDP TX/RX to stop working. This can be used to + * test TCP tunneling fallback and other robustness features. Deleting + * this file will cause it to start working again. + */ +//#define ZT_BREAK_UDP + #ifdef ZT_ENABLE_NETWORK_CONTROLLER #include "../controller/SqliteNetworkController.hpp" #else @@ -369,6 +379,7 @@ struct TcpConnection std::string body; std::string writeBuf; + Mutex writeBuf_m; }; class OneServiceImpl : public OneService @@ -380,12 +391,13 @@ public: #ifdef ZT_ENABLE_NETWORK_CONTROLLER _controller((_homePath + ZT_PATH_SEPARATOR_S + ZT1_CONTROLLER_DB_PATH).c_str()), #endif - _phy(this,true), + _phy(this,false), _overrideRootTopology((overrideRootTopology) ? overrideRootTopology : ""), _node((Node *)0), _controlPlane((ControlPlane *)0), _lastDirectReceiveFromGlobal(0), _lastSendToGlobal(0), + _lastRestart(0), _nextBackgroundTaskDeadline(0), _tcpFallbackTunnel((TcpConnection *)0), _termReason(ONE_STILL_RUNNING), @@ -481,6 +493,8 @@ public: } _nextBackgroundTaskDeadline = 0; + uint64_t clockShouldBe = OSUtils::now(); + _lastRestart = clockShouldBe; uint64_t lastTapMulticastGroupCheck = 0; uint64_t lastTcpFallbackResolve = 0; #ifdef ZT_AUTO_UPDATE @@ -496,13 +510,18 @@ public: break; } else _run_m.unlock(); - uint64_t dl = _nextBackgroundTaskDeadline; uint64_t now = OSUtils::now(); + + uint64_t dl = _nextBackgroundTaskDeadline; if (dl <= now) { _node->processBackgroundTasks(now,&_nextBackgroundTaskDeadline); dl = _nextBackgroundTaskDeadline; } + // Attempt to detect sleep/wake events by detecting delay overruns + if ((now > clockShouldBe)&&((now - clockShouldBe) > 2000)) + _lastRestart = now; + #ifdef ZT_AUTO_UPDATE if ((now - lastSoftwareUpdateCheck) >= ZT_AUTO_UPDATE_CHECK_PERIOD) { lastSoftwareUpdateCheck = OSUtils::now(); @@ -532,6 +551,7 @@ public: } const unsigned long delay = (dl > now) ? (unsigned long)(dl - now) : 100; + clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } } catch (std::exception &exc) { @@ -585,6 +605,11 @@ public: return std::string(); } + virtual bool tcpFallbackActive() const + { + return (_tcpFallbackTunnel != (TcpConnection *)0); + } + virtual void terminate() { _run_m.lock(); @@ -597,7 +622,11 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { - if (reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) +#ifdef ZT_BREAK_UDP + if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) + return; +#endif + if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), @@ -624,7 +653,7 @@ public: TcpConnection *tc = &(_tcpConnections[sock]); tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; - tc->shouldKeepAlive = true; // unused + tc->shouldKeepAlive = true; tc->parent = this; tc->sock = sock; // from and parser are not used @@ -634,8 +663,6 @@ public: tc->writeBuf = ""; *uptr = (void *)tc; - _tcpFallbackTunnel = tc; - // Send "hello" message tc->writeBuf.push_back((char)0x17); tc->writeBuf.push_back((char)0x03); @@ -647,6 +674,8 @@ public: tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); _phy.tcpSetNotifyWritable(sock,true); + + _tcpFallbackTunnel = tc; } inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) @@ -699,11 +728,7 @@ public: case TcpConnection::TCP_TUNNEL_OUTGOING: tc->body.append((const char *)data,len); - if (tc->body.length() > 65535) { - // sanity limit -- a message will never be this big since mlen is 16-bit - _phy.close(sock); - return; - } else if (tc->body.length() >= 5) { + while (tc->body.length() >= 5) { const char *data = tc->body.data(); const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) ); if (tc->body.length() >= (mlen + 5)) { @@ -768,7 +793,7 @@ public: if (tc->body.length() > (mlen + 5)) tc->body = tc->body.substr(mlen + 5); else tc->body = ""; - } + } else break; } break; @@ -778,18 +803,23 @@ public: inline void phyOnTcpWritable(PhySocket *sock,void **uptr) { TcpConnection *tc = reinterpret_cast(*uptr); - if (tc->writeBuf.length()) { + Mutex::Lock _l(tc->writeBuf_m); + if (tc->writeBuf.length() > 0) { long sent = (long)_phy.tcpSend(sock,tc->writeBuf.data(),(unsigned long)tc->writeBuf.length(),true); if (sent > 0) { tc->lastActivity = OSUtils::now(); - if ((unsigned long)sent == (unsigned long)tc->writeBuf.length()) { + if ((unsigned long)sent >= (unsigned long)tc->writeBuf.length()) { tc->writeBuf = ""; _phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive) _phy.close(sock); // will call close handler to delete from _tcpConnections - } else tc->writeBuf = tc->writeBuf.substr(sent); + } else { + tc->writeBuf = tc->writeBuf.substr(sent); + } } - } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen + } else { + _phy.tcpSetNotifyWritable(sock,false); + } } inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) @@ -939,24 +969,26 @@ public: int result = -1; switch(addr->ss_family) { case AF_INET: - //if (_v4UdpSocket) - //result = (_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif + if (_v4UdpSocket) + result = ((_phy.udpSend(_v4UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif - if (reinterpret_cast(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL) { +#ifdef ZT1_TCP_FALLBACK_RELAY + // TCP fallback tunnel support + if ((len >= 16)&&(reinterpret_cast(addr)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) { uint64_t now = OSUtils::now(); - /* Engage TCP fallback if we've sent something more than PING_CHECK_INTERVAL ago, - * less than twice the fallback period ago, and haven't heard anything in the fallback - * timeout period. In that case our packets to the global IP scope are probably - * being blocked, so start trying to tunnel them out. - * - * Note that we *always* send them as plain UDP above. This way if we ever get unblocked - * or if our connectivity changes for the better, we will instantly "fail forward" back - * to plain vanilla UDP and abandon the tunnel. - * - * TCP fallback is currently only supported for tunneling to IPv4 addresses. */ - if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > ZT_PING_CHECK_INVERVAL)&&((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)) { + // Engage TCP tunnel fallback if we haven't received anything valid from a global + // IP address in ZT1_TCP_FALLBACK_AFTER milliseconds. If we do start getting + // valid direct traffic we'll stop using it and close the socket after a while. + if (((now - _lastDirectReceiveFromGlobal) > ZT1_TCP_FALLBACK_AFTER)&&((now - _lastRestart) > ZT1_TCP_FALLBACK_AFTER)) { if (_tcpFallbackTunnel) { + Mutex::Lock _l(_tcpFallbackTunnel->writeBuf_m); if (!_tcpFallbackTunnel->writeBuf.length()) _phy.tcpSetNotifyWritable(_tcpFallbackTunnel->sock,true); unsigned long mlen = len + 7; @@ -969,7 +1001,8 @@ public: _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_addr.s_addr))),4); _tcpFallbackTunnel->writeBuf.append(reinterpret_cast(reinterpret_cast(&(reinterpret_cast(addr)->sin_port))),2); _tcpFallbackTunnel->writeBuf.append((const char *)data,len); - } else { + result = 0; + } else if (((now - _lastSendToGlobal) < ZT1_TCP_FALLBACK_AFTER)&&((now - _lastSendToGlobal) > (ZT_PING_CHECK_INVERVAL / 2))) { std::vector tunnelIps(_tcpFallbackResolver.get()); if (tunnelIps.empty()) { if (!_tcpFallbackResolver.running()) @@ -985,11 +1018,18 @@ public: _lastSendToGlobal = now; } +#endif // ZT1_TCP_FALLBACK_RELAY break; case AF_INET6: +#ifdef ZT_BREAK_UDP + if (!OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) { +#endif if (_v6UdpSocket) - result = (_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) ? 0 : -1); + result = ((_phy.udpSend(_v6UdpSocket,(const struct sockaddr *)addr,data,len) != 0) ? 0 : -1); +#ifdef ZT_BREAK_UDP + } +#endif break; default: return -1; @@ -1039,16 +1079,19 @@ public: } Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); - tc->writeBuf.assign(tmpn); - tc->writeBuf.append("Content-Type: "); - tc->writeBuf.append(contentType); - Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); - tc->writeBuf.append(tmpn); - if (!tc->shouldKeepAlive) - tc->writeBuf.append("Connection: close\r\n"); - tc->writeBuf.append("\r\n"); - if (tc->parser.method != HTTP_HEAD) - tc->writeBuf.append(data); + { + Mutex::Lock _l(tc->writeBuf_m); + tc->writeBuf.assign(tmpn); + tc->writeBuf.append("Content-Type: "); + tc->writeBuf.append(contentType); + Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); + tc->writeBuf.append(tmpn); + if (!tc->shouldKeepAlive) + tc->writeBuf.append("Connection: close\r\n"); + tc->writeBuf.append("\r\n"); + if (tc->parser.method != HTTP_HEAD) + tc->writeBuf.append(data); + } _phy.tcpSetNotifyWritable(tc->sock,true); } @@ -1092,6 +1135,7 @@ private: ControlPlane *_controlPlane; uint64_t _lastDirectReceiveFromGlobal; uint64_t _lastSendToGlobal; + uint64_t _lastRestart; volatile uint64_t _nextBackgroundTaskDeadline; std::map< uint64_t,EthernetTap * > _taps; diff --git a/service/OneService.hpp b/service/OneService.hpp index aea314f59..7964958c7 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -126,6 +126,11 @@ public: */ virtual std::string portDeviceName(uint64_t nwid) const = 0; + /** + * @return True if TCP fallback is currently active + */ + virtual bool tcpFallbackActive() const = 0; + /** * Terminate background service (can be called from other threads) */ diff --git a/service/README.md b/service/README.md index 1900a6127..acad97a14 100644 --- a/service/README.md +++ b/service/README.md @@ -26,6 +26,7 @@ A *jsonp* URL argument may be supplied to request JSONP encapsulation. A JSONP r addressstring10-digit hexadecimal ZeroTier address of this nodeno publicIdentitystringFull public ZeroTier identity of this nodeno onlinebooleanDoes this node appear to have upstream network access?no +tcpFallbackActivebooleanIs TCP fallback mode active?no versionMajorintegerZeroTier major versionno versionMinorintegerZeroTier minor versionno versionRevintegerZeroTier revisionno diff --git a/tcp-proxy/tcp-proxy.cpp b/tcp-proxy/tcp-proxy.cpp index cbc3d0cca..f7ba2c2f6 100644 --- a/tcp-proxy/tcp-proxy.cpp +++ b/tcp-proxy/tcp-proxy.cpp @@ -25,6 +25,17 @@ * LLC. Start here: http://www.zerotier.com/ */ +// HACK! Will eventually use epoll() or something in Phy<> instead of select(). +// Also be sure to change ulimit -n and fs.file-max in /etc/sysctl.conf on relays. +#if defined(__linux__) || defined(__LINUX__) || defined(__LINUX) || defined(LINUX) +#include +#include +#undef __FD_SETSIZE +#define __FD_SETSIZE 1048576 +#undef FD_SETSIZE +#define FD_SETSIZE 1048576 +#endif + #include #include #include @@ -41,10 +52,7 @@ #include "../osdep/Phy.hpp" -#define ZT_TCP_PROXY_UDP_POOL_SIZE 1024 -#define ZT_TCP_PROXY_UDP_POOL_START_PORT 10000 #define ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS 300 - #define ZT_TCP_PROXY_TCP_PORT 443 using namespace ZeroTier; @@ -90,8 +98,7 @@ struct TcpProxyService; struct TcpProxyService { Phy *phy; - PhySocket *udpPool[ZT_TCP_PROXY_UDP_POOL_SIZE]; - + int udpPortCounter; struct Client { char tcpReadBuf[131072]; @@ -99,99 +106,101 @@ struct TcpProxyService unsigned long tcpWritePtr; unsigned long tcpReadPtr; PhySocket *tcp; - PhySocket *assignedUdp; + PhySocket *udp; time_t lastActivity; bool newVersion; }; - std::map< PhySocket *,Client > clients; - struct ReverseMappingKey + PhySocket *getUnusedUdp(void *uptr) { - uint64_t sourceZTAddress; - PhySocket *sendingUdpSocket; - uint32_t destIp; - unsigned int destPort; - - ReverseMappingKey() {} - ReverseMappingKey(uint64_t zt,PhySocket *s,uint32_t ip,unsigned int port) : sourceZTAddress(zt),sendingUdpSocket(s),destIp(ip),destPort(port) {} - inline bool operator<(const ReverseMappingKey &k) const throw() { return (memcmp((const void *)this,(const void *)&k,sizeof(ReverseMappingKey)) < 0); } - inline bool operator==(const ReverseMappingKey &k) const throw() { return (memcmp((const void *)this,(const void *)&k,sizeof(ReverseMappingKey)) == 0); } - }; - - std::map< ReverseMappingKey,Client * > reverseMappings; + for(int i=0;i<65535;++i) { + ++udpPortCounter; + if (udpPortCounter > 0xfffe) + udpPortCounter = 1024; + struct sockaddr_in laddr; + memset(&laddr,0,sizeof(struct sockaddr_in)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons((uint16_t)udpPortCounter); + PhySocket *udp = phy->udpBind(reinterpret_cast(&laddr),uptr); + if (udp) + return udp; + } + return (PhySocket *)0; + } void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { - if ((from->sa_family == AF_INET)&&(len > 16)&&(len < 2048)) { - const uint64_t destZt = ( - (((uint64_t)(((const unsigned char *)data)[8])) << 32) | - (((uint64_t)(((const unsigned char *)data)[9])) << 24) | - (((uint64_t)(((const unsigned char *)data)[10])) << 16) | - (((uint64_t)(((const unsigned char *)data)[11])) << 8) | - ((uint64_t)(((const unsigned char *)data)[12])) ); - const uint32_t fromIp = ((const struct sockaddr_in *)from)->sin_addr.s_addr; - const unsigned int fromPort = ntohs(((const struct sockaddr_in *)from)->sin_port); + if (!*uptr) + return; + if ((from->sa_family == AF_INET)&&(len >= 16)&&(len < 2048)) { + Client &c = *((Client *)*uptr); + c.lastActivity = time((time_t *)0); - std::map< ReverseMappingKey,Client * >::iterator rm(reverseMappings.find(ReverseMappingKey(destZt,sock,fromIp,fromPort))); - if (rm != reverseMappings.end()) { - Client &c = *(rm->second); + unsigned long mlen = len; + if (c.newVersion) + mlen += 7; // new clients get IP info - unsigned long mlen = len; - if (c.newVersion) - mlen += 7; // new clients get IP info + if ((c.tcpWritePtr + 5 + mlen) <= sizeof(c.tcpWriteBuf)) { + if (!c.tcpWritePtr) + phy->tcpSetNotifyWritable(c.tcp,true); - if ((c.tcpWritePtr + 5 + mlen) <= sizeof(c.tcpWriteBuf)) { - if (!c.tcpWritePtr) - phy->tcpSetNotifyWritable(c.tcp,true); + c.tcpWriteBuf[c.tcpWritePtr++] = 0x17; // look like TLS data + c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 + c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 - c.tcpWriteBuf[c.tcpWritePtr++] = 0x17; // look like TLS data - c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 - c.tcpWriteBuf[c.tcpWritePtr++] = 0x03; // look like TLS 1.2 + c.tcpWriteBuf[c.tcpWritePtr++] = (char)((mlen >> 8) & 0xff); + c.tcpWriteBuf[c.tcpWritePtr++] = (char)(mlen & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)((mlen >> 8) & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)(mlen & 0xff); - - if (c.newVersion) { - c.tcpWriteBuf[c.tcpWritePtr++] = (char)4; // IPv4 - *((uint32_t *)(c.tcpWriteBuf + c.tcpWritePtr)) = fromIp; - c.tcpWritePtr += 4; - c.tcpWriteBuf[c.tcpWritePtr++] = (char)((fromPort >> 8) & 0xff); - c.tcpWriteBuf[c.tcpWritePtr++] = (char)(fromPort & 0xff); - } - - for(unsigned long i=0;isin_addr.s_addr; + c.tcpWritePtr += 4; + *((uint16_t *)(c.tcpWriteBuf + c.tcpWritePtr)) = ((const struct sockaddr_in *)from)->sin_port; + c.tcpWritePtr += 2; } + + for(unsigned long i=0;i %.16llx\n",inet_ntoa(reinterpret_cast(from)->sin_addr),(int)ntohs(reinterpret_cast(from)->sin_port),(unsigned long long)&c); } } void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { - // unused, we don't initiate + // unused, we don't initiate outbound connections } void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) { Client &c = clients[sockN]; + PhySocket *udp = getUnusedUdp((void *)&c); + if (!udp) { + phy->close(sockN); + clients.erase(sockN); + //printf("** TCP rejected, no more UDP ports to assign\n"); + return; + } c.tcpWritePtr = 0; c.tcpReadPtr = 0; c.tcp = sockN; - c.assignedUdp = udpPool[rand() % ZT_TCP_PROXY_UDP_POOL_SIZE]; + c.udp = udp; c.lastActivity = time((time_t *)0); c.newVersion = false; *uptrN = (void *)&c; + //printf("<< TCP from %s -> %.16llx\n",inet_ntoa(reinterpret_cast(from)->sin_addr),(unsigned long long)&c); } void phyOnTcpClose(PhySocket *sock,void **uptr) { - for(std::map< ReverseMappingKey,Client * >::iterator rm(reverseMappings.begin());rm!=reverseMappings.end();) { - if (rm->second == (Client *)*uptr) - reverseMappings.erase(rm++); - else ++rm; - } + if (!*uptr) + return; + Client &c = *((Client *)*uptr); + phy->close(c.udp); clients.erase(sock); + //printf("** TCP %.16llx closed\n",(unsigned long long)*uptr); } void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) @@ -212,6 +221,7 @@ struct TcpProxyService if (mlen == 4) { // Right now just sending this means the client is 'new enough' for the IP header c.newVersion = true; + //printf("<< TCP %.16llx HELLO\n",(unsigned long long)*uptr); } else if (mlen >= 7) { char *payload = c.tcpReadBuf + 5; unsigned long payloadLen = mlen; @@ -241,22 +251,8 @@ struct TcpProxyService // Note: we do not relay to privileged ports... just an abuse prevention rule. if ((ntohs(dest.sin_port) > 1024)&&(payloadLen >= 16)) { - if ((payloadLen >= 28)&&(payload[13] != (char)0xff)) { - // Learn reverse mappings -- we will route replies to these packets - // back to their sending TCP socket. They're on a first come first - // served basis. - const uint64_t sourceZt = ( - (((uint64_t)(((const unsigned char *)payload)[13])) << 32) | - (((uint64_t)(((const unsigned char *)payload)[14])) << 24) | - (((uint64_t)(((const unsigned char *)payload)[15])) << 16) | - (((uint64_t)(((const unsigned char *)payload)[16])) << 8) | - ((uint64_t)(((const unsigned char *)payload)[17])) ); - ReverseMappingKey k(sourceZt,c.assignedUdp,dest.sin_addr.s_addr,ntohl(dest.sin_port)); - if (reverseMappings.count(k) == 0) - reverseMappings[k] = &c; - } - - phy->udpSend(c.assignedUdp,(const struct sockaddr *)&dest,payload,payloadLen); + phy->udpSend(c.udp,(const struct sockaddr *)&dest,payload,payloadLen); + //printf(">> TCP %.16llx to %s:%d\n",(unsigned long long)*uptr,inet_ntoa(dest.sin_addr),(int)ntohs(dest.sin_port)); } } @@ -284,11 +280,13 @@ struct TcpProxyService std::vector toClose; time_t now = time((time_t *)0); for(std::map< PhySocket *,Client >::iterator c(clients.begin());c!=clients.end();++c) { - if ((now - c->second.lastActivity) >= ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS) + if ((now - c->second.lastActivity) >= ZT_TCP_PROXY_CONNECTION_TIMEOUT_SECONDS) { toClose.push_back(c->first); + toClose.push_back(c->second.udp); + } } for(std::vector::iterator s(toClose.begin());s!=toClose.end();++s) - phy->close(*s); // will call phyOnTcpClose() which does cleanup + phy->close(*s); } }; @@ -299,25 +297,9 @@ int main(int argc,char **argv) srand(time((time_t *)0)); TcpProxyService svc; - Phy phy(&svc,true); + Phy phy(&svc,false); svc.phy = &phy; - - { - int poolSize = 0; - for(unsigned int p=ZT_TCP_PROXY_UDP_POOL_START_PORT;((poolSize