diff --git a/cluster-geo/cluster-geo.exe b/cluster-geo/cluster-geo.exe index ae7206109..56b76e0d4 100755 --- a/cluster-geo/cluster-geo.exe +++ b/cluster-geo/cluster-geo.exe @@ -3,11 +3,11 @@ export PATH=/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin cd `dirname $0` -if [ ! -d cluster-geo -o ! -f cluster-geo/index.js ]; then +if [ ! -d cluster-geo -o ! -f cluster-geo/cluster-geo.js ]; then echo 'Cannot find ./cluster-geo containing NodeJS script files.' exit 1 fi cd cluster-geo -exec node index.js +exec node --harmony cluster-geo.js diff --git a/cluster-geo/cluster-geo/index.js b/cluster-geo/cluster-geo/cluster-geo.js similarity index 74% rename from cluster-geo/cluster-geo/index.js rename to cluster-geo/cluster-geo/cluster-geo.js index 0e903ade7..44af84920 100644 --- a/cluster-geo/cluster-geo/index.js +++ b/cluster-geo/cluster-geo/cluster-geo.js @@ -1,3 +1,5 @@ +"use strict"; + // // GeoIP lookup service // @@ -20,10 +22,10 @@ function lookup(ip,callback) cache.get(ip,function(err,cachedEntryJson) { if ((!err)&&(cachedEntryJson)) { try { - var cachedEntry = JSON.parse(cachedEntryJson.toString()); + let cachedEntry = JSON.parse(cachedEntryJson.toString()); if (cachedEntry) { - var ts = cachedEntry.ts; - var r = cachedEntry.r; + let ts = cachedEntry.ts; + let r = cachedEntry.r; if ((ts)&&(r)) { if ((Date.now() - ts) < CACHE_TTL) { r._cached = true; @@ -57,24 +59,24 @@ process.stdin.on('readable',function() { var chunk; while (null !== (chunk = process.stdin.read())) { for(var i=0;i 0) { - var ip = linebuf; + let ip = linebuf; lookup(ip,function(err,result) { if ((err)||(!result)||(!result.location)) { return process.stdout.write(ip+',0,0,0,0,0,0\n'); } else { - var lat = parseFloat(result.location.latitude); - var lon = parseFloat(result.location.longitude); + let lat = parseFloat(result.location.latitude); + let lon = parseFloat(result.location.longitude); // Convert to X,Y,Z coordinates from Earth's origin, Earth-as-sphere approximation. - var latRadians = lat * 0.01745329251994; // PI / 180 - var lonRadians = lon * 0.01745329251994; // PI / 180 - var cosLat = Math.cos(latRadians); - var x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers - var y = Math.round(6371.0 * Math.sin(latRadians)); - var z = Math.round(6371.0 * cosLat * Math.sin(lonRadians)); + let latRadians = lat * 0.01745329251994; // PI / 180 + let lonRadians = lon * 0.01745329251994; // PI / 180 + let cosLat = Math.cos(latRadians); + let x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers + let y = Math.round(6371.0 * Math.sin(latRadians)); + let z = Math.round(6371.0 * cosLat * Math.sin(lonRadians)); return process.stdout.write(ip+',1,'+lat+','+lon+','+x+','+y+','+z+'\n'); } diff --git a/cluster-geo/cluster-geo/package.json b/cluster-geo/cluster-geo/package.json index 1927197ef..4cd1ce007 100644 --- a/cluster-geo/cluster-geo/package.json +++ b/cluster-geo/cluster-geo/package.json @@ -2,7 +2,7 @@ "name": "cluster-geo", "version": "1.0.0", "description": "Cluster GEO-IP Query Service", - "main": "index.js", + "main": "cluster-geo.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, diff --git a/include/ZeroTierOne.h b/include/ZeroTierOne.h index 135c8e114..7af4f7601 100644 --- a/include/ZeroTierOne.h +++ b/include/ZeroTierOne.h @@ -131,12 +131,17 @@ extern "C" { /** * Maximum number of cluster members (and max member ID plus one) */ -#define ZT_CLUSTER_MAX_MEMBERS 256 +#define ZT_CLUSTER_MAX_MEMBERS 128 + +/** + * Maximum number of physical ZeroTier addresses a cluster member can report + */ +#define ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES 16 /** * Maximum allowed cluster message length in bytes */ -#define ZT_CLUSTER_MAX_MESSAGE_LENGTH 65535 +#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6) /** * A null/empty sockaddr (all zero) to signify an unspecified socket address @@ -879,6 +884,78 @@ typedef struct { unsigned int nextHopCount; } ZT_CircuitTestReport; +/** + * A cluster member's status + */ +typedef struct { + /** + * This cluster member's ID (from 0 to 1-ZT_CLUSTER_MAX_MEMBERS) + */ + unsigned int id; + + /** + * Number of milliseconds since last 'alive' heartbeat message received via cluster backplane address + */ + unsigned int msSinceLastHeartbeat; + + /** + * Non-zero if cluster member is alive + */ + int alive; + + /** + * X, Y, and Z coordinates of this member (if specified, otherwise zero) + * + * What these mean depends on the location scheme being used for + * location-aware clustering. At present this is GeoIP and these + * will be the X, Y, and Z coordinates of the location on a spherical + * approximation of Earth where Earth's core is the origin (in km). + * They don't have to be perfect and need only be comparable with others + * to find shortest path via the standard vector distance formula. + */ + int x,y,z; + + /** + * Cluster member's last reported load + */ + uint64_t load; + + /** + * Number of peers this cluster member "has" + */ + uint64_t peers; + + /** + * Physical ZeroTier endpoints for this member (where peers are sent when directed here) + */ + struct sockaddr_storage zeroTierPhysicalEndpoints[ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES]; + + /** + * Number of physical ZeroTier endpoints this member is announcing + */ + unsigned int numZeroTierPhysicalEndpoints; +} ZT_ClusterMemberStatus; + +/** + * ZeroTier cluster status + */ +typedef struct { + /** + * My cluster member ID (a record for 'self' is included in member[]) + */ + unsigned int myId; + + /** + * Number of cluster members + */ + unsigned int clusterSize; + + /** + * Cluster member statuses + */ + ZT_ClusterMemberStatus members[ZT_CLUSTER_MAX_MEMBERS]; +} ZT_ClusterStatus; + /** * An instance of a ZeroTier One node (opaque) */ @@ -1439,6 +1516,17 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId); */ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len); +/** + * Get the current status of the cluster from this node's point of view + * + * Calling this without clusterInit() or without cluster support will just + * zero out the structure and show a cluster size of zero. + * + * @param node Node instance + * @param cs Cluster status structure to fill with data + */ +void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs); + /** * Get ZeroTier One version * diff --git a/node/Cluster.cpp b/node/Cluster.cpp index d9514db53..9d25593aa 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) return; const uint16_t fromMemberId = dmsg.at(0); unsigned int ptr = 2; - if (fromMemberId == _id) + if (fromMemberId == _id) // sanity check: we don't talk to ourselves return; const uint16_t toMemberId = dmsg.at(ptr); ptr += 2; - if (toMemberId != _id) + if (toMemberId != _id) // sanity check: message not for us? return; - _Member &m = _members[fromMemberId]; - Mutex::Lock mlck(m.lock); + { // make sure sender is actually considered a member + Mutex::Lock _l3(_memberIds_m); + if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end()) + return; + } - try { - while (ptr < dmsg.size()) { - const unsigned int mlen = dmsg.at(ptr); ptr += 2; - const unsigned int nextPtr = ptr + mlen; + { + _Member &m = _members[fromMemberId]; + Mutex::Lock mlck(m.lock); - int mtype = -1; - try { - switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { - default: - break; + try { + while (ptr < dmsg.size()) { + const unsigned int mlen = dmsg.at(ptr); ptr += 2; + const unsigned int nextPtr = ptr + mlen; + if (nextPtr > dmsg.size()) + break; - case STATE_MESSAGE_ALIVE: { - ptr += 7; // skip version stuff, not used yet - m.x = dmsg.at(ptr); ptr += 4; - m.y = dmsg.at(ptr); ptr += 4; - m.z = dmsg.at(ptr); ptr += 4; - ptr += 8; // skip local clock, not used - m.load = dmsg.at(ptr); ptr += 8; - ptr += 8; // skip flags, unused + int mtype = -1; + try { + switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { + default: + break; + + case STATE_MESSAGE_ALIVE: { + ptr += 7; // skip version stuff, not used yet + m.x = dmsg.at(ptr); ptr += 4; + m.y = dmsg.at(ptr); ptr += 4; + m.z = dmsg.at(ptr); ptr += 4; + ptr += 8; // skip local clock, not used + m.load = dmsg.at(ptr); ptr += 8; + ptr += 8; // skip flags, unused #ifdef ZT_TRACE - std::string addrs; + std::string addrs; +#endif + unsigned int physicalAddressCount = dmsg[ptr++]; + m.zeroTierPhysicalEndpoints.clear(); + for(unsigned int i=0;i 0) + addrs.push_back(','); + addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); + } #endif - unsigned int physicalAddressCount = dmsg[ptr++]; - for(unsigned int i=0;inode->now(); #ifdef ZT_TRACE - else { - if (addrs.length() > 0) - addrs.push_back(','); - addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); - } + TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); #endif - } - m.lastReceivedAliveAnnouncement = RR->node->now(); -#ifdef ZT_TRACE - TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str()); -#endif - } break; + } break; - case STATE_MESSAGE_HAVE_PEER: { - try { - Identity id; - ptr += id.deserialize(dmsg,ptr); - if (id) { - RR->topology->saveIdentity(id); + case STATE_MESSAGE_HAVE_PEER: { + try { + Identity id; + ptr += id.deserialize(dmsg,ptr); + if (id) { + RR->topology->saveIdentity(id); - { // Add or update peer affinity entry - _PeerAffinity pa(id.address(),fromMemberId,RR->node->now()); - Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - i->timestamp = pa.timestamp; - } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - } - } - - TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); - } - } catch ( ... ) { - // ignore invalid identities - } - } break; - - case STATE_MESSAGE_MULTICAST_LIKE: { - const uint64_t nwid = dmsg.at(ptr); ptr += 8; - const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const MAC mac(dmsg.field(ptr,6),6); ptr += 6; - const uint32_t adi = dmsg.at(ptr); ptr += 4; - RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); - TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); - } break; - - case STATE_MESSAGE_COM: { - CertificateOfMembership com; - ptr += com.deserialize(dmsg,ptr); - if (com) { - TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); - } - } break; - - case STATE_MESSAGE_RELAY: { - const unsigned int numRemotePeerPaths = dmsg[ptr++]; - InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max - for(unsigned int i=0;i(ptr); ptr += 2; - const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; - - if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address - const Address destinationAddress(reinterpret_cast(packet) + 8,ZT_ADDRESS_LENGTH); - TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); - - SharedPtr destinationPeer(RR->topology->getPeer(destinationAddress)); - if (destinationPeer) { - if ( - (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& - (numRemotePeerPaths > 0)&& - (packetLen >= 18)&& - (reinterpret_cast(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) - ) { - // If remote peer paths were sent with this relayed packet, we do - // RENDEZVOUS. It's handled here for cluster-relayed packets since - // we don't have both Peer records so this is a different path. - - const Address remotePeerAddress(reinterpret_cast(packet) + 13,ZT_ADDRESS_LENGTH); - - InetAddress bestDestV4,bestDestV6; - destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;inode->now()); + Mutex::Lock _l2(_peerAffinities_m); + std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) + if ((i != _peerAffinities.end())&&(i->key == pa.key)) { + i->timestamp = pa.timestamp; + } else { + _peerAffinities.push_back(pa); + std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now } - } + } - Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); - rendezvousForDest.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForDest); + TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); + } + } catch ( ... ) { + // ignore invalid identities + } + } break; - Buffer<2048> rendezvousForOtherEnd; - remotePeerAddress.appendTo(rendezvousForOtherEnd); - rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); - rendezvousForOtherEnd.addSize(2); // space for actual packet payload length - rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 - destinationAddress.appendTo(rendezvousForOtherEnd); + case STATE_MESSAGE_MULTICAST_LIKE: { + const uint64_t nwid = dmsg.at(ptr); ptr += 8; + const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const MAC mac(dmsg.field(ptr,6),6); ptr += 6; + const uint32_t adi = dmsg.at(ptr); ptr += 4; + RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); + TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); + } break; - bool haveMatch = false; - if ((bestDestV6)&&(bestRemoteV6)) { - haveMatch = true; + case STATE_MESSAGE_COM: { + CertificateOfMembership com; + ptr += com.deserialize(dmsg,ptr); + if (com) { + TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); + } + } break; - rendezvousForDest.append((uint16_t)bestRemoteV6.port()); - rendezvousForDest.append((uint8_t)16); - rendezvousForDest.append(bestRemoteV6.rawIpData(),16); + case STATE_MESSAGE_RELAY: { + const unsigned int numRemotePeerPaths = dmsg[ptr++]; + InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max + for(unsigned int i=0;i(ptr); ptr += 2; + const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; - rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); - rendezvousForOtherEnd.append((uint8_t)16); - rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestDestV4)&&(bestRemoteV4)) { - haveMatch = true; + if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address + const Address destinationAddress(reinterpret_cast(packet) + 8,ZT_ADDRESS_LENGTH); + TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); - rendezvousForDest.append((uint16_t)bestRemoteV4.port()); - rendezvousForDest.append((uint8_t)4); - rendezvousForDest.append(bestRemoteV4.rawIpData(),4); + SharedPtr destinationPeer(RR->topology->getPeer(destinationAddress)); + if (destinationPeer) { + if ( + (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& + (numRemotePeerPaths > 0)&& + (packetLen >= 18)&& + (reinterpret_cast(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) + ) { + // If remote peer paths were sent with this relayed packet, we do + // RENDEZVOUS. It's handled here for cluster-relayed packets since + // we don't have both Peer records so this is a different path. - rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); - rendezvousForOtherEnd.append((uint8_t)4); - rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } + const Address remotePeerAddress(reinterpret_cast(packet) + 13,ZT_ADDRESS_LENGTH); - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); - RR->sw->send(rendezvousForDest,true,0); + InetAddress bestDestV4,bestDestV6; + destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForDest.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForDest); + + Buffer<2048> rendezvousForOtherEnd; + remotePeerAddress.appendTo(rendezvousForOtherEnd); + rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); + rendezvousForOtherEnd.addSize(2); // space for actual packet payload length + rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 + destinationAddress.appendTo(rendezvousForOtherEnd); + + bool haveMatch = false; + if ((bestDestV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForDest.append((uint16_t)bestRemoteV6.port()); + rendezvousForDest.append((uint8_t)16); + rendezvousForDest.append(bestRemoteV6.rawIpData(),16); + + rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); + rendezvousForOtherEnd.append((uint8_t)16); + rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); + rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if ((bestDestV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForDest.append((uint16_t)bestRemoteV4.port()); + rendezvousForDest.append((uint8_t)4); + rendezvousForDest.append(bestRemoteV4.rawIpData(),4); + + rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); + rendezvousForOtherEnd.append((uint8_t)4); + rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); + rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } + + if (haveMatch) { + _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); + RR->sw->send(rendezvousForDest,true,0); + } } } } - } - } break; + } break; - case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); - const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; - const unsigned int len = dmsg.at(ptr); ptr += 2; - Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); - RR->sw->send(outp,true,0); - TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); - } break; + case STATE_MESSAGE_PROXY_SEND: { + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; + const unsigned int len = dmsg.at(ptr); ptr += 2; + Packet outp(rcpt,RR->identity.address(),verb); + outp.append(dmsg.field(ptr,len),len); + RR->sw->send(outp,true,0); + TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); + } break; + } + } catch ( ... ) { + TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); - // drop invalids - } - ptr = nextPtr; + ptr = nextPtr; + } + } catch ( ... ) { + TRACE("invalid message (outer loop), discarding"); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message (outer loop), discarding"); - // drop invalids } } @@ -395,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); } + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); return true; + } else { + TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + return false; } - - return false; } void Cluster::replicateHavePeer(const Identity &peerId) @@ -436,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId) void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) { - Buffer<4096> buf; + Buffer<2048> buf; buf.append((uint64_t)nwid); peerAddress.appendTo(buf); group.mac().appendTo(buf); buf.append((uint32_t)group.adi()); + TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi()); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -452,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) { - Buffer<4096> buf; + Buffer<2048> buf; com.serialize(buf); + TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId()); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -504,7 +519,7 @@ void Cluster::doPeriodicTasks() void Cluster::addMember(uint16_t memberId) { - if (memberId >= ZT_CLUSTER_MAX_MEMBERS) + if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id)) return; Mutex::Lock _l2(_members[memberId].lock); @@ -553,11 +568,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy { if (!peerPhysicalAddress) // sanity check return false; + if (_addressToLocationFunction) { // Pick based on location if it can be determined int px = 0,py = 0,pz = 0; if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast(&peerPhysicalAddress),&px,&py,&pz) == 0) { - // No geo-info so no change + TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str()); return false; } @@ -567,6 +583,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); double bestDistance = (offload ? 2147483648.0 : currentDistance); unsigned int bestMember = _id; + TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance); { Mutex::Lock _l(_memberIds_m); for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -577,6 +594,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) { double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); if (mdist < bestDistance) { + bestDistance = mdist; bestMember = *mid; best = m.zeroTierPhysicalEndpoints; } @@ -585,7 +603,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy } if (best.size() > 0) { - TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance); + TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance); /* if (peer->remoteVersionProtocol() >= 5) { // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic @@ -620,8 +638,66 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy } } +void Cluster::status(ZT_ClusterStatus &status) const +{ + const uint64_t now = RR->node->now(); + memset(&status,0,sizeof(ZT_ClusterStatus)); + ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS]; + memset(ms,0,sizeof(ms)); + + status.myId = _id; + + ms[_id] = &(status.members[status.clusterSize++]); + ms[_id]->id = _id; + ms[_id]->alive = 1; + ms[_id]->x = _x; + ms[_id]->y = _y; + ms[_id]->z = _z; + ms[_id]->peers = RR->topology->countAlive(); + for(std::vector::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) { + if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check + break; + memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + } + + { + Mutex::Lock _l1(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check + break; + ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]); + _Member &m = _members[*mid]; + Mutex::Lock ml(m.lock); + + s->id = *mid; + s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement)); + s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0; + s->x = m.x; + s->y = m.y; + s->z = m.z; + s->load = m.load; + for(std::vector::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) { + if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check + break; + memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + } + } + } + + { + Mutex::Lock _l2(_peerAffinities_m); + for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) { + unsigned int mid = pi->clusterMemberId(); + if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT)) + ++ms[mid]->peers; + } + } +} + void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len) { + if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check + return; _Member &m = _members[memberId]; // assumes m.lock is locked! if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH) diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 2e60fd6ba..be3466593 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -47,7 +47,7 @@ /** * Timeout for cluster members being considered "alive" */ -#define ZT_CLUSTER_TIMEOUT 30000 +#define ZT_CLUSTER_TIMEOUT 10000 /** * How often should we announce that we have a peer? @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 namespace ZeroTier { @@ -145,7 +145,7 @@ public: STATE_MESSAGE_RELAY = 5, /** - * Request to send a packet to a locally-known peer: + * Request that a cluster member send a packet to a locally-known peer: * <[5] ZeroTier address of recipient> * <[1] packet verb> * <[2] length of packet payload> @@ -254,6 +254,13 @@ public: */ bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); + /** + * Fill out ZT_ClusterStatus structure (from core API) + * + * @param status Reference to structure to hold result (anything there is replaced) + */ + void status(ZT_ClusterStatus &status) const; + private: void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _flush(uint16_t memberId); diff --git a/node/InetAddress.hpp b/node/InetAddress.hpp index 50db272ac..ecafcf51f 100644 --- a/node/InetAddress.hpp +++ b/node/InetAddress.hpp @@ -409,6 +409,16 @@ struct InetAddress : public sockaddr_storage switch(b[p++]) { case 0: return 1; + case 0x01: + // TODO: Ethernet address (but accept for forward compatibility) + return 7; + case 0x02: + // TODO: Bluetooth address (but accept for forward compatibility) + return 7; + case 0x03: + // TODO: Other address types (but accept for forward compatibility) + // These could be extended/optional things like AF_UNIX, LTE Direct, shared memory, etc. + return (unsigned int)(b.template at(p) + 3); // other addresses begin with 16-bit non-inclusive length case 0x04: ss_family = AF_INET; memcpy(&(reinterpret_cast(this)->sin_addr.s_addr),b.field(p,4),4); p += 4; diff --git a/node/Network.cpp b/node/Network.cpp index 46f932419..cd30e386d 100644 --- a/node/Network.cpp +++ b/node/Network.cpp @@ -144,7 +144,15 @@ void Network::multicastUnsubscribe(const MulticastGroup &mg) bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr &peer) { Mutex::Lock _l(_lock); - return _tryAnnounceMulticastGroupsTo(RR->topology->rootAddresses(),_allMulticastGroups(),peer,RR->node->now()); + if ( + (_isAllowed(peer)) || + (peer->address() == this->controller()) || + (RR->topology->isRoot(peer->identity())) + ) { + _announceMulticastGroupsTo(peer->address(),_allMulticastGroups()); + return true; + } + return false; } bool Network::applyConfiguration(const SharedPtr &conf) @@ -400,77 +408,80 @@ bool Network::_isAllowed(const SharedPtr &peer) const return false; // default position on any failure } -bool Network::_tryAnnounceMulticastGroupsTo(const std::vector
&alwaysAddresses,const std::vector &allMulticastGroups,const SharedPtr &peer,uint64_t now) const -{ - // assumes _lock is locked - if ( - (_isAllowed(peer)) || - (peer->address() == this->controller()) || - (std::find(alwaysAddresses.begin(),alwaysAddresses.end(),peer->address()) != alwaysAddresses.end()) - ) { - - if ((_config)&&(_config->com())&&(!_config->isPublic())&&(peer->needsOurNetworkMembershipCertificate(_id,now,true))) { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE); - _config->com().serialize(outp); - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - } - - { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE); - - for(std::vector::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) { - if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) { - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - outp.reset(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE); - } - - // network ID, MAC, ADI - outp.append((uint64_t)_id); - mg->mac().appendTo(outp); - outp.append((uint32_t)mg->adi()); - } - - if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) { - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - } - } - - return true; - } - return false; -} - -class _AnnounceMulticastGroupsToAll +class _GetPeersThatNeedMulticastAnnouncement { public: - _AnnounceMulticastGroupsToAll(const RuntimeEnvironment *renv,Network *nw) : + _GetPeersThatNeedMulticastAnnouncement(const RuntimeEnvironment *renv,Network *nw) : _now(renv->node->now()), + _controller(nw->controller()), _network(nw), - _rootAddresses(renv->topology->rootAddresses()), - _allMulticastGroups(nw->_allMulticastGroups()) + _rootAddresses(renv->topology->rootAddresses()) {} - - inline void operator()(Topology &t,const SharedPtr &p) { _network->_tryAnnounceMulticastGroupsTo(_rootAddresses,_allMulticastGroups,p,_now); } - + inline void operator()(Topology &t,const SharedPtr &p) + { + if ( + (_network->_isAllowed(p)) || + (p->address() == _controller) || + (std::find(_rootAddresses.begin(),_rootAddresses.end(),p->address()) != _rootAddresses.end()) + ) { + peers.push_back(p->address()); + } + } + std::vector
peers; private: uint64_t _now; + Address _controller; Network *_network; std::vector
_rootAddresses; - std::vector _allMulticastGroups; }; void Network::_announceMulticastGroups() { // Assumes _lock is locked - _AnnounceMulticastGroupsToAll afunc(RR,this); - RR->topology->eachPeer<_AnnounceMulticastGroupsToAll &>(afunc); + + _GetPeersThatNeedMulticastAnnouncement gpfunc(RR,this); + RR->topology->eachPeer<_GetPeersThatNeedMulticastAnnouncement &>(gpfunc); + + std::vector allMulticastGroups(_allMulticastGroups()); + for(std::vector
::const_iterator pa(gpfunc.peers.begin());pa!=gpfunc.peers.end();++pa) + _announceMulticastGroupsTo(*pa,allMulticastGroups); +} + +void Network::_announceMulticastGroupsTo(const Address &peerAddress,const std::vector &allMulticastGroups) const +{ + // Assumes _lock is locked + + // We push COMs ahead of MULTICAST_LIKE since they're used for access control -- a COM is a public + // credential so "over-sharing" isn't really an issue (and we only do so with roots). + if ((_config)&&(_config->com())&&(!_config->isPublic())) { + Packet outp(peerAddress,RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE); + _config->com().serialize(outp); + RR->sw->send(outp,true,0); + } + + { + Packet outp(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE); + + for(std::vector::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) { + if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) { + RR->sw->send(outp,true,0); + outp.reset(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE); + } + + // network ID, MAC, ADI + outp.append((uint64_t)_id); + mg->mac().appendTo(outp); + outp.append((uint32_t)mg->adi()); + } + + if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) + RR->sw->send(outp,true,0); + } } std::vector Network::_allMulticastGroups() const { // Assumes _lock is locked + std::vector mgs; mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1); mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end()); @@ -479,6 +490,7 @@ std::vector Network::_allMulticastGroups() const mgs.push_back(Network::BROADCAST); std::sort(mgs.begin(),mgs.end()); mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end()); + return mgs; } diff --git a/node/Network.hpp b/node/Network.hpp index f7939323e..0effa8e20 100644 --- a/node/Network.hpp +++ b/node/Network.hpp @@ -56,7 +56,7 @@ namespace ZeroTier { class RuntimeEnvironment; class Peer; -class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp +class _GetPeersThatNeedMulticastAnnouncement; /** * A virtual LAN @@ -64,7 +64,7 @@ class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp class Network : NonCopyable { friend class SharedPtr; - friend class _AnnounceMulticastGroupsToAll; + friend class _GetPeersThatNeedMulticastAnnouncement; // internal function object public: /** @@ -344,6 +344,7 @@ private: bool _isAllowed(const SharedPtr &peer) const; bool _tryAnnounceMulticastGroupsTo(const std::vector
&rootAddresses,const std::vector &allMulticastGroups,const SharedPtr &peer,uint64_t now) const; void _announceMulticastGroups(); + void _announceMulticastGroupsTo(const Address &peerAddress,const std::vector &allMulticastGroups) const; std::vector _allMulticastGroups() const; const RuntimeEnvironment *RR; diff --git a/node/Node.cpp b/node/Node.cpp index 6eea3d3df..2b2989039 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -211,8 +211,15 @@ public: } } - // If this is a network preferred relay, also always ping and if a stable endpoint is specified use that if not alive if (!upstream) { + // If I am a root server, only ping other root servers -- roots don't ping "down" + // since that would just be a waste of bandwidth and could potentially cause route + // flapping in Cluster mode. + if (RR->topology->amRoot()) + return; + + // Check for network preferred relays, also considered 'upstream' and thus always + // pinged to keep links up. If they have stable addresses we will try them there. for(std::vector< std::pair >::const_iterator r(_relays.begin());r!=_relays.end();++r) { if (r->first == p->address()) { if (r->second.ss_family == AF_INET) @@ -229,18 +236,22 @@ public: // "Upstream" devices are roots and relays and get special treatment -- they stay alive // forever and we try to keep (if available) both IPv4 and IPv6 channels open to them. bool needToContactIndirect = true; - if (!p->doPingAndKeepalive(RR,_now,AF_INET)) { + if (p->doPingAndKeepalive(RR,_now,AF_INET)) { + needToContactIndirect = false; + } else { if (stableEndpoint4) { needToContactIndirect = false; p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now); } - } else needToContactIndirect = false; - if (!p->doPingAndKeepalive(RR,_now,AF_INET6)) { + } + if (p->doPingAndKeepalive(RR,_now,AF_INET6)) { + needToContactIndirect = false; + } else { if (stableEndpoint6) { needToContactIndirect = false; p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now); } - } else needToContactIndirect = false; + } if (needToContactIndirect) { // If this is an upstream and we have no stable endpoint for either IPv4 or IPv6, @@ -625,6 +636,18 @@ void Node::clusterHandleIncomingMessage(const void *msg,unsigned int len) #endif } +void Node::clusterStatus(ZT_ClusterStatus *cs) +{ + if (!cs) + return; +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) + RR->cluster->status(*cs); + else +#endif + memset(cs,0,sizeof(ZT_ClusterStatus)); +} + /****************************************************************************/ /* Node methods used only within node/ */ /****************************************************************************/ @@ -936,15 +959,6 @@ enum ZT_ResultCode ZT_Node_clusterInit( } } -/** - * Add a member to this cluster - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param memberId Member ID (must be less than or equal to ZT_CLUSTER_MAX_MEMBERS) - * @return OK or error if clustering is disabled, ID invalid, etc. - */ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId) { try { @@ -954,14 +968,6 @@ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId) } } -/** - * Remove a member from this cluster - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param memberId Member ID to remove (nothing happens if not present) - */ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId) { try { @@ -969,18 +975,6 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId) } catch ( ... ) {} } -/** - * Handle an incoming cluster state message - * - * The message itself contains cluster member IDs, and invalid or badly - * addressed messages will be silently discarded. - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param msg Cluster message - * @param len Length of cluster message - */ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len) { try { @@ -988,6 +982,13 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned } catch ( ... ) {} } +void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs) +{ + try { + reinterpret_cast(node)->clusterStatus(cs); + } catch ( ... ) {} +} + void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags) { if (major) *major = ZEROTIER_ONE_VERSION_MAJOR; diff --git a/node/Node.hpp b/node/Node.hpp index b8bd4dc54..4094a79e9 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -124,6 +124,7 @@ public: ZT_ResultCode clusterAddMember(unsigned int memberId); void clusterRemoveMember(unsigned int memberId); void clusterHandleIncomingMessage(const void *msg,unsigned int len); + void clusterStatus(ZT_ClusterStatus *cs); // Internal functions ------------------------------------------------------ diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp index 7329322af..1b70f17cb 100644 --- a/node/SelfAwareness.cpp +++ b/node/SelfAwareness.cpp @@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi Mutex::Lock _l(_phy_m); - PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)]; + PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)]; if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) { entry.mySurface = myPhysicalAddress; @@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi entry.ts = now; TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str()); - // Erase all entries (other than this one) for this scope to prevent thrashing - // Note: we should probably not use 'entry' after this + // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing' + // due to multiple reports of endpoint change. + // Don't use 'entry' after this since hash table gets modified. { Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy); PhySurfaceKey *k = (PhySurfaceKey *)0; PhySurfaceEntry *e = (PhySurfaceEntry *)0; while (i.next(k,e)) { - if ((k->reporter != reporter)&&(k->scope == scope)) + if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope)) _phy.erase(*k); } } diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp index 3133553ee..400b05e6e 100644 --- a/node/SelfAwareness.hpp +++ b/node/SelfAwareness.hpp @@ -69,14 +69,14 @@ private: struct PhySurfaceKey { Address reporter; + InetAddress reporterPhysicalAddress; InetAddress::IpScope scope; - inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } - PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} - PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {} - inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); } - inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); } + PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {} + + inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } + inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); } }; struct PhySurfaceEntry { diff --git a/node/Topology.cpp b/node/Topology.cpp index 88c8856c4..e56d1f47b 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -35,14 +35,9 @@ namespace ZeroTier { -// Default World #define ZT_DEFAULT_WORLD_LENGTH 494 static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x4f,0xdf,0xbf,0xfc,0xbb,0x6c,0x7e,0x15,0x67,0x85,0x1b,0xb4,0x65,0x04,0x01,0xaf,0x56,0xbf,0xe7,0x63,0x9d,0x77,0xef,0xa4,0x1e,0x61,0x53,0x88,0xcb,0x8d,0x78,0xe5,0x47,0x38,0x98,0x5a,0x6c,0x8a,0xdd,0xe6,0x9c,0x65,0xdf,0x1a,0x80,0x63,0xce,0x2e,0x4d,0x48,0x24,0x3d,0x68,0x87,0x96,0x13,0x89,0xba,0x25,0x6f,0xc9,0xb0,0x9f,0x20,0xc5,0x4c,0x51,0x7b,0x30,0xb7,0x5f,0xba,0xca,0xa4,0xc5,0x48,0xa3,0x15,0xab,0x2f,0x1d,0x64,0xe8,0x04,0x42,0xb3,0x1c,0x51,0x8b,0x2a,0x04,0x01,0xf8,0xe1,0x81,0xaf,0x60,0x2f,0x70,0x3e,0xcd,0x0b,0x21,0x38,0x19,0x62,0x02,0xbd,0x0e,0x33,0x1d,0x0a,0x7b,0xf1,0xec,0xad,0xef,0x54,0xb3,0x7b,0x17,0x84,0xaa,0xda,0x0a,0x85,0x5d,0x0b,0x1c,0x05,0x83,0xb9,0x0e,0x3e,0xe3,0xb4,0xd1,0x8b,0x5b,0x64,0xf7,0xcf,0xe1,0xff,0x5d,0xc2,0x2a,0xcf,0x60,0x7b,0x09,0xb4,0xa3,0x86,0x3c,0x5a,0x7e,0x31,0xa0,0xc7,0xb4,0x86,0xe3,0x41,0x33,0x04,0x7e,0x19,0x87,0x6a,0xba,0x00,0x2a,0x6e,0x2b,0x23,0x18,0x93,0x0f,0x60,0xeb,0x09,0x7f,0x70,0xd0,0xf4,0xb0,0x28,0xb2,0xcd,0x6d,0x3d,0x0c,0x63,0xc0,0x14,0xb9,0x03,0x9f,0xf3,0x53,0x90,0xe4,0x11,0x81,0xf2,0x16,0xfb,0x2e,0x6f,0xa8,0xd9,0x5c,0x1e,0xe9,0x66,0x71,0x56,0x41,0x19,0x05,0xc3,0xdc,0xcf,0xea,0x78,0xd8,0xc6,0xdf,0xaf,0xba,0x68,0x81,0x70,0xb3,0xfa,0x00,0x01,0x04,0xc6,0xc7,0x61,0xdc,0x27,0x09,0x88,0x41,0x40,0x8a,0x2e,0x00,0xbb,0x1d,0x31,0xf2,0xc3,0x23,0xe2,0x64,0xe9,0xe6,0x41,0x72,0xc1,0xa7,0x4f,0x77,0x89,0x95,0x55,0xed,0x10,0x75,0x1c,0xd5,0x6e,0x86,0x40,0x5c,0xde,0x11,0x8d,0x02,0xdf,0xfe,0x55,0x5d,0x46,0x2c,0xcf,0x6a,0x85,0xb5,0x63,0x1c,0x12,0x35,0x0c,0x8d,0x5d,0xc4,0x09,0xba,0x10,0xb9,0x02,0x5d,0x0f,0x44,0x5c,0xf4,0x49,0xd9,0x2b,0x1c,0x00,0x01,0x04,0x6b,0xbf,0x2e,0xd2,0x27,0x09,0x8a,0xcf,0x05,0x9f,0xe3,0x00,0x48,0x2f,0x6e,0xe5,0xdf,0xe9,0x02,0x31,0x9b,0x41,0x9d,0xe5,0xbd,0xc7,0x65,0x20,0x9c,0x0e,0xcd,0xa3,0x8c,0x4d,0x6e,0x4f,0xcf,0x0d,0x33,0x65,0x83,0x98,0xb4,0x52,0x7d,0xcd,0x22,0xf9,0x31,0x12,0xfb,0x9b,0xef,0xd0,0x2f,0xd7,0x8b,0xf7,0x26,0x1b,0x33,0x3f,0xc1,0x05,0xd1,0x92,0xa6,0x23,0xca,0x9e,0x50,0xfc,0x60,0xb3,0x74,0xa5,0x00,0x01,0x04,0xa2,0xf3,0x4d,0x6f,0x27,0x09,0x9d,0x21,0x90,0x39,0xf3,0x00,0x01,0xf0,0x92,0x2a,0x98,0xe3,0xb3,0x4e,0xbc,0xbf,0xf3,0x33,0x26,0x9d,0xc2,0x65,0xd7,0xa0,0x20,0xaa,0xb6,0x9d,0x72,0xbe,0x4d,0x4a,0xcc,0x9c,0x8c,0x92,0x94,0x78,0x57,0x71,0x25,0x6c,0xd1,0xd9,0x42,0xa9,0x0d,0x1b,0xd1,0xd2,0xdc,0xa3,0xea,0x84,0xef,0x7d,0x85,0xaf,0xe6,0x61,0x1f,0xb4,0x3f,0xf0,0xb7,0x41,0x26,0xd9,0x0a,0x6e,0x00,0x01,0x04,0x80,0xc7,0xc5,0xd9,0x27,0x09}; -// ALICE-TEST -//#define ZT_DEFAULT_WORLD_LENGTH 257 -//static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x50,0x81,0x2a,0x54,0x6f,0x72,0xb0,0x3b,0xbe,0x73,0xda,0xbd,0xfb,0x85,0x77,0x9f,0xc9,0x2e,0x17,0xc8,0x11,0x6e,0xda,0x61,0x80,0xd1,0x41,0xcb,0x7c,0x2d,0x2b,0xa4,0x34,0x75,0x19,0x64,0x20,0x80,0x0a,0x22,0x32,0xf2,0x01,0x6c,0xfe,0x79,0xa6,0x7d,0xec,0x10,0x7e,0x03,0xf3,0xa2,0xa0,0x19,0xc8,0x7c,0xfd,0x6c,0x56,0x52,0xa8,0xfb,0xdc,0xfb,0x93,0x81,0x3e,0x63,0x8b,0xb3,0xb6,0x72,0x45,0xa9,0x81,0x81,0xcc,0xea,0x7f,0x2f,0xd9,0x59,0xce,0xc8,0x51,0x12,0xc3,0xe3,0x44,0x76,0x54,0xed,0xe7,0x8d,0x34,0x0b,0x5d,0x10,0x3d,0x52,0x04,0x9b,0xe1,0xb2,0x36,0x51,0x75,0x14,0x30,0x53,0xe8,0x4b,0xe4,0x91,0x9a,0xed,0x99,0x56,0xa3,0x8d,0x5e,0x14,0xff,0x66,0xd8,0x4f,0xf7,0x3c,0x23,0xbe,0x02,0xbb,0x1e,0xb6,0x7e,0x07,0xfa,0x7c,0x7e,0x50,0xe8,0x40,0xf9,0x37,0x70,0x1a,0x75,0xcf,0x19,0xe6,0x83,0xe1,0x5c,0x20,0x1d,0x1e,0x5b,0xe5,0x6a,0xbe,0xe7,0xab,0xec,0x01,0xd6,0xdd,0xca,0x6a,0xb5,0x00,0x4e,0x76,0x12,0x07,0xd8,0xb4,0x20,0x0b,0xe4,0x4f,0x47,0x8e,0x3d,0xa1,0x48,0xc1,0x60,0x99,0x11,0x0e,0xe7,0x1b,0x64,0x58,0x6d,0xda,0x11,0x8e,0x40,0x22,0xab,0x63,0x68,0x2c,0xe1,0x37,0xda,0x8b,0xa8,0x17,0xfc,0x7f,0x73,0xaa,0x31,0x63,0xf2,0xe3,0x33,0x93,0x3e,0x29,0x94,0xc4,0x6b,0x4f,0x41,0x19,0x30,0x7b,0xe8,0x85,0x5a,0x72,0x00,0x01,0x04,0xa9,0x39,0x8f,0x68,0x27,0x09}; - Topology::Topology(const RuntimeEnvironment *renv) : RR(renv), _amRoot(false) @@ -337,6 +332,21 @@ void Topology::clean(uint64_t now) } } +unsigned long Topology::countAlive() const +{ + const uint64_t now = RR->node->now(); + unsigned long cnt = 0; + Mutex::Lock _l(_lock); + Hashtable< Address,SharedPtr >::Iterator i(const_cast(this)->_peers); + Address *a = (Address *)0; + SharedPtr *p = (SharedPtr *)0; + while (i.next(a,p)) { + if ((*p)->alive(now)) + ++cnt; + } + return cnt; +} + Identity Topology::_getIdentity(const Address &zta) { char p[128]; @@ -358,16 +368,18 @@ void Topology::_setWorld(const World &newWorld) _rootAddresses.clear(); _rootPeers.clear(); for(std::vector::const_iterator r(_world.roots().begin());r!=_world.roots().end();++r) { - if (r->identity == RR->identity) - _amRoot = true; _rootAddresses.push_back(r->identity.address()); - SharedPtr *rp = _peers.get(r->identity.address()); - if (rp) { - _rootPeers.push_back(*rp); - } else if (r->identity.address() != RR->identity.address()) { - SharedPtr newrp(new Peer(RR->identity,r->identity)); - _peers.set(r->identity.address(),newrp); - _rootPeers.push_back(newrp); + if (r->identity.address() == RR->identity.address()) { + _amRoot = true; + } else { + SharedPtr *rp = _peers.get(r->identity.address()); + if (rp) { + _rootPeers.push_back(*rp); + } else { + SharedPtr newrp(new Peer(RR->identity,r->identity)); + _peers.set(r->identity.address(),newrp); + _rootPeers.push_back(newrp); + } } } } diff --git a/node/Topology.hpp b/node/Topology.hpp index 48e264a87..ee9827b95 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -192,6 +192,11 @@ public: */ void clean(uint64_t now); + /** + * @return Number of 'alive' peers + */ + unsigned long countAlive() const; + /** * Apply a function or function object to all peers * @@ -225,6 +230,11 @@ public: return _peers.entries(); } + /** + * @return True if I am a root server in the current World + */ + inline bool amRoot() const throw() { return _amRoot; } + private: Identity _getIdentity(const Address &zta); void _setWorld(const World &newWorld); diff --git a/objects.mk b/objects.mk index 10e0c334c..6dd5ea30e 100644 --- a/objects.mk +++ b/objects.mk @@ -26,5 +26,6 @@ OBJS=\ osdep/BackgroundResolver.o \ osdep/Http.o \ osdep/OSUtils.o \ + service/ClusterGeoIpService.o \ service/ControlPlane.o \ service/OneService.o diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 7f790e5d2..6737034e3 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -64,6 +64,12 @@ #include #include +#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux) +#ifndef IPV6_DONTFRAG +#define IPV6_DONTFRAG 62 +#endif +#endif + #define ZT_PHY_SOCKFD_TYPE int #define ZT_PHY_SOCKFD_NULL (-1) #define ZT_PHY_SOCKFD_VALID(s) ((s) > -1) @@ -374,6 +380,9 @@ public: f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f)); #ifdef IPV6_MTU_DISCOVER f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f)); +#endif +#ifdef IPV6_DONTFRAG + f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f)); #endif } f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f)); diff --git a/service/ClusterDefinition.hpp b/service/ClusterDefinition.hpp new file mode 100644 index 000000000..d02894e4e --- /dev/null +++ b/service/ClusterDefinition.hpp @@ -0,0 +1,125 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERDEFINITION_HPP +#define ZT_CLUSTERDEFINITION_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include +#include + +#include "../node/Constants.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +namespace ZeroTier { + +/** + * Parser for cluster definition file + */ +class ClusterDefinition +{ +public: + struct MemberDefinition + { + MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; } + + unsigned int id; + int x,y,z; + char name[256]; + InetAddress clusterEndpoint; + std::vector zeroTierEndpoints; + }; + + ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile) + { + std::string cf; + if (!OSUtils::readFile(pathToClusterFile,cf)) + return; + + char myAddressStr[64]; + Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress); + + std::vector lines(Utils::split(cf.c_str(),"\r\n","","")); + for(std::vector::iterator l(lines.begin());l!=lines.end();++l) { + std::vector fields(Utils::split(l->c_str()," \t","","")); + if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr)) + continue; + + int id = Utils::strToUInt(fields[1].c_str()); + if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS)) + continue; + MemberDefinition &md = _md[id]; + + md.id = (unsigned int)id; + if (fields.size() >= 6) { + std::vector xyz(Utils::split(fields[5].c_str(),",","","")); + md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0; + md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0; + md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0; + } + Utils::scopy(md.name,sizeof(md.name),fields[2].c_str()); + md.clusterEndpoint.fromString(fields[3]); + if (!md.clusterEndpoint) + continue; + std::vector zips(Utils::split(fields[4].c_str(),",","","")); + for(std::vector::iterator zip(zips.begin());zip!=zips.end();++zip) { + InetAddress i; + i.fromString(*zip); + if (i) + md.zeroTierEndpoints.push_back(i); + } + + _ids.push_back((unsigned int)id); + } + + std::sort(_ids.begin(),_ids.end()); + } + + inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; } + inline unsigned int size() const throw() { return (unsigned int)_ids.size(); } + inline const std::vector &ids() const throw() { return _ids; } + + inline std::vector members() const + { + std::vector m; + for(std::vector::const_iterator i(_ids.begin());i!=_ids.end();++i) + m.push_back(_md[*i]); + return m; + } + +private: + MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS]; + std::vector _ids; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp new file mode 100644 index 000000000..9baa75064 --- /dev/null +++ b/service/ClusterGeoIpService.cpp @@ -0,0 +1,196 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifdef ZT_ENABLE_CLUSTER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ClusterGeoIpService.hpp" +#include "../node/Utils.hpp" +#include "../osdep/OSUtils.hpp" + +#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL (60 * 60 * 1000) + +namespace ZeroTier { + +ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) : + _pathToExe(pathToExe), + _sOutputFd(-1), + _sInputFd(-1), + _sPid(0), + _run(true) +{ + _thread = Thread::start(this); +} + +ClusterGeoIpService::~ClusterGeoIpService() +{ + _run = false; + long p = _sPid; + if (p > 0) { + ::kill(p,SIGTERM); + Thread::sleep(500); + ::kill(p,SIGKILL); + } + Thread::join(_thread); +} + +bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) +{ + InetAddress ipNoPort(ip); + ipNoPort.setPort(0); // we index cache by IP only + const uint64_t now = OSUtils::now(); + + bool r = false; + { + Mutex::Lock _l(_cache_m); + std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); + if (c != _cache.end()) { + x = c->second.x; + y = c->second.y; + z = c->second.z; + if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL) + return true; + else r = true; // return true but refresh as well + } + } + + { + Mutex::Lock _l(_sOutputLock); + if (_sOutputFd >= 0) { + std::string ips(ipNoPort.toIpString()); + ips.push_back('\n'); + //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); + ::write(_sOutputFd,ips.data(),ips.length()); + } + } + + return r; +} + +void ClusterGeoIpService::threadMain() + throw() +{ + char linebuf[65536]; + char buf[65536]; + long n,lineptr; + + while (_run) { + { + Mutex::Lock _l(_sOutputLock); + + _sOutputFd = -1; + _sInputFd = -1; + _sPid = 0; + + int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output + int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input + ::pipe(stdinfds); + ::pipe(stdoutfds); + + long p = (long)::vfork(); + if (p < 0) { + Thread::sleep(500); + continue; + } else if (p == 0) { + ::close(stdinfds[1]); + ::close(stdoutfds[0]); + ::dup2(stdinfds[0],STDIN_FILENO); + ::dup2(stdoutfds[1],STDOUT_FILENO); + ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0); + ::exit(1); + } else { + ::close(stdinfds[0]); + ::close(stdoutfds[1]); + _sOutputFd = stdinfds[1]; + _sInputFd = stdoutfds[0]; + _sPid = p; + } + } + + lineptr = 0; + while (_run) { + n = ::read(_sInputFd,buf,sizeof(buf)); + if (n <= 0) { + if (errno == EINTR) + continue; + else break; + } + for(long i=0;i (long)sizeof(linebuf)) + lineptr = 0; + if ((buf[i] == '\n')||(buf[i] == '\r')) { + linebuf[lineptr] = (char)0; + if (lineptr > 0) { + //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); + try { + std::vector result(Utils::split(linebuf,",","","")); + if ((result.size() >= 7)&&(result[1] == "1")) { + InetAddress rip(result[0],0); + if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) { + _CE ce; + ce.ts = OSUtils::now(); + ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); + ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); + ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); + //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); + { + Mutex::Lock _l2(_cache_m); + _cache[rip] = ce; + } + } + } + } catch ( ... ) {} + } + lineptr = 0; + } else linebuf[lineptr++] = buf[i]; + } + } + + ::close(_sOutputFd); + ::close(_sInputFd); + ::kill(_sPid,SIGTERM); + Thread::sleep(250); + ::kill(_sPid,SIGKILL); + ::waitpid(_sPid,(int *)0,0); + } +} + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER diff --git a/service/ClusterGeoIpService.hpp b/service/ClusterGeoIpService.hpp new file mode 100644 index 000000000..fd04ba1d6 --- /dev/null +++ b/service/ClusterGeoIpService.hpp @@ -0,0 +1,94 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_CLUSTERGEOIPSERVICE_HPP +#define ZT_CLUSTERGEOIPSERVICE_HPP + +#ifdef ZT_ENABLE_CLUSTER + +#include +#include +#include + +#include "../node/Constants.hpp" +#include "../node/InetAddress.hpp" +#include "../node/Mutex.hpp" +#include "../osdep/Thread.hpp" + +namespace ZeroTier { + +/** + * Runs the Cluster GeoIP service in the background and resolves geoIP queries + */ +class ClusterGeoIpService +{ +public: + /** + * @param pathToExe Path to cluster geo-resolution service executable + */ + ClusterGeoIpService(const char *pathToExe); + + ~ClusterGeoIpService(); + + /** + * Attempt to locate an IP + * + * This returns true if x, y, and z are set. Otherwise it returns false + * and a geo-locate job is ordered in the background. This usually takes + * 500-1500ms to complete, after which time results will be available. + * If false is returned the supplied coordinate variables are unchanged. + * + * @param ip IPv4 or IPv6 address + * @param x Reference to variable to receive X + * @param y Reference to variable to receive Y + * @param z Reference to variable to receive Z + * @return True if coordinates were set + */ + bool locate(const InetAddress &ip,int &x,int &y,int &z); + + void threadMain() + throw(); + +private: + const std::string _pathToExe; + int _sOutputFd; + int _sInputFd; + volatile long _sPid; + volatile bool _run; + Thread _thread; + Mutex _sOutputLock; + + struct _CE { uint64_t ts; int x,y,z; }; + std::map< InetAddress,_CE > _cache; + Mutex _cache_m; +}; + +} // namespace ZeroTier + +#endif // ZT_ENABLE_CLUSTER + +#endif diff --git a/service/ControlPlane.cpp b/service/ControlPlane.cpp index 7affb08c6..31eca7b61 100644 --- a/service/ControlPlane.cpp +++ b/service/ControlPlane.cpp @@ -354,8 +354,38 @@ unsigned int ControlPlane::handleRequest( if (ps[0] == "status") { responseContentType = "application/json"; + ZT_NodeStatus status; _node->status(&status); + + std::string clusterJson; +#ifdef ZT_ENABLE_CLUSTER + { + ZT_ClusterStatus cs; + _node->clusterStatus(&cs); + + if (cs.clusterSize >= 1) { + char t[4096]; + Utils::snprintf(t,sizeof(t),"{\n\t\t\"myId\": %u,\n\t\t\"clusterSize\": %u,\n\t\t\"members: [\n",cs.myId,cs.clusterSize); + clusterJson.append(t); + for(unsigned int i=0;i 0) ? clusterJson.c_str() : "null")); responseBody = json; scode = 200; } else if (ps[0] == "config") { diff --git a/service/OneService.cpp b/service/OneService.cpp index 6b28c41e6..729812edb 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -58,6 +58,8 @@ #include "OneService.hpp" #include "ControlPlane.hpp" +#include "ClusterGeoIpService.hpp" +#include "ClusterDefinition.hpp" /** * Uncomment to enable UDP breakage switch @@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len); static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len); +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z); +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len); static int ShttpOnMessageBegin(http_parser *parser); @@ -419,26 +426,32 @@ class OneServiceImpl : public OneService { public: OneServiceImpl(const char *hp,unsigned int port) : - _homePath((hp) ? hp : "."), - _tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY), + _homePath((hp) ? hp : ".") + ,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY) #ifdef ZT_ENABLE_NETWORK_CONTROLLER - _controller((SqliteNetworkController *)0), + ,_controller((SqliteNetworkController *)0) #endif - _phy(this,false,true), - _node((Node *)0), - _controlPlane((ControlPlane *)0), - _lastDirectReceiveFromGlobal(0), - _lastSendToGlobal(0), - _lastRestart(0), - _nextBackgroundTaskDeadline(0), - _tcpFallbackTunnel((TcpConnection *)0), - _termReason(ONE_STILL_RUNNING), - _port(0), + ,_phy(this,false,true) + ,_node((Node *)0) + ,_controlPlane((ControlPlane *)0) + ,_lastDirectReceiveFromGlobal(0) + ,_lastSendToGlobal(0) + ,_lastRestart(0) + ,_nextBackgroundTaskDeadline(0) + ,_tcpFallbackTunnel((TcpConnection *)0) + ,_termReason(ONE_STILL_RUNNING) + ,_port(0) #ifdef ZT_USE_MINIUPNPC - _v4UpnpUdpSocket((PhySocket *)0), - _upnpClient((UPNPClient *)0), + ,_v4UpnpUdpSocket((PhySocket *)0) + ,_upnpClient((UPNPClient *)0) #endif - _run(true) +#ifdef ZT_ENABLE_CLUSTER + ,_clusterMessageSocket((PhySocket *)0) + ,_clusterGeoIpService((ClusterGeoIpService *)0) + ,_clusterDefinition((ClusterDefinition *)0) + ,_clusterMemberId(0) +#endif + ,_run(true) { const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random for(int k=0;ksetNetconfMaster((void *)_controller); #endif +#ifdef ZT_ENABLE_CLUSTER + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) { + _clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str()); + if (_clusterDefinition->size() > 0) { + std::vector members(_clusterDefinition->members()); + for(std::vector::iterator m(members.begin());m!=members.end();++m) { + PhySocket *cs = _phy.udpBind(reinterpret_cast(&(m->clusterEndpoint))); + if (cs) { + if (_clusterMessageSocket) { + _phy.close(_clusterMessageSocket,false); + _phy.close(cs,false); + + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!"; + return _termReason; + } + _clusterMessageSocket = cs; + _clusterMemberId = m->id; + } + } + + if (!_clusterMessageSocket) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port."; + return _termReason; + } + + if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str())) + _clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()); + + const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId]; + InetAddress endpoints[255]; + unsigned int numEndpoints = 0; + for(std::vector::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i) + endpoints[numEndpoints++] = *i; + + if (_node->clusterInit( + _clusterMemberId, + reinterpret_cast(endpoints), + numEndpoints, + me.x, + me.y, + me.z, + &SclusterSendFunction, + this, + (_clusterGeoIpService) ? &SclusterGeoIpFunction : 0, + this) == ZT_RESULT_OK) { + + std::vector members(_clusterDefinition->members()); + for(std::vector::iterator m(members.begin());m!=members.end();++m) { + if (m->id != _clusterMemberId) + _node->clusterAddMember(m->id); + } + + } + } else { + delete _clusterDefinition; + _clusterDefinition = (ClusterDefinition *)0; + } + } +#endif + _controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str()); _controlPlane->addAuthToken(authToken.c_str()); @@ -781,10 +865,18 @@ public: inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) { +#ifdef ZT_ENABLE_CLUSTER + if (sock == _clusterMessageSocket) { + _node->clusterHandleIncomingMessage(data,len); + return; + } +#endif + #ifdef ZT_BREAK_UDP if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP")) return; #endif + if ((len >= 16)&&(reinterpret_cast(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL)) _lastDirectReceiveFromGlobal = OSUtils::now(); ZT_ResultCode rc = _node->processWirePacket( @@ -1303,7 +1395,6 @@ public: _phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections } -private: std::string _dataStorePrepPath(const char *name) const { std::string p(_homePath); @@ -1358,6 +1449,13 @@ private: UPNPClient *_upnpClient; #endif +#ifdef ZT_ENABLE_CLUSTER + PhySocket *_clusterMessageSocket; + ClusterGeoIpService *_clusterGeoIpService; + ClusterDefinition *_clusterDefinition; + unsigned int _clusterMemberId; +#endif + bool _run; Mutex _run_m; }; @@ -1375,6 +1473,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); } +#ifdef ZT_ENABLE_CLUSTER +static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len) +{ + OneServiceImpl *const impl = reinterpret_cast(uptr); + const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId]; + if (md.clusterEndpoint) + impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast(&(md.clusterEndpoint)),data,len); +} +static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z) +{ + OneServiceImpl *const impl = reinterpret_cast(uptr); + return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast(addr)),*x,*y,*z)); +} +#endif + static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { reinterpret_cast(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); } diff --git a/service/OneService.hpp b/service/OneService.hpp index 2f76ebaa2..4f7b988bf 100644 --- a/service/OneService.hpp +++ b/service/OneService.hpp @@ -43,6 +43,9 @@ namespace ZeroTier { * periodically checked and updates are automatically downloaded, verified * against a built-in list of update signing keys, and installed. This is * only supported for certain platforms. + * + * If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if + * present is read to determine the identity of other cluster members. */ class OneService { diff --git a/world/alice-test/mkworld.cpp b/world/alice-test/mkworld.cpp index e680b97ee..8940db2cf 100644 --- a/world/alice-test/mkworld.cpp +++ b/world/alice-test/mkworld.cpp @@ -133,13 +133,35 @@ int main(int argc,char **argv) std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end()); #endif - // ALICE TEST + // NOTE -- these are temporary test identities -- this is not yet the 'real' network. + // (but these are the real nodes) + + // Alice -- global geo-clustered root #1 roots.push_back(World::Root()); roots.back().identity = Identity("d6ddca6ab5:0:4e761207d8b4200be44f478e3da148c16099110ee71b64586dda118e4022ab63682ce137da8ba817fc7f73aa3163f2e333933e2994c46b4f4119307be8855a72"); - roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993")); - std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end()); + roots.back().stableEndpoints.push_back(InetAddress("188.166.94.177/9993")); // Amsterdam IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2a03:b0c0:2:d0::7d:1/9993")); // Amsterdam IPv6 + roots.back().stableEndpoints.push_back(InetAddress("159.203.97.171/9993")); // New York IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2604:a880:800:a1::54:6001/9993 ")); // New York IPv6 + roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993")); // Sao Paolo IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2607:f0d0:1d01:57::2/9993")); // Sao Paolo IPv6 + roots.back().stableEndpoints.push_back(InetAddress("104.238.182.83/9993")); // San Francisco IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:ac00:809:5400:ff:fe15:f3f4/9993")); // San Francisco IPv6 + roots.back().stableEndpoints.push_back(InetAddress("128.199.182.9/9993")); // Singapore IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2400:6180:0:d0::1b:1001/9993")); // Singapore IPv6 - std::sort(roots.begin(),roots.end()); + // Bob -- global geo-clustered root #2 + roots.back().identity = Identity("16ebbd6c5d:0:47d39bca9d0a5cf70148e39f6c45199e17e0e32e4e46cac01ae5bcb21224137b097f40bdd982a921c3aabdcb9ada8b4f2bb0593753bfdb21cf12eac28c8d9042"); + roots.back().stableEndpoints.push_back(InetAddress("45.33.4.67/9993")); // Dallas IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2600:3c00::f03c:91ff:fe67:b704/9993")); // Dallas IPv6 + roots.back().stableEndpoints.push_back(InetAddress("139.162.157.243/9993")); // Frankfurt (Germany) IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2a01:7e01::f03c:91ff:fe67:3ffd/9993")); // Frankfurt (Germany) IPv6 + roots.back().stableEndpoints.push_back(InetAddress("45.32.246.179/9993")); // Sydney IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:5800:8bf8:5400:ff:fe15:b39a/9993")); // Sydney IPv6 + roots.back().stableEndpoints.push_back(InetAddress("45.32.248.87/9993")); // Tokyo IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:7000:9bc9:5400:00ff:fe15:c4f5/9993")); // Tokyo IPv6 + roots.back().stableEndpoints.push_back(InetAddress("159.203.2.154/9993")); // Toronto IPv4 + roots.back().stableEndpoints.push_back(InetAddress("2604:a880:cad:d0::26:7001/9993")); // Toronto IPv6 const uint64_t id = ZT_WORLD_ID_EARTH; const uint64_t ts = OSUtils::now();