This commit is contained in:
Adam Ierymenko 2015-11-09 11:08:52 -08:00
parent b57c855a8c
commit 4ad2ff2f71
3 changed files with 39 additions and 32 deletions

View File

@ -105,9 +105,6 @@ Cluster::~Cluster()
Utils::burn(_masterSecret,sizeof(_masterSecret));
Utils::burn(_key,sizeof(_key));
delete [] _members;
for(std::multimap<Address,_SQE *>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();)
delete qi->second;
}
void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
@ -223,24 +220,19 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
_remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
}
std::pair<Address,_SQE *> q[ZT_CLUSTER_MAX_QUEUE_PER_SENDER];
unsigned int qc = 0;
std::list<_SQE> q;
{
Mutex::Lock _l(_sendViaClusterQueue_m);
std::pair< std::multimap<Address,_SQE *>::iterator,std::multimap<Address,_SQE *>::iterator > er(_sendViaClusterQueue.equal_range(id.address()));
for(std::multimap<Address,_SQE *>::iterator qi(er.first);qi!=er.second;) {
if (qc >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) // sanity check
break;
q[qc++] = *qi;
_sendViaClusterQueue.erase(qi++);
std::map< Address,std::list<_SQE> >::iterator qe(_sendViaClusterQueue.find(id.address()));
if (qe != _sendViaClusterQueue.end()) {
q.swap(qe->second); // just swap ptr instead of copying
_sendViaClusterQueue.erase(qe);
}
}
for(unsigned int i=0;i<qc;++i) {
this->sendViaCluster(q[i].first,q[i].second->toPeerAddress,q[i].second->data,q[i].second->len,q[i].second->unite);
delete q[i].second;
}
for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi)
this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite);
TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size());
}
} break;
@ -260,7 +252,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
const unsigned int plen = dmsg.at<uint16_t>(ptr); ptr += 2;
if (plen) {
Packet remotep(dmsg.field(ptr,plen),plen); ptr += plen;
TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
//TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
switch(remotep.verb()) {
case Packet::VERB_WHOIS: _doREMOTE_WHOIS(fromMemberId,remotep); break;
case Packet::VERB_MULTICAST_GATHER: _doREMOTE_MULTICAST_GATHER(fromMemberId,remotep); break;
@ -377,10 +369,16 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
return;
_sendViaClusterQueue_m.lock();
const unsigned long queueCount = _sendViaClusterQueue.count(fromPeerAddress);
unsigned long queueCount;
{
std::map< Address,std::list<_SQE> >::const_iterator qe(_sendViaClusterQueue.find(fromPeerAddress));
queueCount = (qe == _sendViaClusterQueue.end()) ? 0 : (unsigned long)qe->second.size();
}
_sendViaClusterQueue_m.unlock();
if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER)
if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
return;
}
const uint64_t now = RR->node->now();
@ -420,8 +418,9 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
Mutex::Lock _l(_sendViaClusterQueue_m);
_sendViaClusterQueue.insert(std::pair<Address,_SQE *>(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite)));
_sendViaClusterQueue[fromPeerAddress].push_back(_SQE(now,toPeerAddress,data,len,unite));
return;
}
}
@ -456,11 +455,11 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
_flush(mostRecentMemberId);
}
if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0)
if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {
TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
}
}
TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
}
void Cluster::sendDistributedQuery(const Packet &pkt)
@ -485,11 +484,15 @@ void Cluster::doPeriodicTasks()
{
Mutex::Lock _l2(_sendViaClusterQueue_m);
for(std::multimap<Address,_SQE *>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
if ((now - qi->second->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) {
delete qi->second;
for(std::map< Address,std::list<_SQE> >::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
for(std::list<_SQE>::iterator qii(qi->second.begin());qii!=qi->second.end();) {
if ((now - qii->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
qi->second.erase(qii++);
else ++qii;
}
if (qi->second.empty())
_sendViaClusterQueue.erase(qi++);
} else ++qi;
else ++qi;
}
}
@ -752,9 +755,10 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
routp.append((uint8_t)Packet::VERB_WHOIS);
routp.append(remotep.packetId());
queried.serialize(routp);
Mutex::Lock _l2(_members[fromMemberId].lock);
routp.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
_flush(fromMemberId);
}
@ -783,6 +787,8 @@ void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &rem
gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80) / 5);
if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) {
routp.setAt<uint16_t>(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
TRACE("responding to remote MULTICAST_GATHER from %s @ %u with %u bytes",remotePeerAddress.toString().c_str(),(unsigned int)fromMemberId,routp.size());
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
}

View File

@ -34,6 +34,7 @@
#include <algorithm>
#include <map>
#include <utility>
#include <list>
#include "Constants.hpp"
#include "../include/ZeroTierOne.h"
@ -63,7 +64,7 @@
/**
* How often to flush outgoing message queues (maximum interval)
*/
#define ZT_CLUSTER_FLUSH_PERIOD 300
#define ZT_CLUSTER_FLUSH_PERIOD 250
/**
* Maximum number of queued outgoing packets per sender address
@ -73,7 +74,7 @@
/**
* Expiration time for send queue entries
*/
#define ZT_CLUSTER_QUEUE_EXPIRATION 2500
#define ZT_CLUSTER_QUEUE_EXPIRATION 1500
namespace ZeroTier {
@ -382,7 +383,7 @@ private:
bool unite;
unsigned char data[ZT_PROTO_MAX_PACKET_LENGTH];
};
std::multimap<Address,_SQE *> _sendViaClusterQueue; // queue by from peer address
std::map< Address,std::list<_SQE> > _sendViaClusterQueue;
Mutex _sendViaClusterQueue_m;
uint64_t _lastFlushed;

View File