Merge branch 'adamierymenko-dev' into windows-ui

This commit is contained in:
Grant Limberg 2015-10-26 18:10:27 -07:00
commit 352b83252f
24 changed files with 1155 additions and 339 deletions

View File

@ -3,11 +3,11 @@
export PATH=/bin:/usr/bin:/usr/local/bin:/sbin:/usr/sbin
cd `dirname $0`
if [ ! -d cluster-geo -o ! -f cluster-geo/index.js ]; then
if [ ! -d cluster-geo -o ! -f cluster-geo/cluster-geo.js ]; then
echo 'Cannot find ./cluster-geo containing NodeJS script files.'
exit 1
fi
cd cluster-geo
exec node index.js
exec node --harmony cluster-geo.js

View File

@ -1,3 +1,5 @@
"use strict";
//
// GeoIP lookup service
//
@ -20,10 +22,10 @@ function lookup(ip,callback)
cache.get(ip,function(err,cachedEntryJson) {
if ((!err)&&(cachedEntryJson)) {
try {
var cachedEntry = JSON.parse(cachedEntryJson.toString());
let cachedEntry = JSON.parse(cachedEntryJson.toString());
if (cachedEntry) {
var ts = cachedEntry.ts;
var r = cachedEntry.r;
let ts = cachedEntry.ts;
let r = cachedEntry.r;
if ((ts)&&(r)) {
if ((Date.now() - ts) < CACHE_TTL) {
r._cached = true;
@ -57,24 +59,24 @@ process.stdin.on('readable',function() {
var chunk;
while (null !== (chunk = process.stdin.read())) {
for(var i=0;i<chunk.length;++i) {
var c = chunk[i];
let c = chunk[i];
if ((c == 0x0d)||(c == 0x0a)) {
if (linebuf.length > 0) {
var ip = linebuf;
let ip = linebuf;
lookup(ip,function(err,result) {
if ((err)||(!result)||(!result.location)) {
return process.stdout.write(ip+',0,0,0,0,0,0\n');
} else {
var lat = parseFloat(result.location.latitude);
var lon = parseFloat(result.location.longitude);
let lat = parseFloat(result.location.latitude);
let lon = parseFloat(result.location.longitude);
// Convert to X,Y,Z coordinates from Earth's origin, Earth-as-sphere approximation.
var latRadians = lat * 0.01745329251994; // PI / 180
var lonRadians = lon * 0.01745329251994; // PI / 180
var cosLat = Math.cos(latRadians);
var x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
var y = Math.round(6371.0 * Math.sin(latRadians));
var z = Math.round(6371.0 * cosLat * Math.sin(lonRadians));
let latRadians = lat * 0.01745329251994; // PI / 180
let lonRadians = lon * 0.01745329251994; // PI / 180
let cosLat = Math.cos(latRadians);
let x = Math.round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers
let y = Math.round(6371.0 * Math.sin(latRadians));
let z = Math.round(6371.0 * cosLat * Math.sin(lonRadians));
return process.stdout.write(ip+',1,'+lat+','+lon+','+x+','+y+','+z+'\n');
}

View File

@ -2,7 +2,7 @@
"name": "cluster-geo",
"version": "1.0.0",
"description": "Cluster GEO-IP Query Service",
"main": "index.js",
"main": "cluster-geo.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},

View File

@ -131,12 +131,17 @@ extern "C" {
/**
* Maximum number of cluster members (and max member ID plus one)
*/
#define ZT_CLUSTER_MAX_MEMBERS 256
#define ZT_CLUSTER_MAX_MEMBERS 128
/**
* Maximum number of physical ZeroTier addresses a cluster member can report
*/
#define ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES 16
/**
* Maximum allowed cluster message length in bytes
*/
#define ZT_CLUSTER_MAX_MESSAGE_LENGTH 65535
#define ZT_CLUSTER_MAX_MESSAGE_LENGTH (1444 * 6)
/**
* A null/empty sockaddr (all zero) to signify an unspecified socket address
@ -879,6 +884,78 @@ typedef struct {
unsigned int nextHopCount;
} ZT_CircuitTestReport;
/**
* A cluster member's status
*/
typedef struct {
/**
* This cluster member's ID (from 0 to 1-ZT_CLUSTER_MAX_MEMBERS)
*/
unsigned int id;
/**
* Number of milliseconds since last 'alive' heartbeat message received via cluster backplane address
*/
unsigned int msSinceLastHeartbeat;
/**
* Non-zero if cluster member is alive
*/
int alive;
/**
* X, Y, and Z coordinates of this member (if specified, otherwise zero)
*
* What these mean depends on the location scheme being used for
* location-aware clustering. At present this is GeoIP and these
* will be the X, Y, and Z coordinates of the location on a spherical
* approximation of Earth where Earth's core is the origin (in km).
* They don't have to be perfect and need only be comparable with others
* to find shortest path via the standard vector distance formula.
*/
int x,y,z;
/**
* Cluster member's last reported load
*/
uint64_t load;
/**
* Number of peers this cluster member "has"
*/
uint64_t peers;
/**
* Physical ZeroTier endpoints for this member (where peers are sent when directed here)
*/
struct sockaddr_storage zeroTierPhysicalEndpoints[ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES];
/**
* Number of physical ZeroTier endpoints this member is announcing
*/
unsigned int numZeroTierPhysicalEndpoints;
} ZT_ClusterMemberStatus;
/**
* ZeroTier cluster status
*/
typedef struct {
/**
* My cluster member ID (a record for 'self' is included in member[])
*/
unsigned int myId;
/**
* Number of cluster members
*/
unsigned int clusterSize;
/**
* Cluster member statuses
*/
ZT_ClusterMemberStatus members[ZT_CLUSTER_MAX_MEMBERS];
} ZT_ClusterStatus;
/**
* An instance of a ZeroTier One node (opaque)
*/
@ -1439,6 +1516,17 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId);
*/
void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len);
/**
* Get the current status of the cluster from this node's point of view
*
* Calling this without clusterInit() or without cluster support will just
* zero out the structure and show a cluster size of zero.
*
* @param node Node instance
* @param cs Cluster status structure to fill with data
*/
void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs);
/**
* Get ZeroTier One version
*

View File

@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
return;
const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
unsigned int ptr = 2;
if (fromMemberId == _id)
if (fromMemberId == _id) // sanity check: we don't talk to ourselves
return;
const uint16_t toMemberId = dmsg.at<uint16_t>(ptr);
ptr += 2;
if (toMemberId != _id)
if (toMemberId != _id) // sanity check: message not for us?
return;
_Member &m = _members[fromMemberId];
Mutex::Lock mlck(m.lock);
{ // make sure sender is actually considered a member
Mutex::Lock _l3(_memberIds_m);
if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
return;
}
try {
while (ptr < dmsg.size()) {
const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
const unsigned int nextPtr = ptr + mlen;
{
_Member &m = _members[fromMemberId];
Mutex::Lock mlck(m.lock);
int mtype = -1;
try {
switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
default:
break;
try {
while (ptr < dmsg.size()) {
const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
const unsigned int nextPtr = ptr + mlen;
if (nextPtr > dmsg.size())
break;
case STATE_MESSAGE_ALIVE: {
ptr += 7; // skip version stuff, not used yet
m.x = dmsg.at<int32_t>(ptr); ptr += 4;
m.y = dmsg.at<int32_t>(ptr); ptr += 4;
m.z = dmsg.at<int32_t>(ptr); ptr += 4;
ptr += 8; // skip local clock, not used
m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
ptr += 8; // skip flags, unused
int mtype = -1;
try {
switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
default:
break;
case STATE_MESSAGE_ALIVE: {
ptr += 7; // skip version stuff, not used yet
m.x = dmsg.at<int32_t>(ptr); ptr += 4;
m.y = dmsg.at<int32_t>(ptr); ptr += 4;
m.z = dmsg.at<int32_t>(ptr); ptr += 4;
ptr += 8; // skip local clock, not used
m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
std::string addrs;
std::string addrs;
#endif
unsigned int physicalAddressCount = dmsg[ptr++];
m.zeroTierPhysicalEndpoints.clear();
for(unsigned int i=0;i<physicalAddressCount;++i) {
m.zeroTierPhysicalEndpoints.push_back(InetAddress());
ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
if (!(m.zeroTierPhysicalEndpoints.back())) {
m.zeroTierPhysicalEndpoints.pop_back();
}
#ifdef ZT_TRACE
else {
if (addrs.length() > 0)
addrs.push_back(',');
addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
}
#endif
unsigned int physicalAddressCount = dmsg[ptr++];
for(unsigned int i=0;i<physicalAddressCount;++i) {
m.zeroTierPhysicalEndpoints.push_back(InetAddress());
ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
if (!(m.zeroTierPhysicalEndpoints.back())) {
m.zeroTierPhysicalEndpoints.pop_back();
}
m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
else {
if (addrs.length() > 0)
addrs.push_back(',');
addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
}
TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
#endif
}
m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str());
#endif
} break;
} break;
case STATE_MESSAGE_HAVE_PEER: {
try {
Identity id;
ptr += id.deserialize(dmsg,ptr);
if (id) {
RR->topology->saveIdentity(id);
case STATE_MESSAGE_HAVE_PEER: {
try {
Identity id;
ptr += id.deserialize(dmsg,ptr);
if (id) {
RR->topology->saveIdentity(id);
{ // Add or update peer affinity entry
_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
Mutex::Lock _l2(_peerAffinities_m);
std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
i->timestamp = pa.timestamp;
} else {
_peerAffinities.push_back(pa);
std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
}
}
TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
}
} catch ( ... ) {
// ignore invalid identities
}
} break;
case STATE_MESSAGE_MULTICAST_LIKE: {
const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
} break;
case STATE_MESSAGE_COM: {
CertificateOfMembership com;
ptr += com.deserialize(dmsg,ptr);
if (com) {
TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
}
} break;
case STATE_MESSAGE_RELAY: {
const unsigned int numRemotePeerPaths = dmsg[ptr++];
InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
for(unsigned int i=0;i<numRemotePeerPaths;++i)
ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
if (destinationPeer) {
if (
(destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
(numRemotePeerPaths > 0)&&
(packetLen >= 18)&&
(reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
) {
// If remote peer paths were sent with this relayed packet, we do
// RENDEZVOUS. It's handled here for cluster-relayed packets since
// we don't have both Peer records so this is a different path.
const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
InetAddress bestDestV4,bestDestV6;
destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
InetAddress bestRemoteV4,bestRemoteV6;
for(unsigned int i=0;i<numRemotePeerPaths;++i) {
if ((bestRemoteV4)&&(bestRemoteV6))
break;
switch(remotePeerPaths[i].ss_family) {
case AF_INET:
if (!bestRemoteV4)
bestRemoteV4 = remotePeerPaths[i];
break;
case AF_INET6:
if (!bestRemoteV6)
bestRemoteV6 = remotePeerPaths[i];
break;
{ // Add or update peer affinity entry
_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
Mutex::Lock _l2(_peerAffinities_m);
std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
i->timestamp = pa.timestamp;
} else {
_peerAffinities.push_back(pa);
std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
}
}
}
Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
rendezvousForDest.append((uint8_t)0);
remotePeerAddress.appendTo(rendezvousForDest);
TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
}
} catch ( ... ) {
// ignore invalid identities
}
} break;
Buffer<2048> rendezvousForOtherEnd;
remotePeerAddress.appendTo(rendezvousForOtherEnd);
rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
destinationAddress.appendTo(rendezvousForOtherEnd);
case STATE_MESSAGE_MULTICAST_LIKE: {
const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
} break;
bool haveMatch = false;
if ((bestDestV6)&&(bestRemoteV6)) {
haveMatch = true;
case STATE_MESSAGE_COM: {
CertificateOfMembership com;
ptr += com.deserialize(dmsg,ptr);
if (com) {
TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
}
} break;
rendezvousForDest.append((uint16_t)bestRemoteV6.port());
rendezvousForDest.append((uint8_t)16);
rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
case STATE_MESSAGE_RELAY: {
const unsigned int numRemotePeerPaths = dmsg[ptr++];
InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
for(unsigned int i=0;i<numRemotePeerPaths;++i)
ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
rendezvousForOtherEnd.append((uint8_t)16);
rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
} else if ((bestDestV4)&&(bestRemoteV4)) {
haveMatch = true;
if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
rendezvousForDest.append((uint16_t)bestRemoteV4.port());
rendezvousForDest.append((uint8_t)4);
rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
if (destinationPeer) {
if (
(destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
(numRemotePeerPaths > 0)&&
(packetLen >= 18)&&
(reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
) {
// If remote peer paths were sent with this relayed packet, we do
// RENDEZVOUS. It's handled here for cluster-relayed packets since
// we don't have both Peer records so this is a different path.
rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
rendezvousForOtherEnd.append((uint8_t)4);
rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
}
const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
if (haveMatch) {
_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
RR->sw->send(rendezvousForDest,true,0);
InetAddress bestDestV4,bestDestV6;
destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
InetAddress bestRemoteV4,bestRemoteV6;
for(unsigned int i=0;i<numRemotePeerPaths;++i) {
if ((bestRemoteV4)&&(bestRemoteV6))
break;
switch(remotePeerPaths[i].ss_family) {
case AF_INET:
if (!bestRemoteV4)
bestRemoteV4 = remotePeerPaths[i];
break;
case AF_INET6:
if (!bestRemoteV6)
bestRemoteV6 = remotePeerPaths[i];
break;
}
}
Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
rendezvousForDest.append((uint8_t)0);
remotePeerAddress.appendTo(rendezvousForDest);
Buffer<2048> rendezvousForOtherEnd;
remotePeerAddress.appendTo(rendezvousForOtherEnd);
rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
destinationAddress.appendTo(rendezvousForOtherEnd);
bool haveMatch = false;
if ((bestDestV6)&&(bestRemoteV6)) {
haveMatch = true;
rendezvousForDest.append((uint16_t)bestRemoteV6.port());
rendezvousForDest.append((uint8_t)16);
rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
rendezvousForOtherEnd.append((uint8_t)16);
rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
} else if ((bestDestV4)&&(bestRemoteV4)) {
haveMatch = true;
rendezvousForDest.append((uint16_t)bestRemoteV4.port());
rendezvousForDest.append((uint8_t)4);
rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
rendezvousForOtherEnd.append((uint8_t)4);
rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
}
if (haveMatch) {
_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
RR->sw->send(rendezvousForDest,true,0);
}
}
}
}
}
} break;
} break;
case STATE_MESSAGE_PROXY_SEND: {
const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
Packet outp(rcpt,RR->identity.address(),verb);
outp.append(dmsg.field(ptr,len),len);
RR->sw->send(outp,true,0);
TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break;
case STATE_MESSAGE_PROXY_SEND: {
const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
Packet outp(rcpt,RR->identity.address(),verb);
outp.append(dmsg.field(ptr,len),len);
RR->sw->send(outp,true,0);
TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break;
}
} catch ( ... ) {
TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
// drop invalids
}
} catch ( ... ) {
TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
// drop invalids
}
ptr = nextPtr;
ptr = nextPtr;
}
} catch ( ... ) {
TRACE("invalid message (outer loop), discarding");
// drop invalids
}
} catch ( ... ) {
TRACE("invalid message (outer loop), discarding");
// drop invalids
}
}
@ -395,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
}
TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
return true;
} else {
TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
return false;
}
return false;
}
void Cluster::replicateHavePeer(const Identity &peerId)
@ -436,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId)
void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
{
Buffer<4096> buf;
Buffer<2048> buf;
buf.append((uint64_t)nwid);
peerAddress.appendTo(buf);
group.mac().appendTo(buf);
buf.append((uint32_t)group.adi());
TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi());
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -452,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
{
Buffer<4096> buf;
Buffer<2048> buf;
com.serialize(buf);
TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId());
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -504,7 +519,7 @@ void Cluster::doPeriodicTasks()
void Cluster::addMember(uint16_t memberId)
{
if (memberId >= ZT_CLUSTER_MAX_MEMBERS)
if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id))
return;
Mutex::Lock _l2(_members[memberId].lock);
@ -553,11 +568,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
{
if (!peerPhysicalAddress) // sanity check
return false;
if (_addressToLocationFunction) {
// Pick based on location if it can be determined
int px = 0,py = 0,pz = 0;
if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
// No geo-info so no change
TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
return false;
}
@ -567,6 +583,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
double bestDistance = (offload ? 2147483648.0 : currentDistance);
unsigned int bestMember = _id;
TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
{
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@ -577,6 +594,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
if (mdist < bestDistance) {
bestDistance = mdist;
bestMember = *mid;
best = m.zeroTierPhysicalEndpoints;
}
@ -585,7 +603,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
if (best.size() > 0) {
TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance);
TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
/* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
@ -620,8 +638,66 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
}
}
void Cluster::status(ZT_ClusterStatus &status) const
{
const uint64_t now = RR->node->now();
memset(&status,0,sizeof(ZT_ClusterStatus));
ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
memset(ms,0,sizeof(ms));
status.myId = _id;
ms[_id] = &(status.members[status.clusterSize++]);
ms[_id]->id = _id;
ms[_id]->alive = 1;
ms[_id]->x = _x;
ms[_id]->y = _y;
ms[_id]->z = _z;
ms[_id]->peers = RR->topology->countAlive();
for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
break;
memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
}
{
Mutex::Lock _l1(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
break;
ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
_Member &m = _members[*mid];
Mutex::Lock ml(m.lock);
s->id = *mid;
s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
s->x = m.x;
s->y = m.y;
s->z = m.z;
s->load = m.load;
for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
break;
memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
}
}
}
{
Mutex::Lock _l2(_peerAffinities_m);
for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
unsigned int mid = pi->clusterMemberId();
if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
++ms[mid]->peers;
}
}
}
void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
{
if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
return;
_Member &m = _members[memberId];
// assumes m.lock is locked!
if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)

View File

@ -47,7 +47,7 @@
/**
* Timeout for cluster members being considered "alive"
*/
#define ZT_CLUSTER_TIMEOUT 30000
#define ZT_CLUSTER_TIMEOUT 10000
/**
* How often should we announce that we have a peer?
@ -57,7 +57,7 @@
/**
* Desired period between doPeriodicTasks() in milliseconds
*/
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100
namespace ZeroTier {
@ -145,7 +145,7 @@ public:
STATE_MESSAGE_RELAY = 5,
/**
* Request to send a packet to a locally-known peer:
* Request that a cluster member send a packet to a locally-known peer:
* <[5] ZeroTier address of recipient>
* <[1] packet verb>
* <[2] length of packet payload>
@ -254,6 +254,13 @@ public:
*/
bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload);
/**
* Fill out ZT_ClusterStatus structure (from core API)
*
* @param status Reference to structure to hold result (anything there is replaced)
*/
void status(ZT_ClusterStatus &status) const;
private:
void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len);
void _flush(uint16_t memberId);

View File

@ -409,6 +409,16 @@ struct InetAddress : public sockaddr_storage
switch(b[p++]) {
case 0:
return 1;
case 0x01:
// TODO: Ethernet address (but accept for forward compatibility)
return 7;
case 0x02:
// TODO: Bluetooth address (but accept for forward compatibility)
return 7;
case 0x03:
// TODO: Other address types (but accept for forward compatibility)
// These could be extended/optional things like AF_UNIX, LTE Direct, shared memory, etc.
return (unsigned int)(b.template at<uint16_t>(p) + 3); // other addresses begin with 16-bit non-inclusive length
case 0x04:
ss_family = AF_INET;
memcpy(&(reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr),b.field(p,4),4); p += 4;

View File

@ -144,7 +144,15 @@ void Network::multicastUnsubscribe(const MulticastGroup &mg)
bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_lock);
return _tryAnnounceMulticastGroupsTo(RR->topology->rootAddresses(),_allMulticastGroups(),peer,RR->node->now());
if (
(_isAllowed(peer)) ||
(peer->address() == this->controller()) ||
(RR->topology->isRoot(peer->identity()))
) {
_announceMulticastGroupsTo(peer->address(),_allMulticastGroups());
return true;
}
return false;
}
bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf)
@ -400,77 +408,80 @@ bool Network::_isAllowed(const SharedPtr<Peer> &peer) const
return false; // default position on any failure
}
bool Network::_tryAnnounceMulticastGroupsTo(const std::vector<Address> &alwaysAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const
{
// assumes _lock is locked
if (
(_isAllowed(peer)) ||
(peer->address() == this->controller()) ||
(std::find(alwaysAddresses.begin(),alwaysAddresses.end(),peer->address()) != alwaysAddresses.end())
) {
if ((_config)&&(_config->com())&&(!_config->isPublic())&&(peer->needsOurNetworkMembershipCertificate(_id,now,true))) {
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
_config->com().serialize(outp);
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
}
{
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
outp.reset(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
// network ID, MAC, ADI
outp.append((uint64_t)_id);
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) {
outp.armor(peer->key(),true);
peer->send(RR,outp.data(),outp.size(),now);
}
}
return true;
}
return false;
}
class _AnnounceMulticastGroupsToAll
class _GetPeersThatNeedMulticastAnnouncement
{
public:
_AnnounceMulticastGroupsToAll(const RuntimeEnvironment *renv,Network *nw) :
_GetPeersThatNeedMulticastAnnouncement(const RuntimeEnvironment *renv,Network *nw) :
_now(renv->node->now()),
_controller(nw->controller()),
_network(nw),
_rootAddresses(renv->topology->rootAddresses()),
_allMulticastGroups(nw->_allMulticastGroups())
_rootAddresses(renv->topology->rootAddresses())
{}
inline void operator()(Topology &t,const SharedPtr<Peer> &p) { _network->_tryAnnounceMulticastGroupsTo(_rootAddresses,_allMulticastGroups,p,_now); }
inline void operator()(Topology &t,const SharedPtr<Peer> &p)
{
if (
(_network->_isAllowed(p)) ||
(p->address() == _controller) ||
(std::find(_rootAddresses.begin(),_rootAddresses.end(),p->address()) != _rootAddresses.end())
) {
peers.push_back(p->address());
}
}
std::vector<Address> peers;
private:
uint64_t _now;
Address _controller;
Network *_network;
std::vector<Address> _rootAddresses;
std::vector<MulticastGroup> _allMulticastGroups;
};
void Network::_announceMulticastGroups()
{
// Assumes _lock is locked
_AnnounceMulticastGroupsToAll afunc(RR,this);
RR->topology->eachPeer<_AnnounceMulticastGroupsToAll &>(afunc);
_GetPeersThatNeedMulticastAnnouncement gpfunc(RR,this);
RR->topology->eachPeer<_GetPeersThatNeedMulticastAnnouncement &>(gpfunc);
std::vector<MulticastGroup> allMulticastGroups(_allMulticastGroups());
for(std::vector<Address>::const_iterator pa(gpfunc.peers.begin());pa!=gpfunc.peers.end();++pa)
_announceMulticastGroupsTo(*pa,allMulticastGroups);
}
void Network::_announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const
{
// Assumes _lock is locked
// We push COMs ahead of MULTICAST_LIKE since they're used for access control -- a COM is a public
// credential so "over-sharing" isn't really an issue (and we only do so with roots).
if ((_config)&&(_config->com())&&(!_config->isPublic())) {
Packet outp(peerAddress,RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE);
_config->com().serialize(outp);
RR->sw->send(outp,true,0);
}
{
Packet outp(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) {
if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) {
RR->sw->send(outp,true,0);
outp.reset(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE);
}
// network ID, MAC, ADI
outp.append((uint64_t)_id);
mg->mac().appendTo(outp);
outp.append((uint32_t)mg->adi());
}
if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH)
RR->sw->send(outp,true,0);
}
}
std::vector<MulticastGroup> Network::_allMulticastGroups() const
{
// Assumes _lock is locked
std::vector<MulticastGroup> mgs;
mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1);
mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end());
@ -479,6 +490,7 @@ std::vector<MulticastGroup> Network::_allMulticastGroups() const
mgs.push_back(Network::BROADCAST);
std::sort(mgs.begin(),mgs.end());
mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end());
return mgs;
}

View File

@ -56,7 +56,7 @@ namespace ZeroTier {
class RuntimeEnvironment;
class Peer;
class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp
class _GetPeersThatNeedMulticastAnnouncement;
/**
* A virtual LAN
@ -64,7 +64,7 @@ class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp
class Network : NonCopyable
{
friend class SharedPtr<Network>;
friend class _AnnounceMulticastGroupsToAll;
friend class _GetPeersThatNeedMulticastAnnouncement; // internal function object
public:
/**
@ -344,6 +344,7 @@ private:
bool _isAllowed(const SharedPtr<Peer> &peer) const;
bool _tryAnnounceMulticastGroupsTo(const std::vector<Address> &rootAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const;
void _announceMulticastGroups();
void _announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const;
std::vector<MulticastGroup> _allMulticastGroups() const;
const RuntimeEnvironment *RR;

View File

@ -211,8 +211,15 @@ public:
}
}
// If this is a network preferred relay, also always ping and if a stable endpoint is specified use that if not alive
if (!upstream) {
// If I am a root server, only ping other root servers -- roots don't ping "down"
// since that would just be a waste of bandwidth and could potentially cause route
// flapping in Cluster mode.
if (RR->topology->amRoot())
return;
// Check for network preferred relays, also considered 'upstream' and thus always
// pinged to keep links up. If they have stable addresses we will try them there.
for(std::vector< std::pair<Address,InetAddress> >::const_iterator r(_relays.begin());r!=_relays.end();++r) {
if (r->first == p->address()) {
if (r->second.ss_family == AF_INET)
@ -229,18 +236,22 @@ public:
// "Upstream" devices are roots and relays and get special treatment -- they stay alive
// forever and we try to keep (if available) both IPv4 and IPv6 channels open to them.
bool needToContactIndirect = true;
if (!p->doPingAndKeepalive(RR,_now,AF_INET)) {
if (p->doPingAndKeepalive(RR,_now,AF_INET)) {
needToContactIndirect = false;
} else {
if (stableEndpoint4) {
needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now);
}
} else needToContactIndirect = false;
if (!p->doPingAndKeepalive(RR,_now,AF_INET6)) {
}
if (p->doPingAndKeepalive(RR,_now,AF_INET6)) {
needToContactIndirect = false;
} else {
if (stableEndpoint6) {
needToContactIndirect = false;
p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now);
}
} else needToContactIndirect = false;
}
if (needToContactIndirect) {
// If this is an upstream and we have no stable endpoint for either IPv4 or IPv6,
@ -625,6 +636,18 @@ void Node::clusterHandleIncomingMessage(const void *msg,unsigned int len)
#endif
}
void Node::clusterStatus(ZT_ClusterStatus *cs)
{
if (!cs)
return;
#ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
RR->cluster->status(*cs);
else
#endif
memset(cs,0,sizeof(ZT_ClusterStatus));
}
/****************************************************************************/
/* Node methods used only within node/ */
/****************************************************************************/
@ -936,15 +959,6 @@ enum ZT_ResultCode ZT_Node_clusterInit(
}
}
/**
* Add a member to this cluster
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param memberId Member ID (must be less than or equal to ZT_CLUSTER_MAX_MEMBERS)
* @return OK or error if clustering is disabled, ID invalid, etc.
*/
enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
{
try {
@ -954,14 +968,6 @@ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId)
}
}
/**
* Remove a member from this cluster
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param memberId Member ID to remove (nothing happens if not present)
*/
void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
{
try {
@ -969,18 +975,6 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId)
} catch ( ... ) {}
}
/**
* Handle an incoming cluster state message
*
* The message itself contains cluster member IDs, and invalid or badly
* addressed messages will be silently discarded.
*
* Calling this without having called clusterInit() will do nothing.
*
* @param node Node instance
* @param msg Cluster message
* @param len Length of cluster message
*/
void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len)
{
try {
@ -988,6 +982,13 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned
} catch ( ... ) {}
}
void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs)
{
try {
reinterpret_cast<ZeroTier::Node *>(node)->clusterStatus(cs);
} catch ( ... ) {}
}
void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags)
{
if (major) *major = ZEROTIER_ONE_VERSION_MAJOR;

View File

@ -124,6 +124,7 @@ public:
ZT_ResultCode clusterAddMember(unsigned int memberId);
void clusterRemoveMember(unsigned int memberId);
void clusterHandleIncomingMessage(const void *msg,unsigned int len);
void clusterStatus(ZT_ClusterStatus *cs);
// Internal functions ------------------------------------------------------

View File

@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
Mutex::Lock _l(_phy_m);
PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)];
PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)];
if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) {
entry.mySurface = myPhysicalAddress;
@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi
entry.ts = now;
TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str());
// Erase all entries (other than this one) for this scope to prevent thrashing
// Note: we should probably not use 'entry' after this
// Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing'
// due to multiple reports of endpoint change.
// Don't use 'entry' after this since hash table gets modified.
{
Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy);
PhySurfaceKey *k = (PhySurfaceKey *)0;
PhySurfaceEntry *e = (PhySurfaceEntry *)0;
while (i.next(k,e)) {
if ((k->reporter != reporter)&&(k->scope == scope))
if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope))
_phy.erase(*k);
}
}

View File

@ -69,14 +69,14 @@ private:
struct PhySurfaceKey
{
Address reporter;
InetAddress reporterPhysicalAddress;
InetAddress::IpScope scope;
inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {}
PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {}
inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); }
inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); }
PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {}
inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); }
inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); }
};
struct PhySurfaceEntry
{

View File

@ -35,14 +35,9 @@
namespace ZeroTier {
// Default World
#define ZT_DEFAULT_WORLD_LENGTH 494
static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x4f,0xdf,0xbf,0xfc,0xbb,0x6c,0x7e,0x15,0x67,0x85,0x1b,0xb4,0x65,0x04,0x01,0xaf,0x56,0xbf,0xe7,0x63,0x9d,0x77,0xef,0xa4,0x1e,0x61,0x53,0x88,0xcb,0x8d,0x78,0xe5,0x47,0x38,0x98,0x5a,0x6c,0x8a,0xdd,0xe6,0x9c,0x65,0xdf,0x1a,0x80,0x63,0xce,0x2e,0x4d,0x48,0x24,0x3d,0x68,0x87,0x96,0x13,0x89,0xba,0x25,0x6f,0xc9,0xb0,0x9f,0x20,0xc5,0x4c,0x51,0x7b,0x30,0xb7,0x5f,0xba,0xca,0xa4,0xc5,0x48,0xa3,0x15,0xab,0x2f,0x1d,0x64,0xe8,0x04,0x42,0xb3,0x1c,0x51,0x8b,0x2a,0x04,0x01,0xf8,0xe1,0x81,0xaf,0x60,0x2f,0x70,0x3e,0xcd,0x0b,0x21,0x38,0x19,0x62,0x02,0xbd,0x0e,0x33,0x1d,0x0a,0x7b,0xf1,0xec,0xad,0xef,0x54,0xb3,0x7b,0x17,0x84,0xaa,0xda,0x0a,0x85,0x5d,0x0b,0x1c,0x05,0x83,0xb9,0x0e,0x3e,0xe3,0xb4,0xd1,0x8b,0x5b,0x64,0xf7,0xcf,0xe1,0xff,0x5d,0xc2,0x2a,0xcf,0x60,0x7b,0x09,0xb4,0xa3,0x86,0x3c,0x5a,0x7e,0x31,0xa0,0xc7,0xb4,0x86,0xe3,0x41,0x33,0x04,0x7e,0x19,0x87,0x6a,0xba,0x00,0x2a,0x6e,0x2b,0x23,0x18,0x93,0x0f,0x60,0xeb,0x09,0x7f,0x70,0xd0,0xf4,0xb0,0x28,0xb2,0xcd,0x6d,0x3d,0x0c,0x63,0xc0,0x14,0xb9,0x03,0x9f,0xf3,0x53,0x90,0xe4,0x11,0x81,0xf2,0x16,0xfb,0x2e,0x6f,0xa8,0xd9,0x5c,0x1e,0xe9,0x66,0x71,0x56,0x41,0x19,0x05,0xc3,0xdc,0xcf,0xea,0x78,0xd8,0xc6,0xdf,0xaf,0xba,0x68,0x81,0x70,0xb3,0xfa,0x00,0x01,0x04,0xc6,0xc7,0x61,0xdc,0x27,0x09,0x88,0x41,0x40,0x8a,0x2e,0x00,0xbb,0x1d,0x31,0xf2,0xc3,0x23,0xe2,0x64,0xe9,0xe6,0x41,0x72,0xc1,0xa7,0x4f,0x77,0x89,0x95,0x55,0xed,0x10,0x75,0x1c,0xd5,0x6e,0x86,0x40,0x5c,0xde,0x11,0x8d,0x02,0xdf,0xfe,0x55,0x5d,0x46,0x2c,0xcf,0x6a,0x85,0xb5,0x63,0x1c,0x12,0x35,0x0c,0x8d,0x5d,0xc4,0x09,0xba,0x10,0xb9,0x02,0x5d,0x0f,0x44,0x5c,0xf4,0x49,0xd9,0x2b,0x1c,0x00,0x01,0x04,0x6b,0xbf,0x2e,0xd2,0x27,0x09,0x8a,0xcf,0x05,0x9f,0xe3,0x00,0x48,0x2f,0x6e,0xe5,0xdf,0xe9,0x02,0x31,0x9b,0x41,0x9d,0xe5,0xbd,0xc7,0x65,0x20,0x9c,0x0e,0xcd,0xa3,0x8c,0x4d,0x6e,0x4f,0xcf,0x0d,0x33,0x65,0x83,0x98,0xb4,0x52,0x7d,0xcd,0x22,0xf9,0x31,0x12,0xfb,0x9b,0xef,0xd0,0x2f,0xd7,0x8b,0xf7,0x26,0x1b,0x33,0x3f,0xc1,0x05,0xd1,0x92,0xa6,0x23,0xca,0x9e,0x50,0xfc,0x60,0xb3,0x74,0xa5,0x00,0x01,0x04,0xa2,0xf3,0x4d,0x6f,0x27,0x09,0x9d,0x21,0x90,0x39,0xf3,0x00,0x01,0xf0,0x92,0x2a,0x98,0xe3,0xb3,0x4e,0xbc,0xbf,0xf3,0x33,0x26,0x9d,0xc2,0x65,0xd7,0xa0,0x20,0xaa,0xb6,0x9d,0x72,0xbe,0x4d,0x4a,0xcc,0x9c,0x8c,0x92,0x94,0x78,0x57,0x71,0x25,0x6c,0xd1,0xd9,0x42,0xa9,0x0d,0x1b,0xd1,0xd2,0xdc,0xa3,0xea,0x84,0xef,0x7d,0x85,0xaf,0xe6,0x61,0x1f,0xb4,0x3f,0xf0,0xb7,0x41,0x26,0xd9,0x0a,0x6e,0x00,0x01,0x04,0x80,0xc7,0xc5,0xd9,0x27,0x09};
// ALICE-TEST
//#define ZT_DEFAULT_WORLD_LENGTH 257
//static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x50,0x81,0x2a,0x54,0x6f,0x72,0xb0,0x3b,0xbe,0x73,0xda,0xbd,0xfb,0x85,0x77,0x9f,0xc9,0x2e,0x17,0xc8,0x11,0x6e,0xda,0x61,0x80,0xd1,0x41,0xcb,0x7c,0x2d,0x2b,0xa4,0x34,0x75,0x19,0x64,0x20,0x80,0x0a,0x22,0x32,0xf2,0x01,0x6c,0xfe,0x79,0xa6,0x7d,0xec,0x10,0x7e,0x03,0xf3,0xa2,0xa0,0x19,0xc8,0x7c,0xfd,0x6c,0x56,0x52,0xa8,0xfb,0xdc,0xfb,0x93,0x81,0x3e,0x63,0x8b,0xb3,0xb6,0x72,0x45,0xa9,0x81,0x81,0xcc,0xea,0x7f,0x2f,0xd9,0x59,0xce,0xc8,0x51,0x12,0xc3,0xe3,0x44,0x76,0x54,0xed,0xe7,0x8d,0x34,0x0b,0x5d,0x10,0x3d,0x52,0x04,0x9b,0xe1,0xb2,0x36,0x51,0x75,0x14,0x30,0x53,0xe8,0x4b,0xe4,0x91,0x9a,0xed,0x99,0x56,0xa3,0x8d,0x5e,0x14,0xff,0x66,0xd8,0x4f,0xf7,0x3c,0x23,0xbe,0x02,0xbb,0x1e,0xb6,0x7e,0x07,0xfa,0x7c,0x7e,0x50,0xe8,0x40,0xf9,0x37,0x70,0x1a,0x75,0xcf,0x19,0xe6,0x83,0xe1,0x5c,0x20,0x1d,0x1e,0x5b,0xe5,0x6a,0xbe,0xe7,0xab,0xec,0x01,0xd6,0xdd,0xca,0x6a,0xb5,0x00,0x4e,0x76,0x12,0x07,0xd8,0xb4,0x20,0x0b,0xe4,0x4f,0x47,0x8e,0x3d,0xa1,0x48,0xc1,0x60,0x99,0x11,0x0e,0xe7,0x1b,0x64,0x58,0x6d,0xda,0x11,0x8e,0x40,0x22,0xab,0x63,0x68,0x2c,0xe1,0x37,0xda,0x8b,0xa8,0x17,0xfc,0x7f,0x73,0xaa,0x31,0x63,0xf2,0xe3,0x33,0x93,0x3e,0x29,0x94,0xc4,0x6b,0x4f,0x41,0x19,0x30,0x7b,0xe8,0x85,0x5a,0x72,0x00,0x01,0x04,0xa9,0x39,0x8f,0x68,0x27,0x09};
Topology::Topology(const RuntimeEnvironment *renv) :
RR(renv),
_amRoot(false)
@ -337,6 +332,21 @@ void Topology::clean(uint64_t now)
}
}
unsigned long Topology::countAlive() const
{
const uint64_t now = RR->node->now();
unsigned long cnt = 0;
Mutex::Lock _l(_lock);
Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers);
Address *a = (Address *)0;
SharedPtr<Peer> *p = (SharedPtr<Peer> *)0;
while (i.next(a,p)) {
if ((*p)->alive(now))
++cnt;
}
return cnt;
}
Identity Topology::_getIdentity(const Address &zta)
{
char p[128];
@ -358,16 +368,18 @@ void Topology::_setWorld(const World &newWorld)
_rootAddresses.clear();
_rootPeers.clear();
for(std::vector<World::Root>::const_iterator r(_world.roots().begin());r!=_world.roots().end();++r) {
if (r->identity == RR->identity)
_amRoot = true;
_rootAddresses.push_back(r->identity.address());
SharedPtr<Peer> *rp = _peers.get(r->identity.address());
if (rp) {
_rootPeers.push_back(*rp);
} else if (r->identity.address() != RR->identity.address()) {
SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity));
_peers.set(r->identity.address(),newrp);
_rootPeers.push_back(newrp);
if (r->identity.address() == RR->identity.address()) {
_amRoot = true;
} else {
SharedPtr<Peer> *rp = _peers.get(r->identity.address());
if (rp) {
_rootPeers.push_back(*rp);
} else {
SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity));
_peers.set(r->identity.address(),newrp);
_rootPeers.push_back(newrp);
}
}
}
}

View File

@ -192,6 +192,11 @@ public:
*/
void clean(uint64_t now);
/**
* @return Number of 'alive' peers
*/
unsigned long countAlive() const;
/**
* Apply a function or function object to all peers
*
@ -225,6 +230,11 @@ public:
return _peers.entries();
}
/**
* @return True if I am a root server in the current World
*/
inline bool amRoot() const throw() { return _amRoot; }
private:
Identity _getIdentity(const Address &zta);
void _setWorld(const World &newWorld);

View File

@ -26,5 +26,6 @@ OBJS=\
osdep/BackgroundResolver.o \
osdep/Http.o \
osdep/OSUtils.o \
service/ClusterGeoIpService.o \
service/ControlPlane.o \
service/OneService.o

View File

@ -64,6 +64,12 @@
#include <netinet/in.h>
#include <netinet/tcp.h>
#if defined(__linux__) || defined(linux) || defined(__LINUX__) || defined(__linux)
#ifndef IPV6_DONTFRAG
#define IPV6_DONTFRAG 62
#endif
#endif
#define ZT_PHY_SOCKFD_TYPE int
#define ZT_PHY_SOCKFD_NULL (-1)
#define ZT_PHY_SOCKFD_VALID(s) ((s) > -1)
@ -374,6 +380,9 @@ public:
f = 1; setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,(void *)&f,sizeof(f));
#ifdef IPV6_MTU_DISCOVER
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_MTU_DISCOVER,&f,sizeof(f));
#endif
#ifdef IPV6_DONTFRAG
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f));
#endif
}
f = 0; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));

View File

@ -0,0 +1,125 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* ZeroTier may be used and distributed under the terms of the GPLv3, which
* are available at: http://www.gnu.org/licenses/gpl-3.0.html
*
* If you would like to embed ZeroTier into a commercial application or
* redistribute it in a modified binary form, please contact ZeroTier Networks
* LLC. Start here: http://www.zerotier.com/
*/
#ifndef ZT_CLUSTERDEFINITION_HPP
#define ZT_CLUSTERDEFINITION_HPP
#ifdef ZT_ENABLE_CLUSTER
#include <vector>
#include <algorithm>
#include "../node/Constants.hpp"
#include "../node/Utils.hpp"
#include "../osdep/OSUtils.hpp"
namespace ZeroTier {
/**
* Parser for cluster definition file
*/
class ClusterDefinition
{
public:
struct MemberDefinition
{
MemberDefinition() : id(0),x(0),y(0),z(0) { name[0] = (char)0; }
unsigned int id;
int x,y,z;
char name[256];
InetAddress clusterEndpoint;
std::vector<InetAddress> zeroTierEndpoints;
};
ClusterDefinition(uint64_t myAddress,const char *pathToClusterFile)
{
std::string cf;
if (!OSUtils::readFile(pathToClusterFile,cf))
return;
char myAddressStr[64];
Utils::snprintf(myAddressStr,sizeof(myAddressStr),"%.10llx",myAddress);
std::vector<std::string> lines(Utils::split(cf.c_str(),"\r\n","",""));
for(std::vector<std::string>::iterator l(lines.begin());l!=lines.end();++l) {
std::vector<std::string> fields(Utils::split(l->c_str()," \t","",""));
if ((fields.size() < 5)||(fields[0][0] == '#')||(fields[0] != myAddressStr))
continue;
int id = Utils::strToUInt(fields[1].c_str());
if ((id < 0)||(id > ZT_CLUSTER_MAX_MEMBERS))
continue;
MemberDefinition &md = _md[id];
md.id = (unsigned int)id;
if (fields.size() >= 6) {
std::vector<std::string> xyz(Utils::split(fields[5].c_str(),",","",""));
md.x = (xyz.size() > 0) ? Utils::strToInt(xyz[0].c_str()) : 0;
md.y = (xyz.size() > 1) ? Utils::strToInt(xyz[1].c_str()) : 0;
md.z = (xyz.size() > 2) ? Utils::strToInt(xyz[2].c_str()) : 0;
}
Utils::scopy(md.name,sizeof(md.name),fields[2].c_str());
md.clusterEndpoint.fromString(fields[3]);
if (!md.clusterEndpoint)
continue;
std::vector<std::string> zips(Utils::split(fields[4].c_str(),",","",""));
for(std::vector<std::string>::iterator zip(zips.begin());zip!=zips.end();++zip) {
InetAddress i;
i.fromString(*zip);
if (i)
md.zeroTierEndpoints.push_back(i);
}
_ids.push_back((unsigned int)id);
}
std::sort(_ids.begin(),_ids.end());
}
inline const MemberDefinition &operator[](unsigned int id) const throw() { return _md[id]; }
inline unsigned int size() const throw() { return (unsigned int)_ids.size(); }
inline const std::vector<unsigned int> &ids() const throw() { return _ids; }
inline std::vector<MemberDefinition> members() const
{
std::vector<MemberDefinition> m;
for(std::vector<unsigned int>::const_iterator i(_ids.begin());i!=_ids.end();++i)
m.push_back(_md[*i]);
return m;
}
private:
MemberDefinition _md[ZT_CLUSTER_MAX_MEMBERS];
std::vector<unsigned int> _ids;
};
} // namespace ZeroTier
#endif // ZT_ENABLE_CLUSTER
#endif

View File

@ -0,0 +1,196 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* ZeroTier may be used and distributed under the terms of the GPLv3, which
* are available at: http://www.gnu.org/licenses/gpl-3.0.html
*
* If you would like to embed ZeroTier into a commercial application or
* redistribute it in a modified binary form, please contact ZeroTier Networks
* LLC. Start here: http://www.zerotier.com/
*/
#ifdef ZT_ENABLE_CLUSTER
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <signal.h>
#include <errno.h>
#include <iostream>
#include "ClusterGeoIpService.hpp"
#include "../node/Utils.hpp"
#include "../osdep/OSUtils.hpp"
#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL (60 * 60 * 1000)
namespace ZeroTier {
ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) :
_pathToExe(pathToExe),
_sOutputFd(-1),
_sInputFd(-1),
_sPid(0),
_run(true)
{
_thread = Thread::start(this);
}
ClusterGeoIpService::~ClusterGeoIpService()
{
_run = false;
long p = _sPid;
if (p > 0) {
::kill(p,SIGTERM);
Thread::sleep(500);
::kill(p,SIGKILL);
}
Thread::join(_thread);
}
bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z)
{
InetAddress ipNoPort(ip);
ipNoPort.setPort(0); // we index cache by IP only
const uint64_t now = OSUtils::now();
bool r = false;
{
Mutex::Lock _l(_cache_m);
std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort));
if (c != _cache.end()) {
x = c->second.x;
y = c->second.y;
z = c->second.z;
if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL)
return true;
else r = true; // return true but refresh as well
}
}
{
Mutex::Lock _l(_sOutputLock);
if (_sOutputFd >= 0) {
std::string ips(ipNoPort.toIpString());
ips.push_back('\n');
//fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str());
::write(_sOutputFd,ips.data(),ips.length());
}
}
return r;
}
void ClusterGeoIpService::threadMain()
throw()
{
char linebuf[65536];
char buf[65536];
long n,lineptr;
while (_run) {
{
Mutex::Lock _l(_sOutputLock);
_sOutputFd = -1;
_sInputFd = -1;
_sPid = 0;
int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output
int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input
::pipe(stdinfds);
::pipe(stdoutfds);
long p = (long)::vfork();
if (p < 0) {
Thread::sleep(500);
continue;
} else if (p == 0) {
::close(stdinfds[1]);
::close(stdoutfds[0]);
::dup2(stdinfds[0],STDIN_FILENO);
::dup2(stdoutfds[1],STDOUT_FILENO);
::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0);
::exit(1);
} else {
::close(stdinfds[0]);
::close(stdoutfds[1]);
_sOutputFd = stdinfds[1];
_sInputFd = stdoutfds[0];
_sPid = p;
}
}
lineptr = 0;
while (_run) {
n = ::read(_sInputFd,buf,sizeof(buf));
if (n <= 0) {
if (errno == EINTR)
continue;
else break;
}
for(long i=0;i<n;++i) {
if (lineptr > (long)sizeof(linebuf))
lineptr = 0;
if ((buf[i] == '\n')||(buf[i] == '\r')) {
linebuf[lineptr] = (char)0;
if (lineptr > 0) {
//fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf);
try {
std::vector<std::string> result(Utils::split(linebuf,",","",""));
if ((result.size() >= 7)&&(result[1] == "1")) {
InetAddress rip(result[0],0);
if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) {
_CE ce;
ce.ts = OSUtils::now();
ce.x = (int)::strtol(result[4].c_str(),(char **)0,10);
ce.y = (int)::strtol(result[5].c_str(),(char **)0,10);
ce.z = (int)::strtol(result[6].c_str(),(char **)0,10);
//fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z);
{
Mutex::Lock _l2(_cache_m);
_cache[rip] = ce;
}
}
}
} catch ( ... ) {}
}
lineptr = 0;
} else linebuf[lineptr++] = buf[i];
}
}
::close(_sOutputFd);
::close(_sInputFd);
::kill(_sPid,SIGTERM);
Thread::sleep(250);
::kill(_sPid,SIGKILL);
::waitpid(_sPid,(int *)0,0);
}
}
} // namespace ZeroTier
#endif // ZT_ENABLE_CLUSTER

View File

@ -0,0 +1,94 @@
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright (C) 2011-2015 ZeroTier, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* --
*
* ZeroTier may be used and distributed under the terms of the GPLv3, which
* are available at: http://www.gnu.org/licenses/gpl-3.0.html
*
* If you would like to embed ZeroTier into a commercial application or
* redistribute it in a modified binary form, please contact ZeroTier Networks
* LLC. Start here: http://www.zerotier.com/
*/
#ifndef ZT_CLUSTERGEOIPSERVICE_HPP
#define ZT_CLUSTERGEOIPSERVICE_HPP
#ifdef ZT_ENABLE_CLUSTER
#include <vector>
#include <map>
#include <string>
#include "../node/Constants.hpp"
#include "../node/InetAddress.hpp"
#include "../node/Mutex.hpp"
#include "../osdep/Thread.hpp"
namespace ZeroTier {
/**
* Runs the Cluster GeoIP service in the background and resolves geoIP queries
*/
class ClusterGeoIpService
{
public:
/**
* @param pathToExe Path to cluster geo-resolution service executable
*/
ClusterGeoIpService(const char *pathToExe);
~ClusterGeoIpService();
/**
* Attempt to locate an IP
*
* This returns true if x, y, and z are set. Otherwise it returns false
* and a geo-locate job is ordered in the background. This usually takes
* 500-1500ms to complete, after which time results will be available.
* If false is returned the supplied coordinate variables are unchanged.
*
* @param ip IPv4 or IPv6 address
* @param x Reference to variable to receive X
* @param y Reference to variable to receive Y
* @param z Reference to variable to receive Z
* @return True if coordinates were set
*/
bool locate(const InetAddress &ip,int &x,int &y,int &z);
void threadMain()
throw();
private:
const std::string _pathToExe;
int _sOutputFd;
int _sInputFd;
volatile long _sPid;
volatile bool _run;
Thread _thread;
Mutex _sOutputLock;
struct _CE { uint64_t ts; int x,y,z; };
std::map< InetAddress,_CE > _cache;
Mutex _cache_m;
};
} // namespace ZeroTier
#endif // ZT_ENABLE_CLUSTER
#endif

View File

@ -354,8 +354,38 @@ unsigned int ControlPlane::handleRequest(
if (ps[0] == "status") {
responseContentType = "application/json";
ZT_NodeStatus status;
_node->status(&status);
std::string clusterJson;
#ifdef ZT_ENABLE_CLUSTER
{
ZT_ClusterStatus cs;
_node->clusterStatus(&cs);
if (cs.clusterSize >= 1) {
char t[4096];
Utils::snprintf(t,sizeof(t),"{\n\t\t\"myId\": %u,\n\t\t\"clusterSize\": %u,\n\t\t\"members: [\n",cs.myId,cs.clusterSize);
clusterJson.append(t);
for(unsigned int i=0;i<cs.clusterSize;++i) {
Utils::snprintf(t,sizeof(t),"\t\t\t{\n\t\t\t\t\"id\": %u,\n\t\t\t\t\"msSinceLastHeartbeat\": %u,\n\t\t\t\t\"alive\": %s,\n\t\t\t\t\"x\": %d,\n\t\t\t\t\"y\": %d,\n\t\t\t\t\"z\": %d,\n\t\t\t\t\"load\": %llu\n\t\t\t\t\"peers\": %llu\n\t\t\t}%s",
cs.members[i].id,
cs.members[i].msSinceLastHeartbeat,
(cs.members[i].alive != 0) ? "true" : "false",
cs.members[i].x,
cs.members[i].y,
cs.members[i].z,
cs.members[i].load,
cs.members[i].peers,
(i == (cs.clusterSize - 1)) ? "," : "");
clusterJson.append(t);
}
clusterJson.append(" ]\n\t\t}");
}
}
#endif
Utils::snprintf(json,sizeof(json),
"{\n"
"\t\"address\": \"%.10llx\",\n"
@ -368,7 +398,8 @@ unsigned int ControlPlane::handleRequest(
"\t\"versionMinor\": %d,\n"
"\t\"versionRev\": %d,\n"
"\t\"version\": \"%d.%d.%d\",\n"
"\t\"clock\": %llu\n"
"\t\"clock\": %llu,\n"
"\t\"cluster\": %s\n"
"}\n",
status.address,
status.publicIdentity,
@ -380,7 +411,8 @@ unsigned int ControlPlane::handleRequest(
ZEROTIER_ONE_VERSION_MINOR,
ZEROTIER_ONE_VERSION_REVISION,
ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION,
(unsigned long long)OSUtils::now());
(unsigned long long)OSUtils::now(),
((clusterJson.length() > 0) ? clusterJson.c_str() : "null"));
responseBody = json;
scode = 200;
} else if (ps[0] == "config") {

View File

@ -58,6 +58,8 @@
#include "OneService.hpp"
#include "ControlPlane.hpp"
#include "ClusterGeoIpService.hpp"
#include "ClusterDefinition.hpp"
/**
* Uncomment to enable UDP breakage switch
@ -366,6 +368,11 @@ static int SnodeDataStorePutFunction(ZT_Node *node,void *uptr,const char *name,c
static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct sockaddr_storage *localAddr,const struct sockaddr_storage *addr,const void *data,unsigned int len);
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
#ifdef ZT_ENABLE_CLUSTER
static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len);
static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z);
#endif
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
static int ShttpOnMessageBegin(http_parser *parser);
@ -419,26 +426,32 @@ class OneServiceImpl : public OneService
{
public:
OneServiceImpl(const char *hp,unsigned int port) :
_homePath((hp) ? hp : "."),
_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY),
_homePath((hp) ? hp : ".")
,_tcpFallbackResolver(ZT_TCP_FALLBACK_RELAY)
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
_controller((SqliteNetworkController *)0),
,_controller((SqliteNetworkController *)0)
#endif
_phy(this,false,true),
_node((Node *)0),
_controlPlane((ControlPlane *)0),
_lastDirectReceiveFromGlobal(0),
_lastSendToGlobal(0),
_lastRestart(0),
_nextBackgroundTaskDeadline(0),
_tcpFallbackTunnel((TcpConnection *)0),
_termReason(ONE_STILL_RUNNING),
_port(0),
,_phy(this,false,true)
,_node((Node *)0)
,_controlPlane((ControlPlane *)0)
,_lastDirectReceiveFromGlobal(0)
,_lastSendToGlobal(0)
,_lastRestart(0)
,_nextBackgroundTaskDeadline(0)
,_tcpFallbackTunnel((TcpConnection *)0)
,_termReason(ONE_STILL_RUNNING)
,_port(0)
#ifdef ZT_USE_MINIUPNPC
_v4UpnpUdpSocket((PhySocket *)0),
_upnpClient((UPNPClient *)0),
,_v4UpnpUdpSocket((PhySocket *)0)
,_upnpClient((UPNPClient *)0)
#endif
_run(true)
#ifdef ZT_ENABLE_CLUSTER
,_clusterMessageSocket((PhySocket *)0)
,_clusterGeoIpService((ClusterGeoIpService *)0)
,_clusterDefinition((ClusterDefinition *)0)
,_clusterMemberId(0)
#endif
,_run(true)
{
const int portTrials = (port == 0) ? 256 : 1; // if port is 0, pick random
for(int k=0;k<portTrials;++k) {
@ -510,12 +523,19 @@ public:
_phy.close(_v6UdpSocket);
_phy.close(_v4TcpListenSocket);
_phy.close(_v6TcpListenSocket);
#ifdef ZT_ENABLE_CLUSTER
_phy.close(_clusterMessageSocket);
#endif
#ifdef ZT_USE_MINIUPNPC
_phy.close(_v4UpnpUdpSocket);
delete _upnpClient;
#endif
#ifdef ZT_ENABLE_NETWORK_CONTROLLER
delete _controller;
#endif
#ifdef ZT_ENABLE_CLUSTER
delete _clusterGeoIpService;
delete _clusterDefinition;
#endif
}
@ -556,6 +576,70 @@ public:
_node->setNetconfMaster((void *)_controller);
#endif
#ifdef ZT_ENABLE_CLUSTER
if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str())) {
_clusterDefinition = new ClusterDefinition(_node->address(),(_homePath + ZT_PATH_SEPARATOR_S + "cluster").c_str());
if (_clusterDefinition->size() > 0) {
std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
PhySocket *cs = _phy.udpBind(reinterpret_cast<const struct sockaddr *>(&(m->clusterEndpoint)));
if (cs) {
if (_clusterMessageSocket) {
_phy.close(_clusterMessageSocket,false);
_phy.close(cs,false);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "Cluster: can't determine my cluster member ID: able to bind more than one cluster message socket IP/port!";
return _termReason;
}
_clusterMessageSocket = cs;
_clusterMemberId = m->id;
}
}
if (!_clusterMessageSocket) {
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = "Cluster: can't determine my cluster member ID: unable to bind to any cluster message socket IP/port.";
return _termReason;
}
if (OSUtils::fileExists((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str()))
_clusterGeoIpService = new ClusterGeoIpService((_homePath + ZT_PATH_SEPARATOR_S + "cluster-geo.exe").c_str());
const ClusterDefinition::MemberDefinition &me = (*_clusterDefinition)[_clusterMemberId];
InetAddress endpoints[255];
unsigned int numEndpoints = 0;
for(std::vector<InetAddress>::const_iterator i(me.zeroTierEndpoints.begin());i!=me.zeroTierEndpoints.end();++i)
endpoints[numEndpoints++] = *i;
if (_node->clusterInit(
_clusterMemberId,
reinterpret_cast<const struct sockaddr_storage *>(endpoints),
numEndpoints,
me.x,
me.y,
me.z,
&SclusterSendFunction,
this,
(_clusterGeoIpService) ? &SclusterGeoIpFunction : 0,
this) == ZT_RESULT_OK) {
std::vector<ClusterDefinition::MemberDefinition> members(_clusterDefinition->members());
for(std::vector<ClusterDefinition::MemberDefinition>::iterator m(members.begin());m!=members.end();++m) {
if (m->id != _clusterMemberId)
_node->clusterAddMember(m->id);
}
}
} else {
delete _clusterDefinition;
_clusterDefinition = (ClusterDefinition *)0;
}
}
#endif
_controlPlane = new ControlPlane(this,_node,(_homePath + ZT_PATH_SEPARATOR_S + "ui").c_str());
_controlPlane->addAuthToken(authToken.c_str());
@ -781,10 +865,18 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len)
{
#ifdef ZT_ENABLE_CLUSTER
if (sock == _clusterMessageSocket) {
_node->clusterHandleIncomingMessage(data,len);
return;
}
#endif
#ifdef ZT_BREAK_UDP
if (OSUtils::fileExists("/tmp/ZT_BREAK_UDP"))
return;
#endif
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
ZT_ResultCode rc = _node->processWirePacket(
@ -1303,7 +1395,6 @@ public:
_phy.close(tc->sock); // will call close handler, which deletes from _tcpConnections
}
private:
std::string _dataStorePrepPath(const char *name) const
{
std::string p(_homePath);
@ -1358,6 +1449,13 @@ private:
UPNPClient *_upnpClient;
#endif
#ifdef ZT_ENABLE_CLUSTER
PhySocket *_clusterMessageSocket;
ClusterGeoIpService *_clusterGeoIpService;
ClusterDefinition *_clusterDefinition;
unsigned int _clusterMemberId;
#endif
bool _run;
Mutex _run_m;
};
@ -1375,6 +1473,21 @@ static int SnodeWirePacketSendFunction(ZT_Node *node,void *uptr,const struct soc
static void SnodeVirtualNetworkFrameFunction(ZT_Node *node,void *uptr,uint64_t nwid,uint64_t sourceMac,uint64_t destMac,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->nodeVirtualNetworkFrameFunction(nwid,sourceMac,destMac,etherType,vlanId,data,len); }
#ifdef ZT_ENABLE_CLUSTER
static void SclusterSendFunction(void *uptr,unsigned int toMemberId,const void *data,unsigned int len)
{
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
const ClusterDefinition::MemberDefinition &md = (*(impl->_clusterDefinition))[toMemberId];
if (md.clusterEndpoint)
impl->_phy.udpSend(impl->_clusterMessageSocket,reinterpret_cast<const struct sockaddr *>(&(md.clusterEndpoint)),data,len);
}
static int SclusterGeoIpFunction(void *uptr,const struct sockaddr_storage *addr,int *x,int *y,int *z)
{
OneServiceImpl *const impl = reinterpret_cast<OneServiceImpl *>(uptr);
return (int)(impl->_clusterGeoIpService->locate(*(reinterpret_cast<const InetAddress *>(addr)),*x,*y,*z));
}
#endif
static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{ reinterpret_cast<OneServiceImpl *>(uptr)->tapFrameHandler(nwid,from,to,etherType,vlanId,data,len); }

View File

@ -43,6 +43,9 @@ namespace ZeroTier {
* periodically checked and updates are automatically downloaded, verified
* against a built-in list of update signing keys, and installed. This is
* only supported for certain platforms.
*
* If built with ZT_ENABLE_CLUSTER, a 'cluster' file is checked and if
* present is read to determine the identity of other cluster members.
*/
class OneService
{

View File

@ -133,13 +133,35 @@ int main(int argc,char **argv)
std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end());
#endif
// ALICE TEST
// NOTE -- these are temporary test identities -- this is not yet the 'real' network.
// (but these are the real nodes)
// Alice -- global geo-clustered root #1
roots.push_back(World::Root());
roots.back().identity = Identity("d6ddca6ab5:0:4e761207d8b4200be44f478e3da148c16099110ee71b64586dda118e4022ab63682ce137da8ba817fc7f73aa3163f2e333933e2994c46b4f4119307be8855a72");
roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993"));
std::sort(roots.back().stableEndpoints.begin(),roots.back().stableEndpoints.end());
roots.back().stableEndpoints.push_back(InetAddress("188.166.94.177/9993")); // Amsterdam IPv4
roots.back().stableEndpoints.push_back(InetAddress("2a03:b0c0:2:d0::7d:1/9993")); // Amsterdam IPv6
roots.back().stableEndpoints.push_back(InetAddress("159.203.97.171/9993")); // New York IPv4
roots.back().stableEndpoints.push_back(InetAddress("2604:a880:800:a1::54:6001/9993 ")); // New York IPv6
roots.back().stableEndpoints.push_back(InetAddress("169.57.143.104/9993")); // Sao Paolo IPv4
roots.back().stableEndpoints.push_back(InetAddress("2607:f0d0:1d01:57::2/9993")); // Sao Paolo IPv6
roots.back().stableEndpoints.push_back(InetAddress("104.238.182.83/9993")); // San Francisco IPv4
roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:ac00:809:5400:ff:fe15:f3f4/9993")); // San Francisco IPv6
roots.back().stableEndpoints.push_back(InetAddress("128.199.182.9/9993")); // Singapore IPv4
roots.back().stableEndpoints.push_back(InetAddress("2400:6180:0:d0::1b:1001/9993")); // Singapore IPv6
std::sort(roots.begin(),roots.end());
// Bob -- global geo-clustered root #2
roots.back().identity = Identity("16ebbd6c5d:0:47d39bca9d0a5cf70148e39f6c45199e17e0e32e4e46cac01ae5bcb21224137b097f40bdd982a921c3aabdcb9ada8b4f2bb0593753bfdb21cf12eac28c8d9042");
roots.back().stableEndpoints.push_back(InetAddress("45.33.4.67/9993")); // Dallas IPv4
roots.back().stableEndpoints.push_back(InetAddress("2600:3c00::f03c:91ff:fe67:b704/9993")); // Dallas IPv6
roots.back().stableEndpoints.push_back(InetAddress("139.162.157.243/9993")); // Frankfurt (Germany) IPv4
roots.back().stableEndpoints.push_back(InetAddress("2a01:7e01::f03c:91ff:fe67:3ffd/9993")); // Frankfurt (Germany) IPv6
roots.back().stableEndpoints.push_back(InetAddress("45.32.246.179/9993")); // Sydney IPv4
roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:5800:8bf8:5400:ff:fe15:b39a/9993")); // Sydney IPv6
roots.back().stableEndpoints.push_back(InetAddress("45.32.248.87/9993")); // Tokyo IPv4
roots.back().stableEndpoints.push_back(InetAddress("2001:19f0:7000:9bc9:5400:00ff:fe15:c4f5/9993")); // Tokyo IPv6
roots.back().stableEndpoints.push_back(InetAddress("159.203.2.154/9993")); // Toronto IPv4
roots.back().stableEndpoints.push_back(InetAddress("2604:a880:cad:d0::26:7001/9993")); // Toronto IPv6
const uint64_t id = ZT_WORLD_ID_EARTH;
const uint64_t ts = OSUtils::now();