2015-10-14 21:12:12 +00:00
/*
* ZeroTier One - Network Virtualization Everywhere
2016-01-12 22:04:55 +00:00
* Copyright ( C ) 2011 - 2016 ZeroTier , Inc . https : //www.zerotier.com/
2015-10-14 21:12:12 +00:00
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation , either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
*
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*/
# ifdef ZT_ENABLE_CLUSTER
# include <stdint.h>
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
2015-10-20 22:27:53 +00:00
# include <math.h>
2015-10-14 21:12:12 +00:00
2015-11-10 17:05:01 +00:00
# include <map>
2015-10-14 21:12:12 +00:00
# include <algorithm>
2015-11-10 17:05:01 +00:00
# include <set>
2015-10-14 21:12:12 +00:00
# include <utility>
2015-11-10 17:05:01 +00:00
# include <list>
# include <stdexcept>
2015-10-14 21:12:12 +00:00
2015-10-20 22:27:53 +00:00
# include "../version.h"
2015-10-14 21:12:12 +00:00
# include "Cluster.hpp"
# include "RuntimeEnvironment.hpp"
# include "MulticastGroup.hpp"
# include "CertificateOfMembership.hpp"
# include "Salsa20.hpp"
# include "Poly1305.hpp"
2015-10-20 22:27:53 +00:00
# include "Identity.hpp"
2015-10-27 16:36:48 +00:00
# include "Topology.hpp"
# include "Packet.hpp"
2015-10-14 21:12:12 +00:00
# include "Switch.hpp"
2015-10-27 22:00:16 +00:00
# include "Node.hpp"
2015-11-10 17:05:01 +00:00
# include "Array.hpp"
2015-10-14 21:12:12 +00:00
namespace ZeroTier {
2015-10-20 22:27:53 +00:00
static inline double _dist3d ( int x1 , int y1 , int z1 , int x2 , int y2 , int z2 )
throw ( )
{
double dx = ( ( double ) x2 - ( double ) x1 ) ;
double dy = ( ( double ) y2 - ( double ) y1 ) ;
double dz = ( ( double ) z2 - ( double ) z1 ) ;
return sqrt ( ( dx * dx ) + ( dy * dy ) + ( dz * dz ) ) ;
}
2015-11-10 17:05:01 +00:00
// An entry in _ClusterSendQueue
struct _ClusterSendQueueEntry
{
uint64_t timestamp ;
Address fromPeerAddress ;
Address toPeerAddress ;
// if we ever support larger transport MTUs this must be increased
unsigned char data [ ZT_CLUSTER_SEND_QUEUE_DATA_MAX ] ;
unsigned int len ;
bool unite ;
} ;
// A multi-index map with entry memory pooling -- this allows our queue to
// be O(log(N)) and is complex enough that it makes the code a lot cleaner
// to break it out from Cluster.
class _ClusterSendQueue
{
public :
_ClusterSendQueue ( ) :
2015-11-10 17:46:14 +00:00
_poolCount ( 0 ) { }
2015-11-10 17:05:01 +00:00
~ _ClusterSendQueue ( ) { } // memory is automatically freed when _chunks is destroyed
2015-11-10 17:46:14 +00:00
inline void enqueue ( uint64_t now , const Address & from , const Address & to , const void * data , unsigned int len , bool unite )
2015-11-10 17:05:01 +00:00
{
if ( len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX )
return ;
Mutex : : Lock _l ( _lock ) ;
2015-11-10 17:46:14 +00:00
// Delete oldest queue entry for this sender if this enqueue() would take them over the per-sender limit
2015-11-10 17:05:01 +00:00
{
std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > : : iterator qi ( _bySrc . lower_bound ( std : : pair < Address , _ClusterSendQueueEntry * > ( from , ( _ClusterSendQueueEntry * ) 0 ) ) ) ;
2015-11-10 17:46:14 +00:00
std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > : : iterator oldest ( qi ) ;
2015-11-10 17:05:01 +00:00
unsigned long countForSender = 0 ;
while ( ( qi ! = _bySrc . end ( ) ) & & ( qi - > first = = from ) ) {
2015-11-10 17:46:14 +00:00
if ( qi - > second - > timestamp < oldest - > second - > timestamp )
2015-11-10 17:05:01 +00:00
oldest = qi ;
2015-11-10 17:46:14 +00:00
+ + countForSender ;
2015-11-10 17:05:01 +00:00
+ + qi ;
}
2015-11-10 17:46:14 +00:00
if ( countForSender > = ZT_CLUSTER_MAX_QUEUE_PER_SENDER ) {
_byDest . erase ( std : : pair < Address , _ClusterSendQueueEntry * > ( oldest - > second - > toPeerAddress , oldest - > second ) ) ;
_pool [ _poolCount + + ] = oldest - > second ;
_bySrc . erase ( oldest ) ;
}
2015-11-10 17:05:01 +00:00
}
_ClusterSendQueueEntry * e ;
if ( _poolCount > 0 ) {
e = _pool [ - - _poolCount ] ;
} else {
if ( _chunks . size ( ) > = ZT_CLUSTER_MAX_QUEUE_CHUNKS )
return ; // queue is totally full!
_chunks . push_back ( Array < _ClusterSendQueueEntry , ZT_CLUSTER_QUEUE_CHUNK_SIZE > ( ) ) ;
e = & ( _chunks . back ( ) . data [ 0 ] ) ;
for ( unsigned int i = 1 ; i < ZT_CLUSTER_QUEUE_CHUNK_SIZE ; + + i )
_pool [ _poolCount + + ] = & ( _chunks . back ( ) . data [ i ] ) ;
}
2015-11-10 17:46:14 +00:00
e - > timestamp = now ;
2015-11-10 17:05:01 +00:00
e - > fromPeerAddress = from ;
e - > toPeerAddress = to ;
memcpy ( e - > data , data , len ) ;
e - > len = len ;
e - > unite = unite ;
_bySrc . insert ( std : : pair < Address , _ClusterSendQueueEntry * > ( from , e ) ) ;
_byDest . insert ( std : : pair < Address , _ClusterSendQueueEntry * > ( to , e ) ) ;
}
inline void expire ( uint64_t now )
{
Mutex : : Lock _l ( _lock ) ;
for ( std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > : : iterator qi ( _bySrc . begin ( ) ) ; qi ! = _bySrc . end ( ) ; ) {
if ( ( now - qi - > second - > timestamp ) > ZT_CLUSTER_QUEUE_EXPIRATION ) {
_byDest . erase ( std : : pair < Address , _ClusterSendQueueEntry * > ( qi - > second - > toPeerAddress , qi - > second ) ) ;
_pool [ _poolCount + + ] = qi - > second ;
_bySrc . erase ( qi + + ) ;
} else + + qi ;
}
}
/**
* Get and dequeue entries for a given destination address
*
* After use these entries must be returned with returnToPool ( ) !
*
* @ param dest Destination address
* @ param results Array to fill with results
* @ param maxResults Size of results [ ] in pointers
* @ return Number of actual results returned
*/
inline unsigned int getByDest ( const Address & dest , _ClusterSendQueueEntry * * results , unsigned int maxResults )
{
unsigned int count = 0 ;
Mutex : : Lock _l ( _lock ) ;
std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > : : iterator qi ( _byDest . lower_bound ( std : : pair < Address , _ClusterSendQueueEntry * > ( dest , ( _ClusterSendQueueEntry * ) 0 ) ) ) ;
while ( ( qi ! = _byDest . end ( ) ) & & ( qi - > first = = dest ) ) {
_bySrc . erase ( std : : pair < Address , _ClusterSendQueueEntry * > ( qi - > second - > fromPeerAddress , qi - > second ) ) ;
results [ count + + ] = qi - > second ;
if ( count = = maxResults )
break ;
_byDest . erase ( qi + + ) ;
}
return count ;
}
/**
* Return entries to pool after use
*
* @ param entries Array of entries
* @ param count Number of entries
*/
inline void returnToPool ( _ClusterSendQueueEntry * * entries , unsigned int count )
{
Mutex : : Lock _l ( _lock ) ;
for ( unsigned int i = 0 ; i < count ; + + i )
_pool [ _poolCount + + ] = entries [ i ] ;
}
private :
std : : list < Array < _ClusterSendQueueEntry , ZT_CLUSTER_QUEUE_CHUNK_SIZE > > _chunks ;
_ClusterSendQueueEntry * _pool [ ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS ] ;
unsigned long _poolCount ;
std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > _bySrc ;
std : : set < std : : pair < Address , _ClusterSendQueueEntry * > > _byDest ;
Mutex _lock ;
} ;
2015-10-20 22:27:53 +00:00
Cluster : : Cluster (
const RuntimeEnvironment * renv ,
uint16_t id ,
const std : : vector < InetAddress > & zeroTierPhysicalEndpoints ,
int32_t x ,
int32_t y ,
int32_t z ,
void ( * sendFunction ) ( void * , unsigned int , const void * , unsigned int ) ,
void * sendFunctionArg ,
int ( * addressToLocationFunction ) ( void * , const struct sockaddr_storage * , int * , int * , int * ) ,
void * addressToLocationFunctionArg ) :
2015-10-14 21:12:12 +00:00
RR ( renv ) ,
2015-11-10 17:05:01 +00:00
_sendQueue ( new _ClusterSendQueue ( ) ) ,
2015-10-14 21:12:12 +00:00
_sendFunction ( sendFunction ) ,
2015-10-20 22:27:53 +00:00
_sendFunctionArg ( sendFunctionArg ) ,
_addressToLocationFunction ( addressToLocationFunction ) ,
_addressToLocationFunctionArg ( addressToLocationFunctionArg ) ,
2015-10-14 21:12:12 +00:00
_x ( x ) ,
_y ( y ) ,
_z ( z ) ,
2015-10-19 23:18:57 +00:00
_id ( id ) ,
2015-10-20 22:27:53 +00:00
_zeroTierPhysicalEndpoints ( zeroTierPhysicalEndpoints ) ,
2015-10-27 22:57:26 +00:00
_members ( new _Member [ ZT_CLUSTER_MAX_MEMBERS ] ) ,
2015-11-08 21:57:02 +00:00
_lastFlushed ( 0 ) ,
2015-11-10 17:05:01 +00:00
_lastCleanedRemotePeers ( 0 ) ,
_lastCleanedQueue ( 0 )
2015-10-14 21:12:12 +00:00
{
uint16_t stmp [ ZT_SHA512_DIGEST_LEN / sizeof ( uint16_t ) ] ;
// Generate master secret by hashing the secret from our Identity key pair
RR - > identity . sha512PrivateKey ( _masterSecret ) ;
// Generate our inbound message key, which is the master secret XORed with our ID and hashed twice
memcpy ( stmp , _masterSecret , sizeof ( stmp ) ) ;
stmp [ 0 ] ^ = Utils : : hton ( id ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
memcpy ( _key , stmp , sizeof ( _key ) ) ;
Utils : : burn ( stmp , sizeof ( stmp ) ) ;
}
Cluster : : ~ Cluster ( )
{
Utils : : burn ( _masterSecret , sizeof ( _masterSecret ) ) ;
Utils : : burn ( _key , sizeof ( _key ) ) ;
2015-10-19 23:18:57 +00:00
delete [ ] _members ;
2015-11-10 17:05:01 +00:00
delete _sendQueue ;
2015-10-14 21:12:12 +00:00
}
void Cluster : : handleIncomingStateMessage ( const void * msg , unsigned int len )
{
Buffer < ZT_CLUSTER_MAX_MESSAGE_LENGTH > dmsg ;
{
// FORMAT: <[16] iv><[8] MAC><... data>
if ( ( len < 24 ) | | ( len > ZT_CLUSTER_MAX_MESSAGE_LENGTH ) )
return ;
// 16-byte IV: first 8 bytes XORed with key, last 8 bytes used as Salsa20 64-bit IV
char keytmp [ 32 ] ;
memcpy ( keytmp , _key , 32 ) ;
for ( int i = 0 ; i < 8 ; + + i )
keytmp [ i ] ^ = reinterpret_cast < const char * > ( msg ) [ i ] ;
Salsa20 s20 ( keytmp , 256 , reinterpret_cast < const char * > ( msg ) + 8 ) ;
Utils : : burn ( keytmp , sizeof ( keytmp ) ) ;
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey [ ZT_POLY1305_KEY_LEN ] ;
memset ( polykey , 0 , sizeof ( polykey ) ) ;
s20 . encrypt12 ( polykey , polykey , sizeof ( polykey ) ) ;
// Compute 16-byte MAC
char mac [ ZT_POLY1305_MAC_LEN ] ;
Poly1305 : : compute ( mac , reinterpret_cast < const char * > ( msg ) + 24 , len - 24 , polykey ) ;
// Check first 8 bytes of MAC against 64-bit MAC in stream
if ( ! Utils : : secureEq ( mac , reinterpret_cast < const char * > ( msg ) + 16 , 8 ) )
return ;
// Decrypt!
2015-10-14 22:49:41 +00:00
dmsg . setSize ( len - 24 ) ;
s20 . decrypt12 ( reinterpret_cast < const char * > ( msg ) + 24 , const_cast < void * > ( dmsg . data ( ) ) , dmsg . size ( ) ) ;
2015-10-14 21:12:12 +00:00
}
2015-10-20 22:27:53 +00:00
if ( dmsg . size ( ) < 4 )
2015-10-14 21:12:12 +00:00
return ;
const uint16_t fromMemberId = dmsg . at < uint16_t > ( 0 ) ;
unsigned int ptr = 2 ;
2015-10-23 18:51:18 +00:00
if ( fromMemberId = = _id ) // sanity check: we don't talk to ourselves
2015-10-20 22:27:53 +00:00
return ;
const uint16_t toMemberId = dmsg . at < uint16_t > ( ptr ) ;
ptr + = 2 ;
2015-10-23 18:51:18 +00:00
if ( toMemberId ! = _id ) // sanity check: message not for us?
2015-10-20 22:27:53 +00:00
return ;
2015-10-14 21:12:12 +00:00
2015-10-23 18:51:18 +00:00
{ // make sure sender is actually considered a member
Mutex : : Lock _l3 ( _memberIds_m ) ;
if ( std : : find ( _memberIds . begin ( ) , _memberIds . end ( ) , fromMemberId ) = = _memberIds . end ( ) )
return ;
}
2015-11-08 21:57:02 +00:00
try {
while ( ptr < dmsg . size ( ) ) {
const unsigned int mlen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
const unsigned int nextPtr = ptr + mlen ;
if ( nextPtr > dmsg . size ( ) )
break ;
int mtype = - 1 ;
try {
switch ( ( StateMessageType ) ( mtype = ( int ) dmsg [ ptr + + ] ) ) {
default :
break ;
case CLUSTER_MESSAGE_ALIVE : {
2015-11-09 17:45:43 +00:00
_Member & m = _members [ fromMemberId ] ;
2015-11-08 21:57:02 +00:00
Mutex : : Lock mlck ( m . lock ) ;
ptr + = 7 ; // skip version stuff, not used yet
m . x = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . y = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
m . z = dmsg . at < int32_t > ( ptr ) ; ptr + = 4 ;
ptr + = 8 ; // skip local clock, not used
m . load = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
m . peers = dmsg . at < uint64_t > ( ptr ) ; ptr + = 8 ;
ptr + = 8 ; // skip flags, unused
2015-10-20 23:48:49 +00:00
# ifdef ZT_TRACE
2015-11-08 21:57:02 +00:00
std : : string addrs ;
2015-10-20 23:48:49 +00:00
# endif
2015-11-08 21:57:02 +00:00
unsigned int physicalAddressCount = dmsg [ ptr + + ] ;
m . zeroTierPhysicalEndpoints . clear ( ) ;
for ( unsigned int i = 0 ; i < physicalAddressCount ; + + i ) {
m . zeroTierPhysicalEndpoints . push_back ( InetAddress ( ) ) ;
ptr + = m . zeroTierPhysicalEndpoints . back ( ) . deserialize ( dmsg , ptr ) ;
if ( ! ( m . zeroTierPhysicalEndpoints . back ( ) ) ) {
m . zeroTierPhysicalEndpoints . pop_back ( ) ;
2015-10-23 18:51:18 +00:00
}
2015-10-20 23:48:49 +00:00
# ifdef ZT_TRACE
2015-11-08 21:57:02 +00:00
else {
if ( addrs . length ( ) > 0 )
addrs . push_back ( ' , ' ) ;
addrs . append ( m . zeroTierPhysicalEndpoints . back ( ) . toString ( ) ) ;
2015-10-26 23:55:55 +00:00
}
2015-10-20 23:48:49 +00:00
# endif
2015-11-08 21:57:02 +00:00
}
# ifdef ZT_TRACE
if ( ( RR - > node - > now ( ) - m . lastReceivedAliveAnnouncement ) > = ZT_CLUSTER_TIMEOUT ) {
TRACE ( " [%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s " , ( unsigned int ) fromMemberId , m . x , m . y , m . z , addrs . c_str ( ) ) ;
}
# endif
m . lastReceivedAliveAnnouncement = RR - > node - > now ( ) ;
} break ;
case CLUSTER_MESSAGE_HAVE_PEER : {
Identity id ;
ptr + = id . deserialize ( dmsg , ptr ) ;
if ( id ) {
RR - > topology - > saveIdentity ( id ) ;
{
Mutex : : Lock _l ( _remotePeers_m ) ;
_remotePeers [ std : : pair < Address , unsigned int > ( id . address ( ) , ( unsigned int ) fromMemberId ) ] = RR - > node - > now ( ) ;
}
2015-11-10 17:05:01 +00:00
_ClusterSendQueueEntry * q [ 16384 ] ; // 16384 is "tons"
unsigned int qc = _sendQueue - > getByDest ( id . address ( ) , q , 16384 ) ;
for ( unsigned int i = 0 ; i < qc ; + + i )
this - > sendViaCluster ( q [ i ] - > fromPeerAddress , q [ i ] - > toPeerAddress , q [ i ] - > data , q [ i ] - > len , q [ i ] - > unite ) ;
_sendQueue - > returnToPool ( q , qc ) ;
2015-11-08 21:57:02 +00:00
2015-11-10 17:05:01 +00:00
TRACE ( " [%u] has %s (retried %u queued sends) " , ( unsigned int ) fromMemberId , id . address ( ) . toString ( ) . c_str ( ) , qc ) ;
2015-11-08 21:57:02 +00:00
}
} break ;
case CLUSTER_MESSAGE_WANT_PEER : {
const Address zeroTierAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
SharedPtr < Peer > peer ( RR - > topology - > getPeerNoCache ( zeroTierAddress ) ) ;
2015-11-09 22:25:28 +00:00
if ( ( peer ) & & ( peer - > hasClusterOptimalPath ( RR - > node - > now ( ) ) ) ) {
2015-11-09 17:14:26 +00:00
Buffer < 1024 > buf ;
peer - > identity ( ) . serialize ( buf ) ;
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_HAVE_PEER , buf . data ( ) , buf . size ( ) ) ;
2015-11-08 21:57:02 +00:00
}
} break ;
case CLUSTER_MESSAGE_REMOTE_PACKET : {
const unsigned int plen = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
if ( plen ) {
Packet remotep ( dmsg . field ( ptr , plen ) , plen ) ; ptr + = plen ;
2015-11-09 19:08:52 +00:00
//TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
2015-11-08 21:57:02 +00:00
switch ( remotep . verb ( ) ) {
case Packet : : VERB_WHOIS : _doREMOTE_WHOIS ( fromMemberId , remotep ) ; break ;
case Packet : : VERB_MULTICAST_GATHER : _doREMOTE_MULTICAST_GATHER ( fromMemberId , remotep ) ; break ;
default : break ; // ignore things we don't care about across cluster
2015-10-23 18:51:18 +00:00
}
2015-11-08 21:57:02 +00:00
}
} break ;
case CLUSTER_MESSAGE_PROXY_UNITE : {
const Address localPeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Address remotePeerAddress ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const unsigned int numRemotePeerPaths = dmsg [ ptr + + ] ;
InetAddress remotePeerPaths [ 256 ] ; // size is 8-bit, so 256 is max
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i )
ptr + = remotePeerPaths [ i ] . deserialize ( dmsg , ptr ) ;
TRACE ( " [%u] requested that we unite local %s with remote %s " , ( unsigned int ) fromMemberId , localPeerAddress . toString ( ) . c_str ( ) , remotePeerAddress . toString ( ) . c_str ( ) ) ;
const uint64_t now = RR - > node - > now ( ) ;
SharedPtr < Peer > localPeer ( RR - > topology - > getPeerNoCache ( localPeerAddress ) ) ;
if ( ( localPeer ) & & ( numRemotePeerPaths > 0 ) ) {
InetAddress bestLocalV4 , bestLocalV6 ;
localPeer - > getBestActiveAddresses ( now , bestLocalV4 , bestLocalV6 ) ;
InetAddress bestRemoteV4 , bestRemoteV6 ;
for ( unsigned int i = 0 ; i < numRemotePeerPaths ; + + i ) {
if ( ( bestRemoteV4 ) & & ( bestRemoteV6 ) )
break ;
switch ( remotePeerPaths [ i ] . ss_family ) {
case AF_INET :
if ( ! bestRemoteV4 )
bestRemoteV4 = remotePeerPaths [ i ] ;
break ;
case AF_INET6 :
if ( ! bestRemoteV6 )
bestRemoteV6 = remotePeerPaths [ i ] ;
2015-10-27 21:04:12 +00:00
break ;
2015-10-14 21:12:12 +00:00
}
2015-11-08 21:57:02 +00:00
}
2015-10-27 21:04:12 +00:00
2015-11-08 21:57:02 +00:00
Packet rendezvousForLocal ( localPeerAddress , RR - > identity . address ( ) , Packet : : VERB_RENDEZVOUS ) ;
rendezvousForLocal . append ( ( uint8_t ) 0 ) ;
remotePeerAddress . appendTo ( rendezvousForLocal ) ;
Buffer < 2048 > rendezvousForRemote ;
remotePeerAddress . appendTo ( rendezvousForRemote ) ;
rendezvousForRemote . append ( ( uint8_t ) Packet : : VERB_RENDEZVOUS ) ;
rendezvousForRemote . addSize ( 2 ) ; // space for actual packet payload length
rendezvousForRemote . append ( ( uint8_t ) 0 ) ; // flags == 0
localPeerAddress . appendTo ( rendezvousForRemote ) ;
bool haveMatch = false ;
if ( ( bestLocalV6 ) & & ( bestRemoteV6 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV6 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 16 ) ;
rendezvousForLocal . append ( bestRemoteV6 . rawIpData ( ) , 16 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV6 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 16 ) ;
rendezvousForRemote . append ( bestLocalV6 . rawIpData ( ) , 16 ) ;
2015-11-09 18:25:20 +00:00
rendezvousForRemote . setAt < uint16_t > ( ZT_ADDRESS_LENGTH + 1 , ( uint16_t ) ( 9 + 16 ) ) ;
2015-11-08 21:57:02 +00:00
} else if ( ( bestLocalV4 ) & & ( bestRemoteV4 ) ) {
haveMatch = true ;
rendezvousForLocal . append ( ( uint16_t ) bestRemoteV4 . port ( ) ) ;
rendezvousForLocal . append ( ( uint8_t ) 4 ) ;
rendezvousForLocal . append ( bestRemoteV4 . rawIpData ( ) , 4 ) ;
rendezvousForRemote . append ( ( uint16_t ) bestLocalV4 . port ( ) ) ;
rendezvousForRemote . append ( ( uint8_t ) 4 ) ;
rendezvousForRemote . append ( bestLocalV4 . rawIpData ( ) , 4 ) ;
2015-11-09 18:25:20 +00:00
rendezvousForRemote . setAt < uint16_t > ( ZT_ADDRESS_LENGTH + 1 , ( uint16_t ) ( 9 + 4 ) ) ;
2015-11-08 21:57:02 +00:00
}
2015-10-27 21:04:12 +00:00
2015-11-08 21:57:02 +00:00
if ( haveMatch ) {
2015-11-09 17:45:43 +00:00
{
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_PROXY_SEND , rendezvousForRemote . data ( ) , rendezvousForRemote . size ( ) ) ;
}
2015-11-08 21:57:02 +00:00
RR - > sw - > send ( rendezvousForLocal , true , 0 ) ;
2015-10-14 21:12:12 +00:00
}
2015-11-08 21:57:02 +00:00
}
} break ;
case CLUSTER_MESSAGE_PROXY_SEND : {
const Address rcpt ( dmsg . field ( ptr , ZT_ADDRESS_LENGTH ) , ZT_ADDRESS_LENGTH ) ; ptr + = ZT_ADDRESS_LENGTH ;
const Packet : : Verb verb = ( Packet : : Verb ) dmsg [ ptr + + ] ;
const unsigned int len = dmsg . at < uint16_t > ( ptr ) ; ptr + = 2 ;
Packet outp ( rcpt , RR - > identity . address ( ) , verb ) ;
outp . append ( dmsg . field ( ptr , len ) , len ) ; ptr + = len ;
RR - > sw - > send ( outp , true , 0 ) ;
//TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break ;
2015-10-14 21:12:12 +00:00
}
2015-11-08 21:57:02 +00:00
} catch ( . . . ) {
TRACE ( " invalid message of size %u type %d (inner decode), discarding " , mlen , mtype ) ;
// drop invalids
2015-10-23 18:51:18 +00:00
}
2015-11-08 21:57:02 +00:00
ptr = nextPtr ;
2015-10-14 21:12:12 +00:00
}
2015-11-08 21:57:02 +00:00
} catch ( . . . ) {
TRACE ( " invalid message (outer loop), discarding " ) ;
// drop invalids
2015-10-14 21:12:12 +00:00
}
}
2015-11-10 02:01:23 +00:00
void Cluster : : broadcastHavePeer ( const Identity & id )
{
Buffer < 1024 > buf ;
id . serialize ( buf ) ;
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_HAVE_PEER , buf . data ( ) , buf . size ( ) ) ;
}
}
2015-11-09 17:14:26 +00:00
void Cluster : : sendViaCluster ( const Address & fromPeerAddress , const Address & toPeerAddress , const void * data , unsigned int len , bool unite )
2015-10-21 00:22:53 +00:00
{
2015-11-08 21:57:02 +00:00
if ( len > ZT_PROTO_MAX_PACKET_LENGTH ) // sanity check
2015-11-09 17:14:26 +00:00
return ;
2015-10-27 22:57:26 +00:00
const uint64_t now = RR - > node - > now ( ) ;
2015-10-21 00:22:53 +00:00
2015-11-08 21:57:02 +00:00
uint64_t mostRecentTs = 0 ;
unsigned int mostRecentMemberId = 0xffffffff ;
2015-11-07 00:12:41 +00:00
{
2015-11-08 21:57:02 +00:00
Mutex : : Lock _l2 ( _remotePeers_m ) ;
2015-11-09 20:24:49 +00:00
std : : map < std : : pair < Address , unsigned int > , uint64_t > : : const_iterator rpe ( _remotePeers . lower_bound ( std : : pair < Address , unsigned int > ( toPeerAddress , 0 ) ) ) ;
2015-11-08 21:57:02 +00:00
for ( ; ; ) {
2015-11-09 20:24:49 +00:00
if ( ( rpe = = _remotePeers . end ( ) ) | | ( rpe - > first . first ! = toPeerAddress ) )
2015-11-08 21:57:02 +00:00
break ;
else if ( rpe - > second > mostRecentTs ) {
mostRecentTs = rpe - > second ;
mostRecentMemberId = rpe - > first . second ;
2015-11-07 00:12:41 +00:00
}
2015-11-09 21:39:06 +00:00
+ + rpe ;
2015-11-07 00:12:41 +00:00
}
2015-11-08 21:57:02 +00:00
}
const uint64_t age = now - mostRecentTs ;
if ( age > = ( ZT_PEER_ACTIVITY_TIMEOUT / 3 ) ) {
2015-11-09 17:14:26 +00:00
const bool enqueueAndWait = ( ( age > = ZT_PEER_ACTIVITY_TIMEOUT ) | | ( mostRecentMemberId > 0xffff ) ) ;
2015-11-08 21:57:02 +00:00
// Poll everyone with WANT_PEER if the age of our most recent entry is
// approaching expiration (or has expired, or does not exist).
char tmp [ ZT_ADDRESS_LENGTH ] ;
toPeerAddress . copyTo ( tmp , ZT_ADDRESS_LENGTH ) ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_WANT_PEER , tmp , ZT_ADDRESS_LENGTH ) ;
}
}
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
2015-11-09 17:14:26 +00:00
if ( enqueueAndWait ) {
2015-11-09 19:08:52 +00:00
TRACE ( " sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER " , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) ) ;
2015-11-10 17:05:01 +00:00
_sendQueue - > enqueue ( now , fromPeerAddress , toPeerAddress , data , len , unite ) ;
2015-11-09 17:14:26 +00:00
return ;
2015-11-08 21:57:02 +00:00
}
2015-10-21 00:22:53 +00:00
}
2015-10-28 00:36:47 +00:00
Buffer < 1024 > buf ;
2015-10-27 22:57:26 +00:00
if ( unite ) {
InetAddress v4 , v6 ;
if ( fromPeerAddress ) {
2015-11-06 01:22:22 +00:00
SharedPtr < Peer > fromPeer ( RR - > topology - > getPeerNoCache ( fromPeerAddress ) ) ;
2015-10-27 22:57:26 +00:00
if ( fromPeer )
fromPeer - > getBestActiveAddresses ( now , v4 , v6 ) ;
}
uint8_t addrCount = 0 ;
if ( v4 )
+ + addrCount ;
if ( v6 )
+ + addrCount ;
if ( addrCount ) {
toPeerAddress . appendTo ( buf ) ;
fromPeerAddress . appendTo ( buf ) ;
buf . append ( addrCount ) ;
2015-10-27 21:04:12 +00:00
if ( v4 )
2015-10-27 22:57:26 +00:00
v4 . serialize ( buf ) ;
2015-10-27 21:04:12 +00:00
if ( v6 )
2015-10-27 22:57:26 +00:00
v6 . serialize ( buf ) ;
2015-10-21 00:22:53 +00:00
}
2015-10-27 22:57:26 +00:00
}
2015-11-08 21:57:02 +00:00
2015-10-27 22:57:26 +00:00
{
2015-11-08 21:57:02 +00:00
Mutex : : Lock _l2 ( _members [ mostRecentMemberId ] . lock ) ;
2015-11-10 17:05:01 +00:00
if ( buf . size ( ) > 0 )
2015-11-09 17:45:43 +00:00
_send ( mostRecentMemberId , CLUSTER_MESSAGE_PROXY_UNITE , buf . data ( ) , buf . size ( ) ) ;
2016-04-05 17:47:13 +00:00
for ( std : : vector < InetAddress > : : const_iterator i1 ( _zeroTierPhysicalEndpoints . begin ( ) ) ; i1 ! = _zeroTierPhysicalEndpoints . end ( ) ; + + i1 ) {
for ( std : : vector < InetAddress > : : const_iterator i2 ( _members [ mostRecentMemberId ] . zeroTierPhysicalEndpoints . begin ( ) ) ; i2 ! = _members [ mostRecentMemberId ] . zeroTierPhysicalEndpoints . end ( ) ; + + i2 ) {
if ( i1 - > ss_family = = i2 - > ss_family ) {
TRACE ( " sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s) " , len , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) , ( unsigned int ) mostRecentMemberId , i1 - > toString ( ) . c_str ( ) , i2 - > toString ( ) . c_str ( ) ) ;
RR - > node - > putPacket ( * i1 , * i2 , data , len ) ;
return ;
}
}
2015-11-09 19:08:52 +00:00
}
2016-04-05 17:47:13 +00:00
TRACE ( " sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family! " , len , fromPeerAddress . toString ( ) . c_str ( ) , toPeerAddress . toString ( ) . c_str ( ) , ( unsigned int ) mostRecentMemberId ) ;
return ;
2015-10-27 22:57:26 +00:00
}
2015-10-21 00:22:53 +00:00
}
2015-11-08 21:57:02 +00:00
void Cluster : : sendDistributedQuery ( const Packet & pkt )
2015-10-14 21:12:12 +00:00
{
2015-10-28 00:36:47 +00:00
Buffer < 4096 > buf ;
2015-11-08 21:57:02 +00:00
buf . append ( ( uint16_t ) pkt . size ( ) ) ;
buf . append ( pkt . data ( ) , pkt . size ( ) ) ;
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
_send ( * mid , CLUSTER_MESSAGE_REMOTE_PACKET , buf . data ( ) , buf . size ( ) ) ;
2015-10-20 23:24:21 +00:00
}
2015-10-14 21:12:12 +00:00
}
void Cluster : : doPeriodicTasks ( )
{
2015-10-20 22:27:53 +00:00
const uint64_t now = RR - > node - > now ( ) ;
2015-11-08 21:57:02 +00:00
2015-11-02 23:15:20 +00:00
if ( ( now - _lastFlushed ) > = ZT_CLUSTER_FLUSH_PERIOD ) {
_lastFlushed = now ;
2015-11-08 21:57:02 +00:00
2015-10-14 21:12:12 +00:00
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
Mutex : : Lock _l2 ( _members [ * mid ] . lock ) ;
2015-10-20 22:27:53 +00:00
if ( ( now - _members [ * mid ] . lastAnnouncedAliveTo ) > = ( ( ZT_CLUSTER_TIMEOUT / 2 ) - 1000 ) ) {
2015-11-08 21:57:02 +00:00
_members [ * mid ] . lastAnnouncedAliveTo = now ;
2015-10-20 22:27:53 +00:00
Buffer < 2048 > alive ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MAJOR ) ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_MINOR ) ;
alive . append ( ( uint16_t ) ZEROTIER_ONE_VERSION_REVISION ) ;
alive . append ( ( uint8_t ) ZT_PROTO_VERSION ) ;
if ( _addressToLocationFunction ) {
alive . append ( ( int32_t ) _x ) ;
alive . append ( ( int32_t ) _y ) ;
alive . append ( ( int32_t ) _z ) ;
} else {
alive . append ( ( int32_t ) 0 ) ;
alive . append ( ( int32_t ) 0 ) ;
alive . append ( ( int32_t ) 0 ) ;
}
alive . append ( ( uint64_t ) now ) ;
alive . append ( ( uint64_t ) 0 ) ; // TODO: compute and send load average
2015-11-13 20:14:28 +00:00
alive . append ( ( uint64_t ) RR - > topology - > countActive ( now ) ) ;
2015-10-20 22:27:53 +00:00
alive . append ( ( uint64_t ) 0 ) ; // unused/reserved flags
alive . append ( ( uint8_t ) _zeroTierPhysicalEndpoints . size ( ) ) ;
for ( std : : vector < InetAddress > : : const_iterator pe ( _zeroTierPhysicalEndpoints . begin ( ) ) ; pe ! = _zeroTierPhysicalEndpoints . end ( ) ; + + pe )
pe - > serialize ( alive ) ;
2015-11-08 21:57:02 +00:00
_send ( * mid , CLUSTER_MESSAGE_ALIVE , alive . data ( ) , alive . size ( ) ) ;
2015-10-20 22:27:53 +00:00
}
2015-11-09 17:45:43 +00:00
_flush ( * mid ) ;
2015-10-14 21:12:12 +00:00
}
}
2015-11-08 21:57:02 +00:00
if ( ( now - _lastCleanedRemotePeers ) > = ( ZT_PEER_ACTIVITY_TIMEOUT * 2 ) ) {
_lastCleanedRemotePeers = now ;
Mutex : : Lock _l ( _remotePeers_m ) ;
for ( std : : map < std : : pair < Address , unsigned int > , uint64_t > : : iterator rp ( _remotePeers . begin ( ) ) ; rp ! = _remotePeers . end ( ) ; ) {
if ( ( now - rp - > second ) > = ZT_PEER_ACTIVITY_TIMEOUT )
_remotePeers . erase ( rp + + ) ;
else + + rp ;
}
}
2015-11-10 17:05:01 +00:00
if ( ( now - _lastCleanedQueue ) > = ZT_CLUSTER_QUEUE_EXPIRATION ) {
_lastCleanedQueue = now ;
_sendQueue - > expire ( now ) ;
}
2015-10-14 21:12:12 +00:00
}
void Cluster : : addMember ( uint16_t memberId )
{
2015-10-22 23:02:01 +00:00
if ( ( memberId > = ZT_CLUSTER_MAX_MEMBERS ) | | ( memberId = = _id ) )
2015-10-20 22:27:53 +00:00
return ;
2015-10-14 21:12:12 +00:00
Mutex : : Lock _l2 ( _members [ memberId ] . lock ) ;
2015-10-20 22:27:53 +00:00
{
Mutex : : Lock _l ( _memberIds_m ) ;
if ( std : : find ( _memberIds . begin ( ) , _memberIds . end ( ) , memberId ) ! = _memberIds . end ( ) )
return ;
_memberIds . push_back ( memberId ) ;
std : : sort ( _memberIds . begin ( ) , _memberIds . end ( ) ) ;
}
_members [ memberId ] . clear ( ) ;
2015-10-14 21:12:12 +00:00
// Generate this member's message key from the master and its ID
uint16_t stmp [ ZT_SHA512_DIGEST_LEN / sizeof ( uint16_t ) ] ;
memcpy ( stmp , _masterSecret , sizeof ( stmp ) ) ;
stmp [ 0 ] ^ = Utils : : hton ( memberId ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
SHA512 : : hash ( stmp , stmp , sizeof ( stmp ) ) ;
memcpy ( _members [ memberId ] . key , stmp , sizeof ( _members [ memberId ] . key ) ) ;
Utils : : burn ( stmp , sizeof ( stmp ) ) ;
// Prepare q
_members [ memberId ] . q . clear ( ) ;
char iv [ 16 ] ;
Utils : : getSecureRandom ( iv , 16 ) ;
_members [ memberId ] . q . append ( iv , 16 ) ;
_members [ memberId ] . q . addSize ( 8 ) ; // room for MAC
2015-10-14 22:49:41 +00:00
_members [ memberId ] . q . append ( ( uint16_t ) _id ) ;
2015-10-20 22:27:53 +00:00
_members [ memberId ] . q . append ( ( uint16_t ) memberId ) ;
}
void Cluster : : removeMember ( uint16_t memberId )
{
Mutex : : Lock _l ( _memberIds_m ) ;
std : : vector < uint16_t > newMemberIds ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
if ( * mid ! = memberId )
newMemberIds . push_back ( * mid ) ;
}
_memberIds = newMemberIds ;
}
2015-10-27 21:37:38 +00:00
bool Cluster : : findBetterEndpoint ( InetAddress & redirectTo , const Address & peerAddress , const InetAddress & peerPhysicalAddress , bool offload )
2015-10-20 22:27:53 +00:00
{
if ( _addressToLocationFunction ) {
// Pick based on location if it can be determined
int px = 0 , py = 0 , pz = 0 ;
if ( _addressToLocationFunction ( _addressToLocationFunctionArg , reinterpret_cast < const struct sockaddr_storage * > ( & peerPhysicalAddress ) , & px , & py , & pz ) = = 0 ) {
2015-10-27 16:36:48 +00:00
TRACE ( " no geolocation data for %s (geo-lookup is lazy/async so it may work next time) " , peerPhysicalAddress . toIpString ( ) . c_str ( ) ) ;
2015-10-27 21:37:38 +00:00
return false ;
2015-10-20 22:27:53 +00:00
}
// Find member closest to this peer
const uint64_t now = RR - > node - > now ( ) ;
2015-10-28 00:36:47 +00:00
std : : vector < InetAddress > best ;
2015-10-20 22:27:53 +00:00
const double currentDistance = _dist3d ( _x , _y , _z , px , py , pz ) ;
double bestDistance = ( offload ? 2147483648.0 : currentDistance ) ;
unsigned int bestMember = _id ;
{
Mutex : : Lock _l ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
_Member & m = _members [ * mid ] ;
Mutex : : Lock _ml ( m . lock ) ;
// Consider member if it's alive and has sent us a location and one or more physical endpoints to send peers to
if ( ( ( now - m . lastReceivedAliveAnnouncement ) < ZT_CLUSTER_TIMEOUT ) & & ( ( m . x ! = 0 ) | | ( m . y ! = 0 ) | | ( m . z ! = 0 ) ) & & ( m . zeroTierPhysicalEndpoints . size ( ) > 0 ) ) {
2015-10-28 00:36:47 +00:00
const double mdist = _dist3d ( m . x , m . y , m . z , px , py , pz ) ;
2015-10-20 22:27:53 +00:00
if ( mdist < bestDistance ) {
2015-10-23 20:03:34 +00:00
bestDistance = mdist ;
2015-10-20 22:27:53 +00:00
bestMember = * mid ;
best = m . zeroTierPhysicalEndpoints ;
}
}
}
}
2015-10-28 00:36:47 +00:00
// Redirect to a closer member if it has a ZeroTier endpoint address in the same ss_family
2015-10-27 16:36:48 +00:00
for ( std : : vector < InetAddress > : : const_iterator a ( best . begin ( ) ) ; a ! = best . end ( ) ; + + a ) {
if ( a - > ss_family = = peerPhysicalAddress . ss_family ) {
TRACE ( " %s at [%d,%d,%d] is %f from us but %f from %u, can redirect to %s " , peerAddress . toString ( ) . c_str ( ) , px , py , pz , currentDistance , bestDistance , bestMember , a - > toString ( ) . c_str ( ) ) ;
2015-10-27 21:37:38 +00:00
redirectTo = * a ;
return true ;
2015-10-27 16:36:48 +00:00
}
2015-10-20 22:27:53 +00:00
}
2015-10-27 16:36:48 +00:00
TRACE ( " %s at [%d,%d,%d] is %f from us, no better endpoints found " , peerAddress . toString ( ) . c_str ( ) , px , py , pz , currentDistance ) ;
2015-10-27 21:37:38 +00:00
return false ;
2015-10-20 22:27:53 +00:00
} else {
// TODO: pick based on load if no location info?
2015-10-27 21:37:38 +00:00
return false ;
2015-10-20 22:27:53 +00:00
}
2015-10-14 21:12:12 +00:00
}
2015-10-26 19:41:08 +00:00
void Cluster : : status ( ZT_ClusterStatus & status ) const
{
const uint64_t now = RR - > node - > now ( ) ;
memset ( & status , 0 , sizeof ( ZT_ClusterStatus ) ) ;
status . myId = _id ;
2015-11-09 17:14:26 +00:00
{
ZT_ClusterMemberStatus * const s = & ( status . members [ status . clusterSize + + ] ) ;
s - > id = _id ;
s - > alive = 1 ;
s - > x = _x ;
s - > y = _y ;
s - > z = _z ;
s - > load = 0 ; // TODO
2015-11-13 20:14:28 +00:00
s - > peers = RR - > topology - > countActive ( now ) ;
2015-11-09 17:14:26 +00:00
for ( std : : vector < InetAddress > : : const_iterator ep ( _zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = _zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( s - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( s - > zeroTierPhysicalEndpoints [ s - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
}
2015-10-26 19:41:08 +00:00
}
{
Mutex : : Lock _l1 ( _memberIds_m ) ;
for ( std : : vector < uint16_t > : : const_iterator mid ( _memberIds . begin ( ) ) ; mid ! = _memberIds . end ( ) ; + + mid ) {
if ( status . clusterSize > = ZT_CLUSTER_MAX_MEMBERS ) // sanity check
break ;
2015-11-07 00:12:41 +00:00
2015-10-26 19:41:08 +00:00
_Member & m = _members [ * mid ] ;
Mutex : : Lock ml ( m . lock ) ;
2015-11-07 00:12:41 +00:00
ZT_ClusterMemberStatus * const s = & ( status . members [ status . clusterSize + + ] ) ;
2015-10-26 19:41:08 +00:00
s - > id = * mid ;
s - > msSinceLastHeartbeat = ( unsigned int ) std : : min ( ( uint64_t ) ( ~ ( ( unsigned int ) 0 ) ) , ( now - m . lastReceivedAliveAnnouncement ) ) ;
s - > alive = ( s - > msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT ) ? 1 : 0 ;
s - > x = m . x ;
s - > y = m . y ;
s - > z = m . z ;
s - > load = m . load ;
2015-11-07 00:12:41 +00:00
s - > peers = m . peers ;
2015-10-26 19:41:08 +00:00
for ( std : : vector < InetAddress > : : const_iterator ep ( m . zeroTierPhysicalEndpoints . begin ( ) ) ; ep ! = m . zeroTierPhysicalEndpoints . end ( ) ; + + ep ) {
if ( s - > numZeroTierPhysicalEndpoints > = ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES ) // sanity check
break ;
memcpy ( & ( s - > zeroTierPhysicalEndpoints [ s - > numZeroTierPhysicalEndpoints + + ] ) , & ( * ep ) , sizeof ( struct sockaddr_storage ) ) ;
}
}
}
}
2015-10-20 23:24:21 +00:00
void Cluster : : _send ( uint16_t memberId , StateMessageType type , const void * msg , unsigned int len )
2015-10-14 21:12:12 +00:00
{
2015-10-22 23:02:01 +00:00
if ( ( len + 3 ) > ( ZT_CLUSTER_MAX_MESSAGE_LENGTH - ( 24 + 2 + 2 ) ) ) // sanity check
return ;
2015-10-14 21:12:12 +00:00
_Member & m = _members [ memberId ] ;
// assumes m.lock is locked!
2015-10-20 23:24:21 +00:00
if ( ( m . q . size ( ) + len + 3 ) > ZT_CLUSTER_MAX_MESSAGE_LENGTH )
_flush ( memberId ) ;
m . q . append ( ( uint16_t ) ( len + 1 ) ) ;
m . q . append ( ( uint8_t ) type ) ;
m . q . append ( msg , len ) ;
2015-10-14 21:12:12 +00:00
}
void Cluster : : _flush ( uint16_t memberId )
{
_Member & m = _members [ memberId ] ;
// assumes m.lock is locked!
2015-10-20 22:27:53 +00:00
if ( m . q . size ( ) > ( 24 + 2 + 2 ) ) { // 16-byte IV + 8-byte MAC + 2 byte from-member-ID + 2 byte to-member-ID
2015-10-14 21:12:12 +00:00
// Create key from member's key and IV
char keytmp [ 32 ] ;
memcpy ( keytmp , m . key , 32 ) ;
for ( int i = 0 ; i < 8 ; + + i )
keytmp [ i ] ^ = m . q [ i ] ;
Salsa20 s20 ( keytmp , 256 , m . q . field ( 8 , 8 ) ) ;
Utils : : burn ( keytmp , sizeof ( keytmp ) ) ;
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey [ ZT_POLY1305_KEY_LEN ] ;
memset ( polykey , 0 , sizeof ( polykey ) ) ;
s20 . encrypt12 ( polykey , polykey , sizeof ( polykey ) ) ;
// Encrypt m.q in place
s20 . encrypt12 ( reinterpret_cast < const char * > ( m . q . data ( ) ) + 24 , const_cast < char * > ( reinterpret_cast < const char * > ( m . q . data ( ) ) ) + 24 , m . q . size ( ) - 24 ) ;
// Add MAC for authentication (encrypt-then-MAC)
char mac [ ZT_POLY1305_MAC_LEN ] ;
Poly1305 : : compute ( mac , reinterpret_cast < const char * > ( m . q . data ( ) ) + 24 , m . q . size ( ) - 24 , polykey ) ;
memcpy ( m . q . field ( 16 , 8 ) , mac , 8 ) ;
// Send!
2015-10-20 22:27:53 +00:00
_sendFunction ( _sendFunctionArg , memberId , m . q . data ( ) , m . q . size ( ) ) ;
2015-10-14 21:12:12 +00:00
// Prepare for more
m . q . clear ( ) ;
char iv [ 16 ] ;
Utils : : getSecureRandom ( iv , 16 ) ;
m . q . append ( iv , 16 ) ;
m . q . addSize ( 8 ) ; // room for MAC
2015-10-20 22:27:53 +00:00
m . q . append ( ( uint16_t ) _id ) ; // from member ID
m . q . append ( ( uint16_t ) memberId ) ; // to member ID
2015-10-14 21:12:12 +00:00
}
}
2015-11-09 17:45:43 +00:00
void Cluster : : _doREMOTE_WHOIS ( uint64_t fromMemberId , const Packet & remotep )
{
if ( remotep . payloadLength ( ) > = ZT_ADDRESS_LENGTH ) {
Identity queried ( RR - > topology - > getIdentity ( Address ( remotep . payload ( ) , ZT_ADDRESS_LENGTH ) ) ) ;
if ( queried ) {
Buffer < 1024 > routp ;
remotep . source ( ) . appendTo ( routp ) ;
routp . append ( ( uint8_t ) Packet : : VERB_OK ) ;
2015-11-09 18:25:20 +00:00
routp . addSize ( 2 ) ; // space for length
2015-11-09 17:45:43 +00:00
routp . append ( ( uint8_t ) Packet : : VERB_WHOIS ) ;
routp . append ( remotep . packetId ( ) ) ;
queried . serialize ( routp ) ;
2015-11-09 19:08:52 +00:00
routp . setAt < uint16_t > ( ZT_ADDRESS_LENGTH + 1 , ( uint16_t ) ( routp . size ( ) - ZT_ADDRESS_LENGTH - 3 ) ) ;
2015-11-09 17:45:43 +00:00
2015-11-09 19:08:52 +00:00
TRACE ( " responding to remote WHOIS from %s @ %u with identity of %s " , remotep . source ( ) . toString ( ) . c_str ( ) , ( unsigned int ) fromMemberId , queried . address ( ) . toString ( ) . c_str ( ) ) ;
2015-11-09 17:45:43 +00:00
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_PROXY_SEND , routp . data ( ) , routp . size ( ) ) ;
}
}
}
void Cluster : : _doREMOTE_MULTICAST_GATHER ( uint64_t fromMemberId , const Packet & remotep )
{
const uint64_t nwid = remotep . at < uint64_t > ( ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID ) ;
const MulticastGroup mg ( MAC ( remotep . field ( ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC , 6 ) , 6 ) , remotep . at < uint32_t > ( ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI ) ) ;
unsigned int gatherLimit = remotep . at < uint32_t > ( ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT ) ;
const Address remotePeerAddress ( remotep . source ( ) ) ;
if ( gatherLimit ) {
Buffer < ZT_PROTO_MAX_PACKET_LENGTH > routp ;
remotePeerAddress . appendTo ( routp ) ;
routp . append ( ( uint8_t ) Packet : : VERB_OK ) ;
2015-11-09 18:25:20 +00:00
routp . addSize ( 2 ) ; // space for length
2015-11-09 17:45:43 +00:00
routp . append ( ( uint8_t ) Packet : : VERB_MULTICAST_GATHER ) ;
routp . append ( remotep . packetId ( ) ) ;
routp . append ( nwid ) ;
mg . mac ( ) . appendTo ( routp ) ;
routp . append ( ( uint32_t ) mg . adi ( ) ) ;
2015-11-09 18:25:20 +00:00
if ( gatherLimit > ( ( ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80 ) / 5 ) )
gatherLimit = ( ( ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80 ) / 5 ) ;
2015-11-09 17:45:43 +00:00
if ( RR - > mc - > gather ( remotePeerAddress , nwid , mg , routp , gatherLimit ) ) {
2015-11-09 18:25:20 +00:00
routp . setAt < uint16_t > ( ZT_ADDRESS_LENGTH + 1 , ( uint16_t ) ( routp . size ( ) - ZT_ADDRESS_LENGTH - 3 ) ) ;
2015-11-09 19:08:52 +00:00
TRACE ( " responding to remote MULTICAST_GATHER from %s @ %u with %u bytes " , remotePeerAddress . toString ( ) . c_str ( ) , ( unsigned int ) fromMemberId , routp . size ( ) ) ;
2015-11-09 17:45:43 +00:00
Mutex : : Lock _l2 ( _members [ fromMemberId ] . lock ) ;
_send ( fromMemberId , CLUSTER_MESSAGE_PROXY_SEND , routp . data ( ) , routp . size ( ) ) ;
}
}
}
2015-10-14 21:12:12 +00:00
} // namespace ZeroTier
# endif // ZT_ENABLE_CLUSTER