diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 7244c9517..a5c30e0fc 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -105,9 +105,6 @@ Cluster::~Cluster()
Utils::burn(_masterSecret,sizeof(_masterSecret));
Utils::burn(_key,sizeof(_key));
delete [] _members;
-
- for(std::multimap
::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();)
- delete qi->second;
}
void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
@@ -223,24 +220,19 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
_remotePeers[std::pair(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
}
- std::pair q[ZT_CLUSTER_MAX_QUEUE_PER_SENDER];
- unsigned int qc = 0;
+ std::list<_SQE> q;
{
Mutex::Lock _l(_sendViaClusterQueue_m);
- std::pair< std::multimap::iterator,std::multimap::iterator > er(_sendViaClusterQueue.equal_range(id.address()));
- for(std::multimap::iterator qi(er.first);qi!=er.second;) {
- if (qc >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) // sanity check
- break;
- q[qc++] = *qi;
- _sendViaClusterQueue.erase(qi++);
+ std::map< Address,std::list<_SQE> >::iterator qe(_sendViaClusterQueue.find(id.address()));
+ if (qe != _sendViaClusterQueue.end()) {
+ q.swap(qe->second); // just swap ptr instead of copying
+ _sendViaClusterQueue.erase(qe);
}
}
- for(unsigned int i=0;isendViaCluster(q[i].first,q[i].second->toPeerAddress,q[i].second->data,q[i].second->len,q[i].second->unite);
- delete q[i].second;
- }
+ for(std::list<_SQE>::iterator qi(q.begin());qi!=q.end();++qi)
+ this->sendViaCluster(id.address(),qi->toPeerAddress,qi->data,qi->len,qi->unite);
- TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
+ TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),(unsigned int)q.size());
}
} break;
@@ -260,7 +252,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
const unsigned int plen = dmsg.at(ptr); ptr += 2;
if (plen) {
Packet remotep(dmsg.field(ptr,plen),plen); ptr += plen;
- TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
+ //TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen);
switch(remotep.verb()) {
case Packet::VERB_WHOIS: _doREMOTE_WHOIS(fromMemberId,remotep); break;
case Packet::VERB_MULTICAST_GATHER: _doREMOTE_MULTICAST_GATHER(fromMemberId,remotep); break;
@@ -377,10 +369,16 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
return;
_sendViaClusterQueue_m.lock();
- const unsigned long queueCount = _sendViaClusterQueue.count(fromPeerAddress);
+ unsigned long queueCount;
+ {
+ std::map< Address,std::list<_SQE> >::const_iterator qe(_sendViaClusterQueue.find(fromPeerAddress));
+ queueCount = (qe == _sendViaClusterQueue.end()) ? 0 : (unsigned long)qe->second.size();
+ }
_sendViaClusterQueue_m.unlock();
- if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER)
+ if (queueCount > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
+ TRACE("dropping sendViaCluster for %s -> %s since queue for sender is full",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
return;
+ }
const uint64_t now = RR->node->now();
@@ -420,8 +418,9 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
+ TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
Mutex::Lock _l(_sendViaClusterQueue_m);
- _sendViaClusterQueue.insert(std::pair(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite)));
+ _sendViaClusterQueue[fromPeerAddress].push_back(_SQE(now,toPeerAddress,data,len,unite));
return;
}
}
@@ -456,11 +455,11 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
_send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
_flush(mostRecentMemberId);
}
- if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0)
+ if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) {
+ TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
+ }
}
-
- TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
}
void Cluster::sendDistributedQuery(const Packet &pkt)
@@ -485,11 +484,15 @@ void Cluster::doPeriodicTasks()
{
Mutex::Lock _l2(_sendViaClusterQueue_m);
- for(std::multimap::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
- if ((now - qi->second->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) {
- delete qi->second;
+ for(std::map< Address,std::list<_SQE> >::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) {
+ for(std::list<_SQE>::iterator qii(qi->second.begin());qii!=qi->second.end();) {
+ if ((now - qii->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION)
+ qi->second.erase(qii++);
+ else ++qii;
+ }
+ if (qi->second.empty())
_sendViaClusterQueue.erase(qi++);
- } else ++qi;
+ else ++qi;
}
}
@@ -752,9 +755,10 @@ void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
routp.append((uint8_t)Packet::VERB_WHOIS);
routp.append(remotep.packetId());
queried.serialize(routp);
-
- Mutex::Lock _l2(_members[fromMemberId].lock);
routp.setAt(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
+
+ TRACE("responding to remote WHOIS from %s @ %u with identity of %s",remotep.source().toString().c_str(),(unsigned int)fromMemberId,queried.address().toString().c_str());
+ Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
_flush(fromMemberId);
}
@@ -783,6 +787,8 @@ void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &rem
gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 80) / 5);
if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) {
routp.setAt(ZT_ADDRESS_LENGTH + 1,(uint16_t)(routp.size() - ZT_ADDRESS_LENGTH - 3));
+
+ TRACE("responding to remote MULTICAST_GATHER from %s @ %u with %u bytes",remotePeerAddress.toString().c_str(),(unsigned int)fromMemberId,routp.size());
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
}
diff --git a/node/Cluster.hpp b/node/Cluster.hpp
index beb5ecdc4..63196928d 100644
--- a/node/Cluster.hpp
+++ b/node/Cluster.hpp
@@ -34,6 +34,7 @@
#include
#include