Clustered handling of relaying.

This commit is contained in:
Adam Ierymenko
2015-10-20 17:22:53 -07:00
parent 35a12b94ea
commit d6dee7bb5c
3 changed files with 69 additions and 1 deletions

View File

@ -352,6 +352,53 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
}
bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len)
{
if (len > 16384) // sanity check
return false;
uint64_t mostRecentTimestamp = 0;
uint16_t canHasPeer = 0;
{ // Anyone got this peer?
Mutex::Lock _l2(_peerAffinities_m);
std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),_PeerAffinity(toPeerAddress,0,0))); // O(log(n))
while ((i != _peerAffinities.end())&&(i->address() == toPeerAddress)) {
uint16_t mid = i->clusterMemberId();
if ((mid != _id)&&(i->timestamp > mostRecentTimestamp)) {
mostRecentTimestamp = i->timestamp;
canHasPeer = mid;
}
}
}
const uint64_t now = RR->node->now();
if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) {
Buffer<16384> buf;
InetAddress v4,v6;
if (fromPeerAddress) {
SharedPtr<Peer> fromPeer(RR->topology->getPeer(fromPeerAddress));
if (fromPeer)
fromPeer->getBestActiveAddresses(now,v4,v6);
}
buf.append((uint8_t)( (v4) ? ((v6) ? 2 : 1) : ((v6) ? 1 : 0) ));
if (v4)
v4.serialize(buf);
if (v6)
v6.serialize(buf);
buf.append((uint16_t)len);
buf.append(data,len);
{
Mutex::Lock _l2(_members[canHasPeer].lock);
_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
}
}
return false;
}
void Cluster::replicateHavePeer(const Identity &peerId)
{
{ // Use peer affinity table to track our own last announce time for peers