Another tweak to cluster I/O rules.

This commit is contained in:
Adam Ierymenko 2017-02-01 13:52:53 -08:00
parent b378f5dcd7
commit 60ff280dcb
4 changed files with 25 additions and 28 deletions

View File

@ -516,7 +516,7 @@ void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len)
} }
} }
int Cluster::prepSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret) int Cluster::checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret)
{ {
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
mostRecentTs = 0; mostRecentTs = 0;

View File

@ -289,7 +289,7 @@ public:
* @param peerSecret Result: Buffer to fill with peer secret on valid return value, must be at least ZT_PEER_SECRET_KEY_LENGTH bytes * @param peerSecret Result: Buffer to fill with peer secret on valid return value, must be at least ZT_PEER_SECRET_KEY_LENGTH bytes
* @return -1 if cluster does not know this peer, or a member ID to pass to sendViaCluster() * @return -1 if cluster does not know this peer, or a member ID to pass to sendViaCluster()
*/ */
int prepSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret); int checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret);
/** /**
* Send data via cluster front plane (packet head or fragment) * Send data via cluster front plane (packet head or fragment)

View File

@ -137,7 +137,7 @@ public:
* Get the best current direct path * Get the best current direct path
* *
* @param now Current time * @param now Current time
* @param includeDead If true, include even expired paths * @param includeExpired If true, include even expired paths
* @return Best current path or NULL if none * @return Best current path or NULL if none
*/ */
SharedPtr<Path> getBestPath(uint64_t now,bool includeExpired); SharedPtr<Path> getBestPath(uint64_t now,bool includeExpired);

View File

@ -691,10 +691,13 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
SharedPtr<Path> viaPath; SharedPtr<Path> viaPath;
const uint64_t now = RR->node->now(); const uint64_t now = RR->node->now();
const Address destination(packet.destination()); const Address destination(packet.destination());
#ifdef ZT_ENABLE_CLUSTER #ifdef ZT_ENABLE_CLUSTER
uint64_t clusterMostRecentTs = 0; uint64_t clusterMostRecentTs = 0;
int clusterMostRecentMemberId = -1; int clusterMostRecentMemberId = -1;
uint8_t clusterPeerSecret[ZT_PEER_SECRET_KEY_LENGTH]; uint8_t clusterPeerSecret[ZT_PEER_SECRET_KEY_LENGTH];
if (RR->cluster)
clusterMostRecentMemberId = RR->cluster->checkSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret);
#endif #endif
const SharedPtr<Peer> peer(RR->topology->getPeer(destination)); const SharedPtr<Peer> peer(RR->topology->getPeer(destination));
@ -708,37 +711,40 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
viaPath = peer->getBestPath(now,false); viaPath = peer->getBestPath(now,false);
if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isUpstream(peer->identity())) ) { if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isUpstream(peer->identity())) ) {
if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) #ifdef ZT_ENABLE_CLUSTER
if ((clusterMostRecentMemberId < 0)||(viaPath->lastIn() > clusterMostRecentTs)) {
#endif
if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) {
peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now); peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now);
viaPath->sent(now);
}
#ifdef ZT_ENABLE_CLUSTER
}
#endif
viaPath.zero(); viaPath.zero();
} }
if (!viaPath) {
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) )
viaPath = peer->getBestPath(now,true);
}
#ifdef ZT_ENABLE_CLUSTER #ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
clusterMostRecentMemberId = RR->cluster->prepSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret);
if (clusterMostRecentMemberId >= 0) { if (clusterMostRecentMemberId >= 0) {
if ((viaPath)&&(viaPath->lastIn() < clusterMostRecentTs)) if ((viaPath)&&(viaPath->lastIn() < clusterMostRecentTs))
viaPath.zero(); viaPath.zero();
} else if (!viaPath) { } else if (!viaPath) {
peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known
return false;
}
#else #else
if (!viaPath) { if (!viaPath) {
#endif
peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
if (!(viaPath = peer->getBestPath(now,true)))
return false; return false;
} }
#ifdef ZT_ENABLE_CLUSTER
}
#else
}
#endif #endif
} else { } else {
#ifdef ZT_ENABLE_CLUSTER #ifdef ZT_ENABLE_CLUSTER
if (RR->cluster)
clusterMostRecentMemberId = RR->cluster->prepSendViaCluster(destination,clusterMostRecentTs,clusterPeerSecret);
if (clusterMostRecentMemberId < 0) { if (clusterMostRecentMemberId < 0) {
#else #else
requestWhois(destination); requestWhois(destination);
@ -749,15 +755,6 @@ bool Switch::_trySend(Packet &packet,bool encrypt)
#endif #endif
} }
// Sanity checks
#ifdef ZT_ENABLE_CLUSTER
if ((!viaPath)&&(clusterMostRecentMemberId < 0))
return false;
#else
if (!viaPath)
return false;
#endif
unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU); unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU);
packet.setFragmented(chunkSize < packet.size()); packet.setFragmented(chunkSize < packet.size());