Replace slow std::list<> with an O(log(N)) data structure for the cluster relaying send queue.

This commit is contained in:
Adam Ierymenko 2015-11-10 09:05:01 -08:00
parent 32ec378e3b
commit b171c9a0db
2 changed files with 191 additions and 75 deletions

View File

@ -33,8 +33,12 @@
#include <string.h>
#include <math.h>
#include <map>
#include <algorithm>
#include <set>
#include <utility>
#include <list>
#include <stdexcept>
#include "../version.h"
@ -49,6 +53,34 @@
#include "Packet.hpp"
#include "Switch.hpp"
#include "Node.hpp"
#include "Array.hpp"
/**
* Chunk size for allocating queue entries
*
* Queue entries are allocated in chunks of this many and are added to a pool.
 * ZT_CLUSTER_MAX_QUEUE_GLOBAL must be evenly divisible by this.
 * (NOTE(review): no ZT_CLUSTER_MAX_QUEUE_GLOBAL is defined in this change --
 * the global cap visible here is ZT_CLUSTER_MAX_QUEUE_CHUNKS. Confirm this
 * reference is not stale.)
*/
#define ZT_CLUSTER_QUEUE_CHUNK_SIZE 32
/**
* Maximum number of chunks to ever allocate
*
* This is a global sanity limit to prevent resource exhaustion attacks. It
 * works out to about 600 MB of RAM. You'll never see this on a normal edge
* node. We're unlikely to see this on a root server unless someone is DOSing
* us. In that case cluster relaying will be affected but other functions
* should continue to operate normally.
*/
#define ZT_CLUSTER_MAX_QUEUE_CHUNKS 8194
/**
* Max data per queue entry
*
* If we ever support larger transport MTUs this must be increased. The plus
* 16 is just a small margin and has no special meaning.
*/
#define ZT_CLUSTER_SEND_QUEUE_DATA_MAX (ZT_UDP_DEFAULT_PAYLOAD_MTU + 16)
namespace ZeroTier {
@ -61,6 +93,137 @@ static inline double _dist3d(int x1,int y1,int z1,int x2,int y2,int z2)
return sqrt((dx * dx) + (dy * dy) + (dz * dz));
}
// An entry in _ClusterSendQueue
//
// Entries are pool-allocated in fixed-size chunks (see _ClusterSendQueue) and
// reused without reinitialization; every field is overwritten in enqueue().
struct _ClusterSendQueueEntry
{
	uint64_t timestamp;       // time of enqueue, used for expiration (see expire())
	Address fromPeerAddress;  // original sender of the relayed packet
	Address toPeerAddress;    // intended destination of the relayed packet
	// if we ever support larger transport MTUs this must be increased
	unsigned char data[ZT_CLUSTER_SEND_QUEUE_DATA_MAX]; // raw packet payload
	unsigned int len;         // number of bytes of data[] actually in use
	bool unite;               // attempt to unite sender and destination when sent?
};
// A multi-index map with entry memory pooling -- this allows our queue to
// be O(log(N)) and is complex enough that it makes the code a lot cleaner
// to break it out from Cluster.
class _ClusterSendQueue
{
public:
_ClusterSendQueue() :
_poolCount(0)
{
}
~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed
inline void enqueue(uint64_t ts,const Address &from,const Address &to,const void *data,unsigned int len,bool unite)
{
if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX)
return;
Mutex::Lock _l(_lock);
// Delete oldest queue entry if sender has too many queued packets
{
std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0)));
std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(_bySrc.end());
unsigned long countForSender = 0;
while ((qi != _bySrc.end())&&(qi->first == from)) {
if (++countForSender > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
_byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second));
_pool[_poolCount++] = oldest->second;
_bySrc.erase(oldest);
break;
} else if (oldest == _bySrc.end())
oldest = qi;
++qi;
}
}
_ClusterSendQueueEntry *e;
if (_poolCount > 0) {
e = _pool[--_poolCount];
} else {
if (_chunks.size() >= ZT_CLUSTER_MAX_QUEUE_CHUNKS)
return; // queue is totally full!
_chunks.push_back(Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE>());
e = &(_chunks.back().data[0]);
for(unsigned int i=1;i<ZT_CLUSTER_QUEUE_CHUNK_SIZE;++i)
_pool[_poolCount++] = &(_chunks.back().data[i]);
}
e->timestamp = ts;
e->fromPeerAddress = from;
e->toPeerAddress = to;
memcpy(e->data,data,len);
e->len = len;
e->unite = unite;
_bySrc.insert(std::pair<Address,_ClusterSendQueueEntry *>(from,e));
_byDest.insert(std::pair<Address,_ClusterSendQueueEntry *>(to,e));
}
inline void expire(uint64_t now)
{
Mutex::Lock _l(_lock);
for(std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.begin());qi!=_bySrc.end();) {
if ((now - qi->second->timestamp) > ZT_CLUSTER_QUEUE_EXPIRATION) {
_byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->toPeerAddress,qi->second));
_pool[_poolCount++] = qi->second;
_bySrc.erase(qi++);
} else ++qi;
}
}
/**
* Get and dequeue entries for a given destination address
*
* After use these entries must be returned with returnToPool()!
*
* @param dest Destination address
* @param results Array to fill with results
* @param maxResults Size of results[] in pointers
* @return Number of actual results returned
*/
inline unsigned int getByDest(const Address &dest,_ClusterSendQueueEntry **results,unsigned int maxResults)
{
unsigned int count = 0;
Mutex::Lock _l(_lock);
std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_byDest.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(dest,(_ClusterSendQueueEntry *)0)));
while ((qi != _byDest.end())&&(qi->first == dest)) {
_bySrc.erase(std::pair<Address,_ClusterSendQueueEntry *>(qi->second->fromPeerAddress,qi->second));
results[count++] = qi->second;
if (count == maxResults)
break;
_byDest.erase(qi++);
}
return count;
}
/**
* Return entries to pool after use
*
* @param entries Array of entries
* @param count Number of entries
*/
inline void returnToPool(_ClusterSendQueueEntry **entries,unsigned int count)
{
Mutex::Lock _l(_lock);
for(unsigned int i=0;i<count;++i)
_pool[_poolCount++] = entries[i];
}
private:
std::list< Array<_ClusterSendQueueEntry,ZT_CLUSTER_QUEUE_CHUNK_SIZE> > _chunks;
_ClusterSendQueueEntry *_pool[ZT_CLUSTER_QUEUE_CHUNK_SIZE * ZT_CLUSTER_MAX_QUEUE_CHUNKS];
unsigned long _poolCount;
std::set< std::pair<Address,_ClusterSendQueueEntry *> > _bySrc;
std::set< std::pair<Address,_ClusterSendQueueEntry *> > _byDest;
Mutex _lock;
};
Cluster::Cluster(
const RuntimeEnvironment *renv,
uint16_t id,
@ -73,6 +236,7 @@ Cluster::Cluster(
int (*addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *),
void *addressToLocationFunctionArg) :
RR(renv),
_sendQueue(new _ClusterSendQueue()),
_sendFunction(sendFunction),
_sendFunctionArg(sendFunctionArg),
_addressToLocationFunction(addressToLocationFunction),
@ -84,7 +248,8 @@ Cluster::Cluster(
_zeroTierPhysicalEndpoints(zeroTierPhysicalEndpoints),
_members(new _Member[ZT_CLUSTER_MAX_MEMBERS]),
_lastFlushed(0),
_lastCleanedRemotePeers(0)
_lastCleanedRemotePeers(0),
_lastCleanedQueue(0)
{
uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
@ -105,6 +270,7 @@ Cluster::~Cluster()
Utils::burn(_masterSecret,sizeof(_masterSecret));
Utils::burn(_key,sizeof(_key));
delete [] _members;
delete _sendQueue;
}
void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
@ -220,19 +386,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
}
std::list<_SQE> q;
{
Mutex::Lock _l(_sendViaClusterQueue_m);
for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
if (qi->toPeerAddress == id.address())
q.splice(q.end(),_sendViaClusterQueue,qi++);
else ++qi;
}
}
for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi)
this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite);
_ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
for(unsigned int i=0;i<qc;++i)
this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
_sendQueue->returnToPool(q,qc);
TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size());
TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
}
} break;
@ -244,7 +404,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
peer->identity().serialize(buf);
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
_flush(fromMemberId);
}
} break;
@ -333,7 +492,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
{
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
_flush(fromMemberId);
}
RR->sw->send(rendezvousForLocal,true,0);
}
@ -379,19 +537,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
return;
unsigned int queueCount = 0;
{
Mutex::Lock _l(_sendViaClusterQueue_m);
for(std::list<_SQE>::const_iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();++qi) {
if (qi->fromPeerAddress == fromPeerAddress) {
if (++queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
return;
}
}
}
}
const uint64_t now = RR->node->now();
uint64_t mostRecentTs = 0;
@ -423,8 +568,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
if ((enqueueAndWait)&&(queueCount == 0))
_flush(*mid);
}
}
@ -432,8 +575,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
Mutex::Lock _l(_sendViaClusterQueue_m);
_sendViaClusterQueue.push_back(_SQE(now,fromPeerAddress,toPeerAddress,data,len,unite));
_sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
return;
}
}
@ -464,10 +606,8 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
{
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
if (buf.size() > 0) {
if (buf.size() > 0)
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
_flush(mostRecentMemberId);
}
if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {
TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
@ -484,7 +624,6 @@ void Cluster::sendDistributedQuery(const Packet &pkt)
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size());
_flush(*mid);
}
}
@ -495,15 +634,6 @@ void Cluster::doPeriodicTasks()
if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) {
_lastFlushed = now;
{
Mutex::Lock _l2(_sendViaClusterQueue_m);
for(std::list<_SQE>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
if ((now - qi->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
_sendViaClusterQueue.erase(qi++);
else ++qi;
}
}
Mutex::Lock _l(_memberIds_m);
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
@ -549,6 +679,11 @@ void Cluster::doPeriodicTasks()
else ++rp;
}
}
if ((now - _lastCleanedQueue) >= ZT_CLUSTER_QUEUE_EXPIRATION) {
_lastCleanedQueue = now;
_sendQueue->expire(now);
}
}
void Cluster::addMember(uint16_t memberId)
@ -768,7 +903,6 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
_flush(fromMemberId);
}
}
}

View File

@ -30,16 +30,11 @@
#ifdef ZT_ENABLE_CLUSTER
#include <vector>
#include <algorithm>
#include <map>
#include <utility>
#include <list>
#include "Constants.hpp"
#include "../include/ZeroTierOne.h"
#include "Address.hpp"
#include "Array.hpp"
#include "InetAddress.hpp"
#include "SHA512.hpp"
#include "Utils.hpp"
@ -48,6 +43,7 @@
#include "SharedPtr.hpp"
#include "Hashtable.hpp"
#include "Packet.hpp"
#include "SharedPtr.hpp"
/**
* Timeout for cluster members being considered "alive"
@ -60,12 +56,12 @@
/**
* Desired period between doPeriodicTasks() in milliseconds
*/
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100
#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 25
/**
* How often to flush outgoing message queues (maximum interval)
*/
#define ZT_CLUSTER_FLUSH_PERIOD 250
#define ZT_CLUSTER_FLUSH_PERIOD 100
/**
* Maximum number of queued outgoing packets per sender address
@ -75,7 +71,7 @@
/**
* Expiration time for send queue entries
*/
#define ZT_CLUSTER_QUEUE_EXPIRATION 500
#define ZT_CLUSTER_QUEUE_EXPIRATION 1500
namespace ZeroTier {
@ -84,6 +80,9 @@ class MulticastGroup;
class Peer;
class Identity;
// Internal class implemented inside Cluster.cpp
class _ClusterSendQueue;
/**
* Multi-homing cluster state replication and packet relaying
*
@ -323,10 +322,11 @@ private:
void _doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep);
void _doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep);
// These are initialized in the constructor and remain immutable
// These are initialized in the constructor and remain immutable ------------
uint16_t _masterSecret[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
unsigned char _key[ZT_PEER_SECRET_KEY_LENGTH];
const RuntimeEnvironment *RR;
_ClusterSendQueue *const _sendQueue;
void (*_sendFunction)(void *,unsigned int,const void *,unsigned int);
void *_sendFunctionArg;
int (*_addressToLocationFunction)(void *,const struct sockaddr_storage *,int *,int *,int *);
@ -336,7 +336,7 @@ private:
const int32_t _z;
const uint16_t _id;
const std::vector<InetAddress> _zeroTierPhysicalEndpoints;
// end immutable fields
// end immutable fields -----------------------------------------------------
struct _Member
{
@ -379,27 +379,9 @@ private:
std::map< std::pair<Address,unsigned int>,uint64_t > _remotePeers; // we need ordered behavior and lower_bound here
Mutex _remotePeers_m;
struct _SQE
{
_SQE() : timestamp(0),len(0),unite(false) {}
_SQE(const uint64_t ts,const Address &f,const Address &t,const void *d,const unsigned int l,const bool u) :
timestamp(ts),
fromPeerAddress(f),
toPeerAddress(t),
len(l),
unite(u) { memcpy(data,d,l); }
uint64_t timestamp;
Address fromPeerAddress;
Address toPeerAddress;
unsigned int len;
bool unite;
unsigned char data[ZT_PROTO_MAX_PACKET_LENGTH];
};
std::list<_SQE> _sendViaClusterQueue;
Mutex _sendViaClusterQueue_m;
uint64_t _lastFlushed;
uint64_t _lastCleanedRemotePeers;
uint64_t _lastCleanedQueue;
};
} // namespace ZeroTier