(1) Fix bug in geo-ip service that prevented cache lookup, (2) fix problem in SelfAwareness (will need to test ALL versions in the wild with this), and (3) add more TRACE instrumentation to Cluster.

This commit is contained in:
Adam Ierymenko 2015-10-23 13:03:34 -07:00
parent f0160635a2
commit 29b966894c
6 changed files with 31 additions and 17 deletions

View File

@ -202,7 +202,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
} }
m.lastReceivedAliveAnnouncement = RR->node->now(); m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE #ifdef ZT_TRACE
TRACE("[%u] I'm alive! peers may be redirected to: %s",(unsigned int)fromMemberId,addrs.c_str()); TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
#endif #endif
} break; } break;
@ -406,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
} }
TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
return true; return true;
} else {
TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
return false;
} }
return false;
} }
void Cluster::replicateHavePeer(const Identity &peerId) void Cluster::replicateHavePeer(const Identity &peerId)
@ -564,11 +566,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
{ {
if (!peerPhysicalAddress) // sanity check if (!peerPhysicalAddress) // sanity check
return false; return false;
if (_addressToLocationFunction) { if (_addressToLocationFunction) {
// Pick based on location if it can be determined // Pick based on location if it can be determined
int px = 0,py = 0,pz = 0; int px = 0,py = 0,pz = 0;
if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) { if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
// No geo-info so no change TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
return false; return false;
} }
@ -578,6 +581,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance); double bestDistance = (offload ? 2147483648.0 : currentDistance);
unsigned int bestMember = _id; unsigned int bestMember = _id;
TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
{ {
Mutex::Lock _l(_memberIds_m); Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -588,6 +592,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) { if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) { if (mdist < bestDistance) {
bestDistance = mdist;
bestMember = *mid; bestMember = *mid;
best = m.zeroTierPhysicalEndpoints; best = m.zeroTierPhysicalEndpoints;
} }
@ -596,7 +601,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
} }
if (best.size() > 0) { if (best.size() > 0) {
TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance); TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
/* if (peer->remoteVersionProtocol() >= 5) { /* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic

View File

@ -145,7 +145,7 @@ public:
STATE_MESSAGE_RELAY = 5, STATE_MESSAGE_RELAY = 5,
/** /**
* Request to send a packet to a locally-known peer: * Request that a cluster member send a packet to a locally-known peer:
* <[5] ZeroTier address of recipient> * <[5] ZeroTier address of recipient>
* <[1] packet verb> * <[1] packet verb>
* <[2] length of packet payload> * <[2] length of packet payload>

View File

@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
Mutex::Lock _l(_phy_m); Mutex::Lock _l(_phy_m);
PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)]; PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)];
if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) { if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) {
entry.mySurface = myPhysicalAddress; entry.mySurface = myPhysicalAddress;
@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
entry.ts = now; entry.ts = now;
TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str()); TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str());
// Erase all entries (other than this one) for this scope to prevent thrashing // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing'
// Note: we should probably not use 'entry' after this // due to multiple reports of endpoint change.
// Don't use 'entry' after this since hash table gets modified.
{ {
Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy); Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
PhySurfaceKey *k = (PhySurfaceKey *)0; PhySurfaceKey *k = (PhySurfaceKey *)0;
PhySurfaceEntry *e = (PhySurfaceEntry *)0; PhySurfaceEntry *e = (PhySurfaceEntry *)0;
while (i.next(k,e)) { while (i.next(k,e)) {
if ((k->reporter != reporter)&&(k->scope == scope)) if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope))
_phy.erase(*k); _phy.erase(*k);
} }
} }

View File

@ -69,14 +69,14 @@ private:
struct PhySurfaceKey struct PhySurfaceKey
{ {
Address reporter; Address reporter;
InetAddress reporterPhysicalAddress;
InetAddress::IpScope scope; InetAddress::IpScope scope;
inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {} PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {}
inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); }
inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); } inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
}; };
struct PhySurfaceEntry struct PhySurfaceEntry
{ {

View File

@ -72,11 +72,14 @@ ClusterGeoIpService::~ClusterGeoIpService()
bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
{ {
InetAddress ipNoPort(ip);
ipNoPort.setPort(0); // we index cache by IP only
const uint64_t now = OSUtils::now(); const uint64_t now = OSUtils::now();
bool r = false; bool r = false;
{ {
Mutex::Lock _l(_cache_m); Mutex::Lock _l(_cache_m);
std::map< InetAddress,_CE >::iterator c(_cache.find(ip)); std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort));
if (c != _cache.end()) { if (c != _cache.end()) {
x = c->second.x; x = c->second.x;
y = c->second.y; y = c->second.y;
@ -90,8 +93,9 @@ bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
{ {
Mutex::Lock _l(_sOutputLock); Mutex::Lock _l(_sOutputLock);
if (_sOutputFd >= 0) { if (_sOutputFd >= 0) {
std::string ips(ip.toIpString()); std::string ips(ipNoPort.toIpString());
ips.push_back('\n'); ips.push_back('\n');
//fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str());
::write(_sOutputFd,ips.data(),ips.length()); ::write(_sOutputFd,ips.data(),ips.length());
} }
} }
@ -153,6 +157,7 @@ void ClusterGeoIpService::threadMain()
if ((buf[i] == '\n')||(buf[i] == '\r')) { if ((buf[i] == '\n')||(buf[i] == '\r')) {
linebuf[lineptr] = (char)0; linebuf[lineptr] = (char)0;
if (lineptr > 0) { if (lineptr > 0) {
//fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf);
try { try {
std::vector<std::string> result(Utils::split(linebuf,",","","")); std::vector<std::string> result(Utils::split(linebuf,",","",""));
if ((result.size() >= 7)&&(result[1] == "1")) { if ((result.size() >= 7)&&(result[1] == "1")) {
@ -163,6 +168,7 @@ void ClusterGeoIpService::threadMain()
ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); ce.x = (int)::strtol(result[4].c_str(),(char **)0,10);
ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); ce.y = (int)::strtol(result[5].c_str(),(char **)0,10);
ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); ce.z = (int)::strtol(result[6].c_str(),(char **)0,10);
//fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z);
{ {
Mutex::Lock _l2(_cache_m); Mutex::Lock _l2(_cache_m);
_cache[rip] = ce; _cache[rip] = ce;

View File

@ -1473,6 +1473,7 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } { reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
#ifdef ZT_ENABLE_CLUSTER
static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
{ {
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
@ -1485,6 +1486,7 @@ static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr); OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z)); return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
} }
#endif
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } { reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }