Remove a whole bunch of now-unnecessary cruft from Topology and PacketDecoder.

This commit is contained in:
Adam Ierymenko 2013-10-05 10:19:12 -04:00
parent 0e43e5e8f2
commit 4267e7da93
11 changed files with 120 additions and 376 deletions

View File

@ -198,6 +198,11 @@ error_no_ZT_ARCH_defined;
*/
#define ZT_MAC_FIRST_OCTET 0x32
/**
* Length of secret key in bytes
*/
#define ZT_PEER_SECRET_KEY_LENGTH 32
/**
* How often Topology::clean() and Network::clean() are called in ms
*/

View File

@ -36,13 +36,12 @@
#include "Salsa20.hpp"
#include "Utils.hpp"
// Mask for second byte in hashcash criterion -- making it require
// 13 0 bits at the start of the hash.
#define ZT_IDENTITY_SHA_BYTE1_MASK 0xf8
namespace ZeroTier {
/*
* This is the hashcash criterion
*/
struct _Identity_generate_cond
{
_Identity_generate_cond() throw() {}
@ -80,6 +79,8 @@ void Identity::generate()
bool Identity::locallyValidate() const
{
if (_address.isReserved())
return false;
char sha512buf[64];
char addrb[5];
_address.copyTo(addrb,5);

View File

@ -39,7 +39,7 @@
#include "C25519.hpp"
#include "Buffer.hpp"
#define ZT_IDENTITY_MAX_BINARY_SERIALIZED_LENGTH (ZT_ADDRESS_LENGTH + 1 + ZT_C25519_PUBLIC_KEY_LEN + ZT_C25519_SIGNATURE_LEN + 1 + ZT_C25519_PRIVATE_KEY_LEN)
#define ZT_IDENTITY_MAX_BINARY_SERIALIZED_LENGTH (ZT_ADDRESS_LENGTH + 1 + ZT_C25519_PUBLIC_KEY_LEN + 1 + ZT_C25519_PRIVATE_KEY_LEN)
namespace ZeroTier {

View File

@ -61,7 +61,6 @@ const char *Packet::errorString(ErrorCode e)
case ERROR_BAD_PROTOCOL_VERSION: return "BAD_PROTOCOL_VERSION";
case ERROR_OBJ_NOT_FOUND: return "OBJECT_NOT_FOUND";
case ERROR_IDENTITY_COLLISION: return "IDENTITY_COLLISION";
case ERROR_IDENTITY_INVALID: return "IDENTITY_INVALID";
case ERROR_UNSUPPORTED_OPERATION: return "UNSUPPORTED_OPERATION";
case ERROR_NO_MEMBER_CERTIFICATE: return "NO_MEMBER_CERTIFICATE";
}

View File

@ -51,11 +51,13 @@
* 2 - 0.3.0 ... 0.4.5
* * Added signature and originating peer to multicast frame
* * Double size of multicast frame bloom filter
* 3 - 0.5.0 ...
* 3 - 0.5.0 ... 0.6.0
* * Yet another multicast redesign
* * New crypto completely changes key agreement cipher
* 4 - 0.6.0 ...
* * New identity format based on hashcash design
*/
#define ZT_PROTO_VERSION 3
#define ZT_PROTO_VERSION 4
/**
* Maximum hop count allowed by packet structure (3 bits, 0-7)
@ -620,14 +622,11 @@ public:
/* HELLO pushed an identity whose address is already claimed */
ERROR_IDENTITY_COLLISION = 4,
/* Identity was not valid */
ERROR_IDENTITY_INVALID = 5,
/* Verb or use case not supported/enabled by this node */
ERROR_UNSUPPORTED_OPERATION = 6,
ERROR_UNSUPPORTED_OPERATION = 5,
/* Message to private network rejected -- no unexpired certificate on file */
ERROR_NO_MEMBER_CERTIFICATE = 7
ERROR_NO_MEMBER_CERTIFICATE = 6
};
/**

View File

@ -124,73 +124,6 @@ bool PacketDecoder::tryDecode(const RuntimeEnvironment *_r)
}
}
void PacketDecoder::_CBaddPeerFromHello(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result)
{
_CBaddPeerFromHello_Data *req = (_CBaddPeerFromHello_Data *)arg;
const RuntimeEnvironment *_r = req->renv;
try {
switch(result) {
case Topology::PEER_VERIFY_ACCEPTED_NEW:
case Topology::PEER_VERIFY_ACCEPTED_ALREADY_HAVE:
case Topology::PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS: {
_r->sw->doAnythingWaitingForPeer(p);
Packet outp(req->source,_r->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append(req->helloTimestamp);
outp.append((unsigned char)ZT_PROTO_VERSION);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MAJOR);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MINOR);
outp.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
outp.armor(p->key(),true);
_r->demarc->send(req->localPort,req->remoteAddress,outp.data(),outp.size(),-1);
} break;
case Topology::PEER_VERIFY_REJECTED_INVALID_IDENTITY: {
Packet outp(req->source,_r->identity.address(),Packet::VERB_ERROR);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append((unsigned char)Packet::ERROR_IDENTITY_INVALID);
outp.armor(p->key(),true);
_r->demarc->send(req->localPort,req->remoteAddress,outp.data(),outp.size(),-1);
} break;
case Topology::PEER_VERIFY_REJECTED_DUPLICATE:
case Topology::PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED: {
Packet outp(req->source,_r->identity.address(),Packet::VERB_ERROR);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(req->helloPacketId);
outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
outp.armor(p->key(),true);
_r->demarc->send(req->localPort,req->remoteAddress,outp.data(),outp.size(),-1);
} break;
}
} catch ( ... ) {
TRACE("unexpected exception in addPeer() result callback for peer received via HELLO");
}
delete req;
}
void PacketDecoder::_CBaddPeerFromWhois(void *arg,const SharedPtr<Peer> &p,Topology::PeerVerifyResult result)
{
const RuntimeEnvironment *_r = (const RuntimeEnvironment *)arg;
try {
switch(result) {
case Topology::PEER_VERIFY_ACCEPTED_NEW:
case Topology::PEER_VERIFY_ACCEPTED_ALREADY_HAVE:
case Topology::PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS:
_r->sw->doAnythingWaitingForPeer(p);
break;
default:
break;
}
} catch ( ... ) {
TRACE("unexpected exception in addPeer() result callback for peer received via OK(WHOIS)");
}
}
bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer> &peer)
{
try {
@ -205,7 +138,6 @@ bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer>
}
break;
case Packet::ERROR_IDENTITY_COLLISION:
case Packet::ERROR_IDENTITY_INVALID:
// TODO: if it comes from a supernode, regenerate a new identity
break;
case Packet::ERROR_NO_MEMBER_CERTIFICATE:
@ -225,67 +157,59 @@ bool PacketDecoder::_doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer>
bool PacketDecoder::_doHELLO(const RuntimeEnvironment *_r)
{
try {
//unsigned int protoVersion = (*this)[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION];
unsigned int protoVersion = (*this)[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION];
unsigned int vMajor = (*this)[ZT_PROTO_VERB_HELLO_IDX_MAJOR_VERSION];
unsigned int vMinor = (*this)[ZT_PROTO_VERB_HELLO_IDX_MINOR_VERSION];
unsigned int vRevision = at<uint16_t>(ZT_PROTO_VERB_HELLO_IDX_REVISION);
uint64_t timestamp = at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
Identity id(*this,ZT_PROTO_VERB_HELLO_IDX_IDENTITY);
// Initial sniff test for valid addressing and that this is indeed the
// submitter's identity.
if ((id.address().isReserved())||(id.address() != source())) {
#ifdef ZT_TRACE
if (id.address().isReserved()) {
TRACE("dropped HELLO from %s(%s): identity has reserved address",source().toString().c_str(),_remoteAddress.toString().c_str());
} else {
TRACE("dropped HELLO from %s(%s): identity is not for sender of packet (HELLO is a self-announcement)",source().toString().c_str(),_remoteAddress.toString().c_str());
if (protoVersion != ZT_PROTO_VERSION) {
TRACE("dropped HELLO from %s(%s): protocol version mismatch (%u, expected %u)",source().toString().c_str(),_remoteAddress.toString().c_str(),protoVersion,(unsigned int)ZT_PROTO_VERSION);
return true;
}
if (!id.locallyValidate()) {
TRACE("dropped HELLO from %s(%s): identity invalid",source().toString().c_str(),_remoteAddress.toString().c_str());
return true;
}
SharedPtr<Peer> peer(_r->topology->getPeer(id.address()));
if (peer) {
if (peer->identity() != id) {
unsigned char key[ZT_PEER_SECRET_KEY_LENGTH];
if (_r->identity.agree(id,key,ZT_PEER_SECRET_KEY_LENGTH)) {
TRACE("rejected HELLO from %s(%s): address already claimed",source().toString().c_str(),_remoteAddress.toString().c_str());
Packet outp(source(),_r->identity.address(),Packet::VERB_ERROR);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(packetId());
outp.append((unsigned char)Packet::ERROR_IDENTITY_COLLISION);
outp.armor(key,true);
_r->demarc->send(_localPort,_remoteAddress,outp.data(),outp.size(),-1);
}
return true;
}
#endif
return true;
}
} else peer = _r->topology->addPeer(SharedPtr<Peer>(new Peer(_r->identity,id)));
// Is this a HELLO for a peer we already know? If so just update its
// packet receive stats and send an OK.
SharedPtr<Peer> existingPeer(_r->topology->getPeer(id.address()));
if ((existingPeer)&&(existingPeer->identity() == id)) {
existingPeer->onReceive(_r,_localPort,_remoteAddress,hops(),Packet::VERB_HELLO,Utils::now());
existingPeer->setRemoteVersion(vMajor,vMinor,vRevision);
peer->onReceive(_r,_localPort,_remoteAddress,hops(),Packet::VERB_HELLO,Utils::now());
peer->setRemoteVersion(vMajor,vMinor,vRevision);
Packet outp(source(),_r->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(packetId());
outp.append(timestamp);
outp.append((unsigned char)ZT_PROTO_VERSION);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MAJOR);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MINOR);
outp.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
outp.armor(existingPeer->key(),true);
_r->demarc->send(_localPort,_remoteAddress,outp.data(),outp.size(),-1);
return true;
}
SharedPtr<Peer> candidate(new Peer(_r->identity,id));
candidate->setPathAddress(_remoteAddress,false);
candidate->setRemoteVersion(vMajor,vMinor,vRevision);
_CBaddPeerFromHello_Data *arg = new _CBaddPeerFromHello_Data;
arg->renv = _r;
arg->source = source();
arg->remoteAddress = _remoteAddress;
arg->localPort = _localPort;
arg->vMajor = vMajor;
arg->vMinor = vMinor;
arg->vRevision = vRevision;
arg->helloPacketId = packetId();
arg->helloTimestamp = timestamp;
_r->topology->addPeer(candidate,&PacketDecoder::_CBaddPeerFromHello,arg);
Packet outp(source(),_r->identity.address(),Packet::VERB_OK);
outp.append((unsigned char)Packet::VERB_HELLO);
outp.append(packetId());
outp.append(timestamp);
outp.append((unsigned char)ZT_PROTO_VERSION);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MAJOR);
outp.append((unsigned char)ZEROTIER_ONE_VERSION_MINOR);
outp.append((uint16_t)ZEROTIER_ONE_VERSION_REVISION);
outp.armor(peer->key(),true);
_r->demarc->send(_localPort,_remoteAddress,outp.data(),outp.size(),-1);
} catch (std::exception &ex) {
TRACE("dropped HELLO from %s(%s): %s",source().toString().c_str(),_remoteAddress.toString().c_str(),ex.what());
} catch ( ... ) {
TRACE("dropped HELLO from %s(%s): unexpected exception",source().toString().c_str(),_remoteAddress.toString().c_str());
}
return true;
}
@ -305,12 +229,13 @@ bool PacketDecoder::_doOK(const RuntimeEnvironment *_r,const SharedPtr<Peer> &pe
peer->setRemoteVersion(vMajor,vMinor,vRevision);
} break;
case Packet::VERB_WHOIS: {
TRACE("%s(%s): OK(%s)",source().toString().c_str(),_remoteAddress.toString().c_str(),Packet::verbString(inReVerb));
// Right now only supernodes are allowed to send OK(WHOIS) to prevent
// poisoning attacks. Further decentralization will require some other
// kind of trust mechanism.
if (_r->topology->isSupernode(source())) {
// Right now, only supernodes are queried for WHOIS so we only
// accept OK(WHOIS) from supernodes. Otherwise peers could
// potentially cache-poison.
_r->topology->addPeer(SharedPtr<Peer>(new Peer(_r->identity,Identity(*this,ZT_PROTO_VERB_WHOIS__OK__IDX_IDENTITY))),&PacketDecoder::_CBaddPeerFromWhois,const_cast<void *>((const void *)_r));
Identity id(*this,ZT_PROTO_VERB_WHOIS__OK__IDX_IDENTITY);
if (id.locallyValidate())
_r->sw->doAnythingWaitingForPeer(_r->topology->addPeer(SharedPtr<Peer>(new Peer(_r->identity,id))));
}
} break;
case Packet::VERB_NETWORK_CONFIG_REQUEST: {

View File

@ -109,26 +109,6 @@ public:
inline uint64_t receiveTime() const throw() { return _receiveTime; }
private:
struct _CBaddPeerFromHello_Data
{
const RuntimeEnvironment *renv;
Address source;
InetAddress remoteAddress;
Demarc::Port localPort;
unsigned int vMajor,vMinor,vRevision;
uint64_t helloPacketId;
uint64_t helloTimestamp;
};
static void _CBaddPeerFromHello(
void *arg, // _CBaddPeerFromHello_Data
const SharedPtr<Peer> &p,
Topology::PeerVerifyResult result);
static void _CBaddPeerFromWhois(
void *arg, // RuntimeEnvironment
const SharedPtr<Peer> &p,
Topology::PeerVerifyResult result);
// These are called internally to handle packet contents once it has
// been authenticated, decrypted, decompressed, and classified.
bool _doERROR(const RuntimeEnvironment *_r,const SharedPtr<Peer> &peer);

View File

@ -55,7 +55,7 @@ Peer::Peer(const Identity &myIdentity,const Identity &peerIdentity)
_vRevision(0),
_dirty(true)
{
if (!myIdentity.agree(peerIdentity,_key,sizeof(_key)))
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw std::runtime_error("new peer identity key agreement failed");
}

View File

@ -34,10 +34,10 @@
#include <utility>
#include <stdexcept>
#include "Constants.hpp"
#include "Address.hpp"
#include "Utils.hpp"
#include "Identity.hpp"
#include "Constants.hpp"
#include "Logger.hpp"
#include "Demarc.hpp"
#include "RuntimeEnvironment.hpp"
@ -52,7 +52,7 @@
* Max length of serialized peer record
*/
#define ZT_PEER_MAX_SERIALIZED_LENGTH ( \
32 + \
ZT_PEER_SECRET_KEY_LENGTH + \
ZT_IDENTITY_MAX_BINARY_SERIALIZED_LENGTH + \
( ( \
(sizeof(uint64_t) * 4) + \
@ -532,7 +532,7 @@ private:
bool fixed; // do not learn address from received packets
};
unsigned char _key[32]; // shared secret key agreed upon between identities
unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
Identity _id;
WanPath _ipv4p;

View File

@ -54,22 +54,12 @@ Topology::Topology(const RuntimeEnvironment *renv,const char *dbpath)
}
Utils::lockDownFile(dbpath,false); // node.db caches secrets
_thread = Thread::start(this);
}
Topology::~Topology()
{
{
Mutex::Lock _l(_peerDeepVerifyJobs_m);
_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE;
_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::EXIT_THREAD;
}
_peerDeepVerifyJobs_c.signal();
Thread::join(_thread);
KISSDB_close(&_dbm);
// Flush last changes to disk
clean();
}
void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > &sn)
@ -83,10 +73,8 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >
for(std::map< Identity,std::vector<InetAddress> >::const_iterator i(sn.begin());i!=sn.end();++i) {
if (i->first != _r->identity) {
SharedPtr<Peer> p(getPeer(i->first.address()));
if ((!p)||(p->identity() != i->first)) {
p = SharedPtr<Peer>(new Peer(_r->identity,i->first));
_reallyAddPeer(p);
}
if (!p)
p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first)));
for(std::vector<InetAddress>::const_iterator j(i->second.begin());j!=i->second.end();++j)
p->setPathAddress(*j,true);
_supernodePeers.push_back(p);
@ -97,22 +85,33 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >
_amSupernode = (_supernodes.find(_r->identity) != _supernodes.end());
}
void Topology::addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,const SharedPtr<Peer> &,Topology::PeerVerifyResult),void *arg)
SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
{
if (candidate->address() != _r->identity.address()) {
Mutex::Lock _l(_peerDeepVerifyJobs_m);
_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
_PeerDeepVerifyJob &job = _peerDeepVerifyJobs.back();
job.callback = callback;
job.arg = arg;
job.candidate = candidate;
job.type = _PeerDeepVerifyJob::VERIFY_PEER;
_peerDeepVerifyJobs_c.signal();
} else {
TRACE("BUG: addPeer() caught and ignored attempt to add peer for self");
if (callback)
callback(arg,candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED);
if (peer->address() == _r->identity.address()) {
TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self");
throw std::logic_error("cannot add peer for self");
}
SharedPtr<Peer> actualPeer;
{
Mutex::Lock _l(_activePeers_m);
actualPeer = _activePeers.insert(std::pair< Address,SharedPtr<Peer> >(peer->address(),peer)).first->second;
}
uint64_t atmp[ZT_ADDRESS_LENGTH];
actualPeer->address().copyTo(atmp,ZT_ADDRESS_LENGTH);
Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
actualPeer->serialize(b);
b.zeroUnused();
_dbm_m.lock();
if (KISSDB_put(&_dbm,atmp,b.data())) {
TRACE("error writing %s to peerdb",actualPeer->address().toString().c_str());
} else actualPeer->getAndResetDirty();
_dbm_m.unlock();
return actualPeer;
}
SharedPtr<Peer> Topology::getPeer(const Address &zta)
@ -212,143 +211,29 @@ skip_and_try_next_supernode:
void Topology::clean()
{
{
Mutex::Lock _l(_peerDeepVerifyJobs_m);
_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE;
}
_peerDeepVerifyJobs_c.signal();
}
TRACE("cleaning caches and flushing modified peers to disk...");
void Topology::threadMain()
throw()
{
for(;;) {
_peerDeepVerifyJobs_m.lock();
if (_peerDeepVerifyJobs.empty()) {
_peerDeepVerifyJobs_m.unlock();
_peerDeepVerifyJobs_c.wait();
continue;
}
_PeerDeepVerifyJob job(_peerDeepVerifyJobs.front());
_peerDeepVerifyJobs.pop_front();
unsigned long queueRemaining = (unsigned long)_peerDeepVerifyJobs.size();
_peerDeepVerifyJobs_m.unlock();
Mutex::Lock _l(_activePeers_m);
for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) {
if (p->second->getAndResetDirty()) {
try {
uint64_t atmp[ZT_ADDRESS_LENGTH];
p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH);
switch(job.type) {
case _PeerDeepVerifyJob::VERIFY_PEER:
/* TODO: We should really verify peers every time completely if this
* is a supernode, perhaps deferring the expensive part for new
* addresses. An attempt at claim jumping should also trigger a
* short duration ban of the originating IP address in most cases,
* since this means either malicious intent or broken software. */
TRACE("verifying peer: %s",job.candidate->identity().address().toString().c_str());
Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
p->second->serialize(b);
b.zeroUnused();
if ((job.candidate->identity())&&(!job.candidate->identity().address().isReserved())&&(job.candidate->identity().locallyValidate())) {
// Peer passes sniff test, so check to see if we've already got
// one with the same address.
SharedPtr<Peer> existingPeer(getPeer(job.candidate->identity().address()));
if (existingPeer) {
if (existingPeer->identity() == job.candidate->identity()) {
// It's an *exact* duplicate, so return the existing peer
if (job.callback)
job.callback(job.arg,existingPeer,PEER_VERIFY_ACCEPTED_ALREADY_HAVE);
} else if (queueRemaining > 3) {
/* Prevents a CPU hog DOS attack, while allowing a very unlikely kind of
* DOS attack where someone knows someone else's address prior to their
* registering it and claim-jumps them and then floods with bad identities
* to hold their claim. Of the two, the latter would be infeasable
* without already having cracked the target's machine in which case
* the attacker has their private key anyway and can really steal their
* identity. So why bother.*/
TRACE("%s is duplicate, load too high, old won",job.candidate->identity().address().toString().c_str());
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED);
} else {
// It's different so deeply validate it first, then the
// existing claimant, and toss the imposter. If both verify, the
// one we already have wins.
if (!job.candidate->identity().locallyValidate()) {
LOG("Topology: IMPOSTER %s rejected",job.candidate->identity().address().toString().c_str());
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY);
} else if (!existingPeer->identity().locallyValidate()) {
LOG("Topology: previous IMPOSTER %s displaced by valid identity!",job.candidate->identity().address().toString().c_str());
_reallyAddPeer(job.candidate);
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS);
} else {
LOG("Topology: tie between apparently valid claims on %s, oldest won",job.candidate->identity().address().toString().c_str());
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE);
}
}
} else {
TRACE("%s accepted as new",job.candidate->identity().address().toString().c_str());
_reallyAddPeer(job.candidate);
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_NEW);
}
} else {
TRACE("%s rejected, identity failed initial checks",job.candidate->identity().address().toString().c_str());
if (job.callback)
job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY);
_dbm_m.lock();
if (KISSDB_put(&_dbm,atmp,b.data())) {
TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str());
}
break;
case _PeerDeepVerifyJob::CLEAN_CACHE:
TRACE("cleaning caches and flushing modified peers to disk...");
{
Mutex::Lock _l(_activePeers_m);
for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) {
if (p->second->getAndResetDirty()) {
try {
uint64_t atmp[ZT_ADDRESS_LENGTH];
p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH);
Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
p->second->serialize(b);
b.zeroUnused();
_dbm_m.lock();
if (KISSDB_put(&_dbm,atmp,b.data())) {
TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str());
}
_dbm_m.unlock();
} catch ( ... ) {
TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str());
}
}
}
}
break;
case _PeerDeepVerifyJob::EXIT_THREAD:
TRACE("thread terminating...");
return;
_dbm_m.unlock();
} catch ( ... ) {
TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str());
}
}
}
}
void Topology::_reallyAddPeer(const SharedPtr<Peer> &p)
{
{
Mutex::Lock _l(_activePeers_m);
_activePeers[p->identity().address()] = p;
}
try {
uint64_t atmp[ZT_ADDRESS_LENGTH];
p->address().copyTo(atmp,ZT_ADDRESS_LENGTH);
Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
p->serialize(b);
b.zeroUnused();
_dbm_m.lock();
if (KISSDB_put(&_dbm,atmp,b.data())) {
TRACE("error writing %s to peerdb",p->address().toString().c_str());
} else p->getAndResetDirty();
_dbm_m.unlock();
} catch ( ... ) {
TRACE("unexpected exception flushing to peerdb");
}
}
} // namespace ZeroTier

View File

@ -33,18 +33,14 @@
#include <map>
#include <set>
#include <list>
#include <vector>
#include <stdexcept>
#include "Constants.hpp"
#include "Address.hpp"
#include "Peer.hpp"
#include "Mutex.hpp"
#include "Condition.hpp"
#include "InetAddress.hpp"
#include "Constants.hpp"
#include "Thread.hpp"
#include "MulticastGroup.hpp"
#include "Utils.hpp"
#include "../ext/kissdb/kissdb.h"
@ -59,19 +55,6 @@ class RuntimeEnvironment;
class Topology
{
public:
/**
* Result of peer add/verify
*/
enum PeerVerifyResult
{
PEER_VERIFY_ACCEPTED_NEW, /* new peer */
PEER_VERIFY_ACCEPTED_ALREADY_HAVE, /* we already knew ye */
PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS, /* you booted out an impostor */
PEER_VERIFY_REJECTED_INVALID_IDENTITY, /* identity is invalid or validation failed */
PEER_VERIFY_REJECTED_DUPLICATE, /* someone equally valid already has your address */
PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED /* you look duplicate and I'm too busy to deep verify */
};
Topology(const RuntimeEnvironment *renv,const char *dbpath)
throw(std::runtime_error);
@ -85,22 +68,15 @@ public:
void setSupernodes(const std::map< Identity,std::vector<InetAddress> > &sn);
/**
* Add a peer to this network
*
* Verification and adding actually occurs in the background, since in
* rare cases it can be somewhat CPU-intensive. The callback will be
* called (from the background thread) when add is complete.
*
* The peer given to the callback may not be the same object provided
* as a candidate if the candidate was an exact duplicate of a peer we
* already have.
* Add a peer to database
*
* @param candidate New candidate peer to be added
* @param callback Callback to call when peer verification is complete
* @param arg First argument to callback
* @return Verification result or PEER_VERIFY__IN_PROGRESS if occurring in background
* This will not replace existing peers. In that case the existing peer
* record is returned.
*
* @param peer Peer to add
* @return New or existing peer (should replace 'peer')
*/
void addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,const SharedPtr<Peer> &,PeerVerifyResult),void *arg);
SharedPtr<Peer> addPeer(const SharedPtr<Peer> &peer);
/**
* Get a peer from its address
@ -169,7 +145,7 @@ public:
inline bool amSupernode() const { return _amSupernode; }
/**
* Clean and flush database now (runs in the background)
* Clean and flush database
*/
void clean();
@ -296,38 +272,12 @@ public:
std::vector< SharedPtr<Peer> > &_v;
};
/**
* Thread main method; do not call elsewhere
*/
void threadMain()
throw();
private:
void _reallyAddPeer(const SharedPtr<Peer> &p);
// A job for the background deep verify thread (also does cache cleaning, flushing, etc.)
struct _PeerDeepVerifyJob
{
void (*callback)(void *,const SharedPtr<Peer> &,Topology::PeerVerifyResult);
void *arg;
SharedPtr<Peer> candidate;
enum {
VERIFY_PEER,
CLEAN_CACHE,
EXIT_THREAD
} type;
};
const RuntimeEnvironment *const _r;
Thread _thread;
std::map< Address,SharedPtr<Peer> > _activePeers;
Mutex _activePeers_m;
std::list< _PeerDeepVerifyJob > _peerDeepVerifyJobs;
Mutex _peerDeepVerifyJobs_m;
Condition _peerDeepVerifyJobs_c;
std::map< Identity,std::vector<InetAddress> > _supernodes;
std::set< Address > _supernodeAddresses;
std::vector< SharedPtr<Peer> > _supernodePeers;