2015-10-14 21:12:12 +00:00
/*
* ZeroTier One - Network Virtualization Everywhere
* Copyright ( C ) 2011 - 2015 ZeroTier , Inc .
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation , either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
*
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*
* - -
*
* ZeroTier may be used and distributed under the terms of the GPLv3 , which
* are available at : http : //www.gnu.org/licenses/gpl-3.0.html
*
* If you would like to embed ZeroTier into a commercial application or
* redistribute it in a modified binary form , please contact ZeroTier Networks
* LLC . Start here : http : //www.zerotier.com/
*/
# ifdef ZT_ENABLE_CLUSTER
# include <stdint.h>
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
2015-10-20 22:27:53 +00:00
# include <math.h>
2015-10-14 21:12:12 +00:00
# include <algorithm>
# include <utility>
2015-10-20 22:27:53 +00:00
# include "../version.h"
2015-10-14 21:12:12 +00:00
# include "Cluster.hpp"
# include "RuntimeEnvironment.hpp"
# include "MulticastGroup.hpp"
# include "CertificateOfMembership.hpp"
# include "Salsa20.hpp"
# include "Poly1305.hpp"
# include "Packet.hpp"
2015-10-20 22:27:53 +00:00
# include "Identity.hpp"
2015-10-14 21:12:12 +00:00
# include "Peer.hpp"
# include "Switch.hpp"
# include "Node.hpp"
namespace ZeroTier {
2015-10-20 22:27:53 +00:00
/**
 * Euclidean distance between two points in 3D space.
 *
 * Each coordinate is widened to double before subtracting, so the
 * difference of two ints cannot overflow.
 *
 * @param x1,y1,z1 First point
 * @param x2,y2,z2 Second point
 * @return Straight-line distance between the two points
 */
static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2)
	throw()
{
	const double deltaX = static_cast<double>(x2) - static_cast<double>(x1);
	const double deltaY = static_cast<double>(y2) - static_cast<double>(y1);
	const double deltaZ = static_cast<double>(z2) - static_cast<double>(z1);
	const double sumOfSquares = (deltaX * deltaX) + (deltaY * deltaY) + (deltaZ * deltaZ);
	return sqrt(sumOfSquares);
}
/**
 * Set up this node's cluster state and derive its inbound message key.
 *
 * @param renv Runtime environment (provides identity, node, topology, switch)
 * @param id This node's cluster member ID
 * @param zeroTierPhysicalEndpoints Physical endpoints advertised to other members in ALIVE messages
 * @param x,y,z This node's location coordinates
 * @param sendFunction Callback used to transmit an encrypted state message to a member
 * @param sendFunctionArg Opaque first argument passed to sendFunction
 * @param addressToLocationFunction Optional callback mapping a physical address to coordinates (may be null)
 * @param addressToLocationFunctionArg Opaque first argument passed to addressToLocationFunction
 */
Cluster::Cluster(
	const RuntimeEnvironment *renv,
	uint16_t id,
	const std::vector<InetAddress> &zeroTierPhysicalEndpoints,
	int32_t x,
	int32_t y,
	int32_t z,
	void (*sendFunction)(void *,unsigned int,const void *,unsigned int),
	void *sendFunctionArg,
	int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
	void *addressToLocationFunctionArg) :
	RR(renv),
	_sendFunction(sendFunction),
	_sendFunctionArg(sendFunctionArg),
	_addressToLocationFunction(addressToLocationFunction),
	_addressToLocationFunctionArg(addressToLocationFunctionArg),
	_x(x),
	_y(y),
	_z(z),
	_id(id),
	_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
	_members(new _Member[ZT_CLUSTER_MAX_MEMBERS])
{
	// The master secret is a SHA-512 digest of the private key from our Identity key pair
	RR->identity.sha512PrivateKey(_masterSecret);

	// Our inbound message key: copy the master secret, XOR its first 16-bit
	// word with our ID (after Utils::hton), then hash the result twice in place.
	uint16_t scratch[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
	memcpy(scratch,_masterSecret,sizeof(scratch));
	scratch[0] ^= Utils::hton(id);
	for(int round=0;round<2;++round)
		SHA512::hash(scratch,scratch,sizeof(scratch));
	memcpy(_key,scratch,sizeof(_key));

	// Do not leave key material on the stack
	Utils::burn(scratch,sizeof(scratch));
}
/**
 * Wipe key material held by this object and release the member table
 * allocated in the constructor.
 */
Cluster::~Cluster()
{
	// Erase secrets before the object's storage is released
	Utils::burn(_masterSecret,sizeof(_masterSecret));
	Utils::burn(_key,sizeof(_key));

	// Allocated with new[] in the constructor
	delete [] _members;
}
void Cluster : : handleIncomingStateMessage ( const void * msg , unsigned int len )
{
Buffer < ZT_CLUSTER_MAX_MESSAGE_LENGTH > dmsg ;
{
// FORMAT: <[16] iv><[8] MAC><... data>
if ( ( len < 24 ) | | ( len > ZT_CLUSTER_MAX_MESSAGE_LENGTH ) )
return ;
// 16-byte IV: first 8 bytes XORed with key, last 8 bytes used as Salsa20 64-bit IV
char keytmp [ 32 ] ;
memcpy ( keytmp , _key , 32 ) ;
for ( int i = 0 ; i < 8 ; + + i )
keytmp [ i ] ^ = reinterpret_cast < const char * > ( msg ) [ i ] ;
Salsa20 s20 ( keytmp , 256 , reinterpret_cast < const char * > ( msg ) + 8 ) ;
Utils : : burn ( keytmp , sizeof ( keytmp ) ) ;
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey [ ZT_POLY1305_KEY_LEN ] ;
memset ( polykey , 0 , sizeof ( polykey ) ) ;
s20 . encrypt12 ( polykey , polykey , sizeof ( polykey ) ) ;
// Compute 16-byte MAC
char mac [ ZT_POLY1305_MAC_LEN ] ;
Poly1305 : : compute ( mac , reinterpret_cast < const char * > ( msg ) + 24 , len - 24 , polykey ) ;
// Check first 8 bytes of MAC against 64-bit MAC in stream
if ( ! Utils : : secureEq ( mac , reinterpret_cast < const char * > ( msg ) + 16 , 8 ) )
return ;
// Decrypt!
2015-10-14 22:49:41 +00:00
dmsg . setSize ( len - 24 ) ;
s20 . decrypt12 ( reinterpret_cast < const char * > ( msg ) + 24 , const_cast < void * > ( dmsg . data ( ) ) , dmsg . size ( ) ) ;
2015-10-14 21:12:12 +00:00
}
2015-10-20 22:27:53 +00:00
if ( dmsg . size ( ) < 4 )
2015-10-14 21:12:12 +00:00
return ;
const uint16_t fromMemberId = dmsg . at < uint16_t > ( 0 ) ;
unsigned int ptr = 2 ;
2015-10-23 18:51:18 +00:00
if ( fromMemberId = = _id ) // sanity check: we don't talk to ourselves
2015-10-20 22:27:53 +00:00
return ;
const uint16_t toMemberId = dmsg . at < uint16_t > ( ptr ) ;
ptr + = 2 ;
2015-10-23 18:51:18 +00:00
if ( toMemberId ! = _id ) // sanity check: message not for us?
2015-10-20 22:27:53 +00:00
return ;
2015-10-14 21:12:12 +00:00
2015-10-23 18:51:18 +00:00
{ // make sure sender is actually considered a member
Mutex : : Lock _l3 ( _memberIds_m ) ;
if ( std : : find ( _memberIds . begin ( ) , _memberIds . end ( ) , fromMemberId ) = = _memberIds . end ( ) )
return ;
}
{
_Member & m = _members [ fromMemberId ] ;
Mutex : : Lock mlck ( m . lock ) ;
try {
while ( ptr < dmsg . size ( ) ) {
const unsigned int mlen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
const unsigned int nextPtr = ptr + mlen ;
if ( nextPtr > dmsg . size ( ) )
break ;
int mtype = - 1 ;
try {
switch ( ( StateMessageType ) ( mtype = ( int ) dmsg [ ptr + + ] ) ) {
default :
break ;
case STATE_MESSAGE_ALIVE : {
ptr + = 7 ; // skip version stuff, not used yet
m . x = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . y = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . z = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
ptr + = 8 ; // skip local clock, not used
m . load = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
ptr + = 8 ; // skip flags, unused
2015-10-20 23:48:49 +00:00
# ifdef ZT_TRACE
2015-10-23 18:51:18 +00:00
std : : string addrs ;
2015-10-20 23:48:49 +00:00
# endif
2015-10-23 18:51:18 +00:00
unsigned int physicalAddressCount = dmsg [ ptr + + ] ;
m . zeroTierPhysicalEndpoints . clear ( ) ;
for ( unsigned int i = 0 ; i < physicalAddressCount ; + + i ) {
m . zeroTierPhysicalEndpoints . push_back ( InetAddress ( ) ) ;
ptr + = m . zeroTierPhysicalEndpoints . back ( ) . deserialize ( dmsg , ptr ) ;
if ( ! ( m . zeroTierPhysicalEndpoints . back ( ) ) ) {
m . zeroTierPhysicalEndpoints . pop_back ( ) ;
}
2015-10-20 23:48:49 +00:00
# ifdef ZT_TRACE
2015-10-23 18:51:18 +00:00
else {
if ( addrs . length ( ) > 0 )
addrs . push_back ( ' , ' ) ;
addrs . append ( m . zeroTierPhysicalEndpoints . back ( ) . toString ( ) ) ;
}
2015-10-20 23:48:49 +00:00
# endif
2015-10-23 18:51:18 +00:00
}
m . lastReceivedAliveAnnouncement = RR - > node - > now ( ) ;
2015-10-20 23:48:49 +00:00
# ifdef ZT_TRACE
2015-10-23 20:03:34 +00:00
TRACE ( " [%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s " , ( unsigned int ) fromMemberId , m . x , m . y , m . z , addrs . c_str ( ) ) ;
2015-10-20 23:48:49 +00:00
# endif
2015-10-23 18:51:18 +00:00
} break ;
case STATE_MESSAGE_HAVE_PEER : {
try {
Identity id ;
ptr + = id . deserialize ( dmsg , ptr ) ;
if ( id ) {
RR - > topology - > saveIdentity ( id ) ;
{ // Add or update peer affinity entry
_PeerAffinity pa ( id . address ( ) , fromMemberId , RR - > node - > now ( ) ) ;
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
std : : vector < _PeerAffinity > : : iterator i ( std : : lower_bound ( _peerAffinities . begin ( ) , _peerAffinities . end ( ) , pa ) ) ; // O(log(n))
if ( ( i ! = _peerAffinities . end ( ) ) & & ( i - > key = = pa . key ) ) {
i - > timestamp = pa . timestamp ;
} else {
_peerAffinities . push_back ( pa ) ;
std : : sort ( _peerAffinities . begin ( ) , _peerAffinities . end ( ) ) ; // probably a more efficient way to insert but okay for now
}
}
TRACE ( " [%u] has %s " , ( unsigned int ) fromMemberId , id . address ( ) . toString ( ) . c_str ( ) ) ;
}
} catch ( . . . ) {
// ignore invalid identities
}
} break ;
case STATE_MESSAGE_MULTICAST_LIKE : {
const uint64_t nwid = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
const Address address ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const MAC mac ( dmsg . field ( ptr , 6 ) , 6 ) ; ptr + = 6 ;
const uint32_t adi = dmsg . at < uint32_t > ( ptr ) ; ptr + = 4 ;
RR - > mc - > add ( RR - > node - > now ( ) , nwid , MulticastGroup ( mac , adi ) , address ) ;
TRACE ( " [%u] %s likes %s/%u on %.16llu " , ( unsigned int ) fromMemberId , address . toString ( ) . c_str ( ) , mac . toString ( ) . c_str ( ) , ( unsigned int ) adi , nwid ) ;
} break ;
case STATE_MESSAGE_COM : {
CertificateOfMembership com ;
ptr + = com . deserialize ( dmsg , ptr ) ;
if ( com ) {
TRACE ( " [%u] COM for %s on %.16llu rev %llu " , ( unsigned int ) fromMemberId , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) , com . revision ( ) ) ;
}
} break ;
case STATE_MESSAGE_RELAY : {
const unsigned int numRemotePeerPaths = dmsg [ ptr + + ] ;
InetAddress remotePeerPaths [ 256 ] ; // size is 8-bit, so 256 is max
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i )
ptr + = remotePeerPaths [ i ] . deserialize ( dmsg , ptr ) ;
const unsigned int packetLen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
const void * packet = ( const void * ) dmsg . field ( ptr , packetLen ) ; ptr + = packetLen ;
if ( packetLen > = ZT_PROTO_MIN_FRAGMENT_LENGTH ) { // ignore anything too short to contain a dest address
const Address destinationAddress ( reinterpret_cast < const char * > ( packet ) + 8 , ZT_ADDRESS_LENGTH ) ;
TRACE ( " [%u] relay %u bytes to %s (%u remote paths included) " , ( unsigned int ) fromMemberId , packetLen , destinationAddress . toString ( ) . c_str ( ) , numRemotePeerPaths ) ;
SharedPtr < Peer > destinationPeer ( RR - > topology - > getPeer ( destinationAddress ) ) ;
if ( destinationPeer ) {
if (
( destinationPeer - > send ( RR , packet , packetLen , RR - > node - > now ( ) ) ) & &
( numRemotePeerPaths > 0 ) & &
( packetLen > = 18 ) & &
( reinterpret_cast < const unsigned char * > ( packet ) [ ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR ] = = ZT_PACKET_FRAGMENT_INDICATOR )
) {
// If remote peer paths were sent with this relayed packet, we do
// RENDEZVOUS. It's handled here for cluster-relayed packets since
// we don't have both Peer records so this is a different path.
const Address remotePeerAddress ( reinterpret_cast < const char * > ( packet ) + 13 , ZT_ADDRESS_LENGTH ) ;
InetAddress bestDestV4 , bestDestV6 ;
destinationPeer - > getBestActiveAddresses ( RR - > node - > now ( ) , bestDestV4 , bestDestV6 ) ;
InetAddress bestRemoteV4 , bestRemoteV6 ;
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i ) {
if ( ( bestRemoteV4 ) & & ( bestRemoteV6 ) )
2015-10-14 21:12:12 +00:00
break ;
2015-10-23 18:51:18 +00:00
switch ( remotePeerPaths [ i ] . ss_family ) {
case AF_INET :
if ( ! bestRemoteV4 )
bestRemoteV4 = remotePeerPaths [ i ] ;
break ;
case AF_INET6 :
if ( ! bestRemoteV6 )
bestRemoteV6 = remotePeerPaths [ i ] ;
break ;
}
2015-10-14 21:12:12 +00:00
}
2015-10-23 18:51:18 +00:00
Packet rendezvousForDest ( destinationAddress , RR - > identity . address ( ) , Packet : : VERB_RENDEZVOUS ) ;
rendezvousForDest . append ( ( uint8_t ) 0 ) ;
remotePeerAddress . appendTo ( rendezvousForDest ) ;
Buffer < 2048 > rendezvousForOtherEnd ;
remotePeerAddress . appendTo ( rendezvousForOtherEnd ) ;
rendezvousForOtherEnd . append ( ( uint8_t ) Packet : : VERB_RENDEZVOUS ) ;
const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd . size ( ) ;
rendezvousForOtherEnd . addSize ( 2 ) ; // space for actual packet payload length
rendezvousForOtherEnd . append ( ( uint8_t ) 0 ) ; // flags == 0
destinationAddress . appendTo ( rendezvousForOtherEnd ) ;
bool haveMatch = false ;
if ( ( bestDestV6 ) & & ( bestRemoteV6 ) ) {
haveMatch = true ;
rendezvousForDest . append ( ( uint16_t ) bestRemoteV6 . port ( ) ) ;
rendezvousForDest . append ( ( uint8_t ) 16 ) ;
rendezvousForDest . append ( bestRemoteV6 . rawIpData ( ) , 16 ) ;
rendezvousForOtherEnd . append ( ( uint16_t ) bestDestV6 . port ( ) ) ;
rendezvousForOtherEnd . append ( ( uint8_t ) 16 ) ;
rendezvousForOtherEnd . append ( bestDestV6 . rawIpData ( ) , 16 ) ;
rendezvousForOtherEnd . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 16 ) ) ;
} else if ( ( bestDestV4 ) & & ( bestRemoteV4 ) ) {
haveMatch = true ;
rendezvousForDest . append ( ( uint16_t ) bestRemoteV4 . port ( ) ) ;
rendezvousForDest . append ( ( uint8_t ) 4 ) ;
rendezvousForDest . append ( bestRemoteV4 . rawIpData ( ) , 4 ) ;
rendezvousForOtherEnd . append ( ( uint16_t ) bestDestV4 . port ( ) ) ;
rendezvousForOtherEnd . append ( ( uint8_t ) 4 ) ;
rendezvousForOtherEnd . append ( bestDestV4 . rawIpData ( ) , 4 ) ;
rendezvousForOtherEnd . setAt < uint16_t > ( rendezvousForOtherEndPayloadSizePtr , ( uint16_t ) ( 9 + 4 ) ) ;
}
2015-10-14 21:12:12 +00:00
2015-10-23 18:51:18 +00:00
if ( haveMatch ) {
_send ( fromMemberId , STATE_MESSAGE_PROXY_SEND , rendezvousForOtherEnd . data ( ) , rendezvousForOtherEnd . size ( ) ) ;
RR - > sw - > send ( rendezvousForDest , true , 0 ) ;
}
2015-10-14 21:12:12 +00:00
}
}
}
2015-10-23 18:51:18 +00:00
} break ;
case STATE_MESSAGE_PROXY_SEND : {
const Address rcpt ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ;
const Packet : : Verb verb = ( Packet : : Verb ) dmsg [ ptr + + ] ;
const unsigned int len = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
Packet outp ( rcpt , RR - > identity . address ( ) , verb ) ;
outp . append ( dmsg . field ( ptr , len ) , len ) ;
RR - > sw - > send ( outp , true , 0 ) ;
TRACE ( " [%u] proxy send %s to %s length %u " , ( unsigned int ) fromMemberId , Packet : : verbString ( verb ) , rcpt . toString ( ) . c_str ( ) , len ) ;
} break ;
}
} catch ( . . . ) {
TRACE ( " invalid message of size %u type %d (inner decode), discarding " , mlen , mtype ) ;
// drop invalids
2015-10-14 21:12:12 +00:00
}
2015-10-23 18:51:18 +00:00
ptr = nextPtr ;
}
} catch ( . . . ) {
TRACE ( " invalid message (outer loop), discarding " ) ;
// drop invalids
2015-10-14 21:12:12 +00:00
}
}
}
2015-10-21 00:22:53 +00:00
bool Cluster : : sendViaCluster ( const Address & fromPeerAddress , const Address & toPeerAddress , const void * data , unsigned int len )
{
if ( len > 16384 ) // sanity check
return false ;
uint64_t mostRecentTimestamp = 0 ;
uint16_t canHasPeer = 0 ;
{ // Anyone got this peer?
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
std : : vector < _PeerAffinity > : : iterator i ( std : : lower_bound ( _peerAffinities . begin ( ) , _peerAffinities . end ( ) , _PeerAffinity ( toPeerAddress , 0 , 0 ) ) ) ; // O(log(n))
while ( ( i ! = _peerAffinities . end ( ) ) & & ( i - > address ( ) = = toPeerAddress ) ) {
uint16_t mid = i - > clusterMemberId ( ) ;
if ( ( mid ! = _id ) & & ( i - > timestamp > mostRecentTimestamp ) ) {
mostRecentTimestamp = i - > timestamp ;
canHasPeer = mid ;
}
}
}
const uint64_t now = RR - > node - > now ( ) ;
if ( ( now - mostRecentTimestamp ) < ZT_PEER_ACTIVITY_TIMEOUT ) {
Buffer < 16384 > buf ;
InetAddress v4 , v6 ;
if ( fromPeerAddress ) {
SharedPtr < Peer > fromPeer ( RR - > topology - > getPeer ( fromPeerAddress ) ) ;
if ( fromPeer )
fromPeer - > getBestActiveAddresses ( now , v4 , v6 ) ;
}
buf . append ( ( uint8_t ) ( ( v4 ) ? ( ( v6 ) ? 2 : 1 ) : ( ( v6 ) ? 1 : 0 ) ) ) ;
if ( v4 )
v4 . serialize ( buf ) ;
if ( v6 )
v6 . serialize ( buf ) ;
buf . append ( ( uint16_t ) len ) ;
buf . append ( data , len ) ;
{
Mutex : : Lock _l2 ( _members [ canHasPeer ] . lock ) ;
_send ( canHasPeer , STATE_MESSAGE_RELAY , buf . data ( ) , buf . size ( ) ) ;
}
2015-10-21 00:27:57 +00:00
2015-10-23 20:03:34 +00:00
TRACE ( " sendViaCluster(): relaying %u bytes from %s to %s by way of %u " , len , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) , ( unsigned int ) canHasPeer ) ;
2015-10-21 00:27:57 +00:00
return true ;
2015-10-23 20:03:34 +00:00
} else {
TRACE ( " sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it! " , len , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) ) ;
return false ;
2015-10-21 00:22:53 +00:00
}
}
2015-10-20 22:27:53 +00:00
void Cluster : : replicateHavePeer ( const Identity & peerId )
2015-10-14 21:12:12 +00:00
{
2015-10-20 23:24:21 +00:00
{ // Use peer affinity table to track our own last announce time for peers
_PeerAffinity pa ( peerId . address ( ) , _id , RR - > node - > now ( ) ) ;
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
std : : vector < _PeerAffinity > : : iterator i ( std : : lower_bound ( _peerAffinities . begin ( ) , _peerAffinities . end ( ) , pa ) ) ; // O(log(n))
if ( ( i ! = _peerAffinities . end ( ) ) & & ( i - > key = = pa . key ) ) {
if ( ( pa . timestamp - i - > timestamp ) > = ZT_CLUSTER_HAVE_PEER_ANNOUNCE_PERIOD ) {
i - > timestamp = pa . timestamp ;
// continue to announcement
} else {
// we've already announced this peer recently, so skip
return ;
}
} else {
_peerAffinities . push_back ( pa ) ;
std : : sort ( _peerAffinities . begin ( ) , _peerAffinities . end ( ) ) ; // probably a more efficient way to insert but okay for now
// continue to announcement
}
}
// announcement
Buffer < 4096 > buf ;
peerId . serialize ( buf , false ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_HAVE_PEER , buf . data ( ) , buf . size ( ) ) ;
}
}
2015-10-14 21:12:12 +00:00
}
void Cluster : : replicateMulticastLike ( uint64_t nwid , const Address & peerAddress , const MulticastGroup & group )
{
2015-10-23 21:50:07 +00:00
Buffer < 2048 > buf ;
2015-10-20 23:24:21 +00:00
buf . append ( ( uint64_t ) nwid ) ;
peerAddress . appendTo ( buf ) ;
group . mac ( ) . appendTo ( buf ) ;
buf . append ( ( uint32_t ) group . adi ( ) ) ;
2015-10-23 21:50:07 +00:00
TRACE ( " replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members " , peerAddress . toString ( ) . c_str ( ) , nwid , group . mac ( ) . toString ( ) . c_str ( ) , ( unsigned int ) group . adi ( ) ) ;
2015-10-20 23:24:21 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_MULTICAST_LIKE , buf . data ( ) , buf . size ( ) ) ;
}
}
2015-10-14 21:12:12 +00:00
}
void Cluster : : replicateCertificateOfNetworkMembership ( const CertificateOfMembership & com )
{
2015-10-23 21:50:07 +00:00
Buffer < 2048 > buf ;
2015-10-20 23:24:21 +00:00
com . serialize ( buf ) ;
2015-10-23 21:50:07 +00:00
TRACE ( " replicating %s COM for %.16llx to all members " , com . issuedTo ( ) . toString ( ) . c_str ( ) , com . networkId ( ) ) ;
2015-10-20 23:24:21 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , STATE_MESSAGE_COM , buf . data ( ) , buf . size ( ) ) ;
}
}
2015-10-14 21:12:12 +00:00
}
void Cluster : : doPeriodicTasks ( )
{
2015-10-20 22:27:53 +00:00
const uint64_t now = RR - > node - > now ( ) ;
2015-10-14 21:12:12 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
2015-10-20 22:27:53 +00:00
if ( ( now - _members [ * mid ] . lastAnnouncedAliveTo ) > = ( ( ZT_CLUSTER_TIMEOUT / 2 ) - 1000 ) ) {
Buffer < 2048 > alive ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MAJOR ) ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MINOR ) ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_REVISION ) ;
alive . append ( ( uint8_t ) ZT_PROTO_VERSION ) ;
if ( _addressToLocationFunction ) {
alive . append ( ( int32_t ) _x ) ;
alive . append ( ( int32_t ) _y ) ;
alive . append ( ( int32_t ) _z ) ;
} else {
alive . append ( ( int32_t ) 0 ) ;
alive . append ( ( int32_t ) 0 ) ;
alive . append ( ( int32_t ) 0 ) ;
}
alive . append ( ( uint64_t ) now ) ;
alive . append ( ( uint64_t ) 0 ) ; // TODO: compute and send load average
alive . append ( ( uint64_t ) 0 ) ; // unused/reserved flags
alive . append ( ( uint8_t ) _zeroTierPhysicalEndpoints . size ( ) ) ;
for ( std : : vector < InetAddress > : : const_iterator pe ( _zeroTierPhysicalEndpoints . begin ( ) ) ; pe ! = _zeroTierPhysicalEndpoints . end ( ) ; + + pe )
pe - > serialize ( alive ) ;
2015-10-20 23:24:21 +00:00
_send ( * mid , STATE_MESSAGE_ALIVE , alive . data ( ) , alive . size ( ) ) ;
2015-10-20 22:27:53 +00:00
_members [ * mid ] . lastAnnouncedAliveTo = now ;
}
_flush ( * mid ) ; // does nothing if nothing to flush
2015-10-14 21:12:12 +00:00
}
}
}
void Cluster : : addMember ( uint16_t memberId )
{
2015-10-22 23:02:01 +00:00
if ( ( memberId > = ZT_CLUSTER_MAX_MEMBERS ) | | ( memberId = = _id ) )
2015-10-20 22:27:53 +00:00
return ;
2015-10-14 21:12:12 +00:00
Mutex : : Lock _l2 ( _members [ memberId ] . lock ) ;
2015-10-20 22:27:53 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
if ( std : : find ( _memberIds . begin ( ) , _memberIds . end ( ) , memberId ) ! = _memberIds . end ( ) )
return ;
_memberIds . push_back ( memberId ) ;
std : : sort ( _memberIds . begin ( ) , _memberIds . end ( ) ) ;
}
_members [ memberId ] . clear ( ) ;
2015-10-14 21:12:12 +00:00
// Generate this member's message key from the master and its ID
uint16_t stmp [ ZT_SHA512_DIGEST_LEN / sizeof ( uint16_t ) ] ;
memcpy ( stmp , _masterSecret , sizeof ( stmp ) ) ;
stmp [ 0 ] ^ = Utils : : hton ( memberId ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
memcpy ( _members [ memberId ] . key , stmp , sizeof ( _members [ memberId ] . key ) ) ;
Utils : : burn ( stmp , sizeof ( stmp ) ) ;
// Prepare q
_members [ memberId ] . q . clear ( ) ;
char iv [ 16 ] ;
Utils : : getSecureRandom ( iv , 16 ) ;
_members [ memberId ] . q . append ( iv , 16 ) ;
_members [ memberId ] . q . addSize ( 8 ) ; // room for MAC
2015-10-14 22:49:41 +00:00
_members [ memberId ] . q . append ( ( uint16_t ) _id ) ;
2015-10-20 22:27:53 +00:00
_members [ memberId ] . q . append ( ( uint16_t ) memberId ) ;
}
void Cluster : : removeMember ( uint16_t memberId )
{
Mutex : : Lock _l ( _memberIds_m ) ;
std : : vector < uint16_t > newMemberIds ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
if ( * mid ! = memberId )
newMemberIds . push_back ( * mid ) ;
}
_memberIds = newMemberIds ;
}
2015-10-21 00:36:10 +00:00
bool Cluster : : redirectPeer ( const Address & peerAddress , const InetAddress & peerPhysicalAddress , bool offload )
2015-10-20 22:27:53 +00:00
{
if ( ! peerPhysicalAddress ) // sanity check
return false ;
2015-10-23 20:03:34 +00:00
2015-10-20 22:27:53 +00:00
if ( _addressToLocationFunction ) {
// Pick based on location if it can be determined
int px = 0 , py = 0 , pz = 0 ;
if ( _addressToLocationFunction ( _addressToLocationFunctionArg , reinterpret_cast < const struct sockaddr_storage * > ( & peerPhysicalAddress ) , & px , & py , & pz ) = = 0 ) {
2015-10-23 20:03:34 +00:00
TRACE ( " no geolocation available for %s " , peerPhysicalAddress . toIpString ( ) . c_str ( ) ) ;
2015-10-20 22:27:53 +00:00
return false ;
}
// Find member closest to this peer
const uint64_t now = RR - > node - > now ( ) ;
std : : vector < InetAddress > best ; // initial "best" is for peer to stay put
const double currentDistance = _dist3d ( _x , _y , _z , px , py , pz ) ;
double bestDistance = ( offload ? 2147483648.0 : currentDistance ) ;
unsigned int bestMember = _id ;
2015-10-23 20:03:34 +00:00
TRACE ( " %s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm) " , peerPhysicalAddress . toString ( ) . c_str ( ) , px , py , pz , _x , _y , _z , bestDistance ) ;
2015-10-20 22:27:53 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
_Member & m = _members [ * mid ] ;
Mutex : : Lock _ml ( m . lock ) ;
// Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to
if ( ( ( now - m . lastReceivedAliveAnnouncement ) < ZT_CLUSTER_TIMEOUT ) & & ( ( m . x ! = 0 ) | | ( m . y ! = 0 ) | | ( m . z ! = 0 ) ) & & ( m . zeroTierPhysicalEndpoints . size ( ) > 0 ) ) {
double mdist = _dist3d ( m . x , m . y , m . z , px , py , pz ) ;
if ( mdist < bestDistance ) {
2015-10-23 20:03:34 +00:00
bestDistance = mdist ;
2015-10-20 22:27:53 +00:00
bestMember = * mid ;
best = m . zeroTierPhysicalEndpoints ;
}
}
}
}
if ( best . size ( ) > 0 ) {
2015-10-23 20:03:34 +00:00
TRACE ( " %s seems closer to %u at %fkm, suggesting redirect... " , peerAddress . toString ( ) . c_str ( ) , bestMember , bestDistance ) ;
2015-10-20 22:27:53 +00:00
/* if (peer->remoteVersionProtocol() >= 5) {
// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
} else { */
// Otherwise send VERB_RENDEZVOUS for ourselves, which will trick peers into trying other endpoints for us even if they're too old for PUSH_DIRECT_PATHS
for ( std : : vector < InetAddress > : : const_iterator a ( best . begin ( ) ) ; a ! = best . end ( ) ; + + a ) {
if ( ( a - > ss_family = = AF_INET ) | | ( a - > ss_family = = AF_INET6 ) ) {
2015-10-21 00:36:10 +00:00
Packet outp ( peerAddress , RR - > identity . address ( ) , Packet : : VERB_RENDEZVOUS ) ;
2015-10-20 22:27:53 +00:00
outp . append ( ( uint8_t ) 0 ) ; // no flags
RR - > identity . address ( ) . appendTo ( outp ) ; // HACK: rendezvous with ourselves! with really old peers this will only work if I'm a root server!
outp . append ( ( uint16_t ) a - > port ( ) ) ;
if ( a - > ss_family = = AF_INET ) {
outp . append ( ( uint8_t ) 4 ) ;
outp . append ( a - > rawIpData ( ) , 4 ) ;
} else {
outp . append ( ( uint8_t ) 16 ) ;
outp . append ( a - > rawIpData ( ) , 16 ) ;
}
RR - > sw - > send ( outp , true , 0 ) ;
}
}
//}
return true ;
} else {
2015-10-21 00:36:10 +00:00
TRACE ( " peer %s is at [%d,%d,%d], distance to us is %f and this seems to be the best " , peerAddress . toString ( ) . c_str ( ) , px , py , pz , currentDistance ) ;
2015-10-20 22:27:53 +00:00
return false ;
}
} else {
// TODO: pick based on load if no location info?
return false ;
}
2015-10-14 21:12:12 +00:00
}
2015-10-26 19:41:08 +00:00
void Cluster : : status ( ZT_ClusterStatus & status ) const
{
const uint64_t now = RR - > node - > now ( ) ;
memset ( & status , 0 , sizeof ( ZT_ClusterStatus ) ) ;
ZT_ClusterMemberStatus * ms [ ZT_CLUSTER_MAX_MEMBERS ] ;
memset ( ms , 0 , sizeof ( ms ) ) ;
status . myId = _id ;
2015-10-26 20:06:10 +00:00
ms [ _id ] = & ( status . members [ status . clusterSize + + ] ) ;
2015-10-26 19:41:08 +00:00
ms [ _id ] - > id = _id ;
ms [ _id ] - > alive = 1 ;
ms [ _id ] - > x = _x ;
ms [ _id ] - > y = _y ;
ms [ _id ] - > z = _z ;
ms [ _id ] - > peers = RR - > topology - > countAlive ( ) ;
for ( std : : vector < InetAddress > : : const_iterator ep ( _zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = _zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( ms [ _id ] - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( ms [ _id ] - > zeroTierPhysicalEndpoints [ ms [ _id ] - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
}
{
Mutex : : Lock _l1 ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
if ( status . clusterSize > = ZT_CLUSTER_MAX_MEMBERS ) // sanity check
break ;
2015-10-26 20:06:10 +00:00
ZT_ClusterMemberStatus * s = ms [ * mid ] = & ( status . members [ status . clusterSize + + ] ) ;
2015-10-26 19:41:08 +00:00
_Member & m = _members [ * mid ] ;
Mutex : : Lock ml ( m . lock ) ;
s - > id = * mid ;
s - > msSinceLastHeartbeat = ( unsigned int ) std : : min ( ( uint64_t ) ( ~ ( ( unsigned int ) 0 ) ) , ( now - m . lastReceivedAliveAnnouncement ) ) ;
s - > alive = ( s - > msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT ) ? 1 : 0 ;
s - > x = m . x ;
s - > y = m . y ;
s - > z = m . z ;
s - > load = m . load ;
for ( std : : vector < InetAddress > : : const_iterator ep ( m . zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = m . zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( s - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( s - > zeroTierPhysicalEndpoints [ s - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
}
}
}
{
Mutex : : Lock _l2 ( _peerAffinities_m ) ;
for ( std : : vector < _PeerAffinity > : : const_iterator pi ( _peerAffinities . begin ( ) ) ; pi ! = _peerAffinities . end ( ) ; + + pi ) {
unsigned int mid = pi - > clusterMemberId ( ) ;
if ( ( ms [ mid ] ) & & ( mid ! = _id ) & & ( ( now - pi - > timestamp ) < ZT_PEER_ACTIVITY_TIMEOUT ) )
+ + ms [ mid ] - > peers ;
}
}
}
2015-10-20 23:24:21 +00:00
void Cluster : : _send ( uint16_t memberId , StateMessageType type , const void * msg , unsigned int len )
2015-10-14 21:12:12 +00:00
{
2015-10-22 23:02:01 +00:00
if ( ( len + 3 ) > ( ZT_CLUSTER_MAX_MESSAGE_LENGTH - ( 24 + 2 + 2 ) ) ) // sanity check
return ;
2015-10-14 21:12:12 +00:00
_Member & m = _members [ memberId ] ;
// assumes m.lock is locked!
2015-10-20 23:24:21 +00:00
if ( ( m . q . size ( ) + len + 3 ) > ZT_CLUSTER_MAX_MESSAGE_LENGTH )
_flush ( memberId ) ;
m . q . append ( ( uint16_t ) ( len + 1 ) ) ;
m . q . append ( ( uint8_t ) type ) ;
m . q . append ( msg , len ) ;
2015-10-14 21:12:12 +00:00
}
void Cluster : : _flush ( uint16_t memberId )
{
_Member & m = _members [ memberId ] ;
// assumes m.lock is locked!
2015-10-20 22:27:53 +00:00
if ( m . q . size ( ) > ( 24 + 2 + 2 ) ) { // 16-byte IV + 8-byte MAC + 2 byte from-member-ID + 2 byte to-member-ID
2015-10-14 21:12:12 +00:00
// Create key from member's key and IV
char keytmp [ 32 ] ;
memcpy ( keytmp , m . key , 32 ) ;
for ( int i = 0 ; i < 8 ; + + i )
keytmp [ i ] ^ = m . q [ i ] ;
Salsa20 s20 ( keytmp , 256 , m . q . field ( 8 , 8 ) ) ;
Utils : : burn ( keytmp , sizeof ( keytmp ) ) ;
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey [ ZT_POLY1305_KEY_LEN ] ;
memset ( polykey , 0 , sizeof ( polykey ) ) ;
s20 . encrypt12 ( polykey , polykey , sizeof ( polykey ) ) ;
// Encrypt m.q in place
s20 . encrypt12 ( reinterpret_cast < const char * > ( m . q . data ( ) ) + 24 , const_cast < char * > ( reinterpret_cast < const char * > ( m . q . data ( ) ) ) + 24 , m . q . size ( ) - 24 ) ;
// Add MAC for authentication (encrypt-then-MAC)
char mac [ ZT_POLY1305_MAC_LEN ] ;
Poly1305 : : compute ( mac , reinterpret_cast < const char * > ( m . q . data ( ) ) + 24 , m . q . size ( ) - 24 , polykey ) ;
memcpy ( m . q . field ( 16 , 8 ) , mac , 8 ) ;
// Send!
2015-10-20 22:27:53 +00:00
_sendFunction ( _sendFunctionArg , memberId , m . q . data ( ) , m . q . size ( ) ) ;
2015-10-14 21:12:12 +00:00
// Prepare for more
m . q . clear ( ) ;
char iv [ 16 ] ;
Utils : : getSecureRandom ( iv , 16 ) ;
m . q . append ( iv , 16 ) ;
m . q . addSize ( 8 ) ; // room for MAC
2015-10-20 22:27:53 +00:00
m . q . append ( ( uint16_t ) _id ) ; // from member ID
m . q . append ( ( uint16_t ) memberId ) ; // to member ID
2015-10-14 21:12:12 +00:00
}
}
} // namespace ZeroTier
# endif // ZT_ENABLE_CLUSTER