diff --git a/node/Cluster.cpp b/node/Cluster.cpp index c943e62bf..4088c9678 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -202,7 +202,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } m.lastReceivedAliveAnnouncement = RR->node->now(); #ifdef ZT_TRACE - TRACE("[%u] I'm alive! peers may be redirected to: %s",(unsigned int)fromMemberId,addrs.c_str()); + TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); #endif } break; @@ -406,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); } + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); return true; + } else { + TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + return false; } - - return false; } void Cluster::replicateHavePeer(const Identity &peerId) @@ -564,11 +566,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy { if (!peerPhysicalAddress) // sanity check return false; + if (_addressToLocationFunction) { // Pick based on location if it can be determined int px = 0,py = 0,pz = 0; if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast(&peerPhysicalAddress),&px,&py,&pz) == 0) { - // No geo-info so no change + TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str()); return false; } @@ -578,6 +581,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); double bestDistance = (offload ? 2147483648.0 : currentDistance); unsigned int bestMember = _id; + TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -588,6 +592,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) { double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); if (mdist < bestDistance) { + bestDistance = mdist; bestMember = *mid; best = m.zeroTierPhysicalEndpoints; } @@ -596,7 +601,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy } if (best.size() > 0) { - TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance); + TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance); /* if (peer->remoteVersionProtocol() >= 5) { // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 6c9a2917b..daa811858 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -145,7 +145,7 @@ public: STATE_MESSAGE_RELAY = 5, /** - * Request to send a packet to a locally-known peer: + * Request that a cluster member send a packet to a locally-known peer: * <[5] ZeroTier address of recipient> * <[1] packet verb> * <[2] length of packet payload> diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp index 7329322af..1b70f17cb 100644 --- a/node/SelfAwareness.cpp +++ b/node/SelfAwareness.cpp @@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi Mutex::Lock _l(_phy_m); - PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)]; + PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)]; if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) { entry.mySurface = myPhysicalAddress; @@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi entry.ts = now; TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str()); - // Erase all entries (other than this one) for this scope to prevent thrashing - // Note: we should probably not use 'entry' after this + // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing' + // due to multiple reports of endpoint change. + // Don't use 'entry' after this since hash table gets modified. { Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy); PhySurfaceKey *k = (PhySurfaceKey *)0; PhySurfaceEntry *e = (PhySurfaceEntry *)0; while (i.next(k,e)) { - if ((k->reporter != reporter)&&(k->scope == scope)) + if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope)) _phy.erase(*k); } } diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp index 3133553ee..400b05e6e 100644 --- a/node/SelfAwareness.hpp +++ b/node/SelfAwareness.hpp @@ -69,14 +69,14 @@ private: struct PhySurfaceKey { Address reporter; + InetAddress reporterPhysicalAddress; InetAddress::IpScope scope; - inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } - PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} - PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {} - inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); } - inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); } + PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {} + + inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } + inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); } }; struct PhySurfaceEntry { diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp index b47a9b2a8..9baa75064 100644 --- a/service/ClusterGeoIpService.cpp +++ b/service/ClusterGeoIpService.cpp @@ -72,11 +72,14 @@ ClusterGeoIpService::~ClusterGeoIpService() bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) { + InetAddress ipNoPort(ip); + ipNoPort.setPort(0); // we index cache by IP only const uint64_t now = OSUtils::now(); + bool r = false; { Mutex::Lock _l(_cache_m); - std::map< InetAddress,_CE >::iterator c(_cache.find(ip)); + std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); if (c != _cache.end()) { x = c->second.x; y = c->second.y; @@ -90,8 +93,9 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) { Mutex::Lock _l(_sOutputLock); if (_sOutputFd >= 0) { - std::string ips(ip.toIpString()); + std::string ips(ipNoPort.toIpString()); ips.push_back('\n'); + //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); ::write(_sOutputFd,ips.data(),ips.length()); } } @@ -153,6 +157,7 @@ void ClusterGeoIpService::threadMain() if ((buf[i] == '\n')||(buf[i] == '\r')) { linebuf[lineptr] = (char)0; if (lineptr > 0) { + //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); try { std::vector result(Utils::split(linebuf,",","","")); if ((result.size() >= 7)&&(result[1] == "1")) { @@ -163,6 +168,7 @@ void ClusterGeoIpService::threadMain() ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); + //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); { Mutex::Lock _l2(_cache_m); _cache[rip] = ce; diff --git a/service/OneService.cpp b/service/OneService.cpp index a64d680b9..729812edb 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1473,6 +1473,7 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) { OneServiceImpl *const impl = reinterpret_cast(uptr); @@ -1485,6 +1486,7 @@ static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr, OneServiceImpl *const impl = reinterpret_cast(uptr); return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast(addr)),*x,*y,*z)); } +#endif static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }