From 86762d2b409afbfd57a74e6c8a8c7e5f0a9880ae Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 28 Aug 2019 15:52:18 -0700 Subject: [PATCH] high throughput root is working! --- root/root.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/root/root.cpp b/root/root.cpp index eb327173c..ace203c77 100644 --- a/root/root.cpp +++ b/root/root.cpp @@ -80,6 +80,7 @@ struct PeerInfo uint8_t key[32]; InetAddress ip4,ip6; int64_t lastReceive; + int64_t lastSync; AtomicCounter __refCount; @@ -89,13 +90,11 @@ struct PeerInfo static Identity self; static std::atomic_bool run; -static std::vector< SharedPtr > newPeers; static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions; static std::unordered_map< Identity,SharedPtr,IdentityHasher > peersByIdentity; static std::unordered_map< Address,std::set< SharedPtr >,AddressHasher > peersByVirtAddr; static std::unordered_map< InetAddress,std::set< SharedPtr >,InetAddressHasher > peersByPhysAddr; -static std::mutex newPeers_l; static std::mutex multicastSubscriptions_l; static std::mutex peersByIdentity_l; static std::mutex peersByVirtAddr_l; @@ -134,10 +133,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt) if (self.agree(id,peer->key)) { if (pkt.dearmor(peer->key)) { peer->id = id; - { - std::lock_guard np_l(newPeers_l); - newPeers.push_back(peer); - } + peer->lastSync = 0; { std::lock_guard pbi_l(peersByIdentity_l); peersByIdentity.emplace(id,peer); @@ -476,6 +472,7 @@ int main(int argc,char **argv) } int64_t lastCleanedMulticastSubscriptions = 0; + int64_t lastCleanedPeers = 0; while (run) { peersByIdentity_l.lock(); peersByPhysAddr_l.lock(); @@ -485,8 +482,10 @@ int main(int argc,char **argv) sleep(1); const int64_t now = OSUtils::now(); + if ((now - lastCleanedMulticastSubscriptions) > 120000) { lastCleanedMulticastSubscriptions = now; + std::lock_guard l(multicastSubscriptions_l); for(auto a=multicastSubscriptions.begin();a!=multicastSubscriptions.end();) { for(auto b=a->second.begin();b!=a->second.end();) { @@ -504,6 +503,44 @@ int main(int argc,char **argv) else ++a; } } + + if ((now - lastCleanedPeers) > 120000) { + lastCleanedPeers = now; + + std::lock_guard pbi_l(peersByIdentity_l); + for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) { + if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) { + std::lock_guard pbv_l(peersByVirtAddr_l); + std::lock_guard pbp_l(peersByPhysAddr_l); + + auto pbv = peersByVirtAddr.find(p->second->id.address()); + if (pbv != peersByVirtAddr.end()) { + pbv->second.erase(p->second); + if (pbv->second.empty()) + peersByVirtAddr.erase(pbv); + } + + if (p->second->ip4) { + auto pbp = peersByPhysAddr.find(p->second->ip4); + if (pbp != peersByPhysAddr.end()) { + pbp->second.erase(p->second); + if (pbp->second.empty()) + peersByPhysAddr.erase(pbp); + } + } + if (p->second->ip6) { + auto pbp = peersByPhysAddr.find(p->second->ip6); + if (pbp != peersByPhysAddr.end()) { + pbp->second.erase(p->second); + if (pbp->second.empty()) + peersByPhysAddr.erase(pbp); + } + } + + peersByIdentity.erase(p++); + } else ++p; + } + } } for(auto s=sockets.begin();s!=sockets.end();++s) {