From 5453cab22b26b4caa38457dadf64e5f53920cb2d Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Tue, 20 Aug 2019 18:50:38 -0700 Subject: [PATCH] Added flow-awareness check for policies, more work on ZT_MULTIPATH_ACTIVE_BACKUP --- node/Constants.hpp | 2 +- node/Peer.cpp | 106 ++++++++++++++++++++++++++---------------- node/Peer.hpp | 1 + node/Switch.cpp | 113 +++++++++++++++++++++++++-------------------- node/Switch.hpp | 5 ++ 5 files changed, 134 insertions(+), 93 deletions(-) diff --git a/node/Constants.hpp b/node/Constants.hpp index 278c705d8..ee656aaec 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -491,7 +491,7 @@ /** * How long before we consider a path to be dead in rapid fail-over scenarios */ -#define ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD 1000 +#define ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD 250 /** * Paths are considered expired if they have not sent us a real packet in this long diff --git a/node/Peer.cpp b/node/Peer.cpp index 7e96b5f06..622095e29 100644 --- a/node/Peer.cpp +++ b/node/Peer.cpp @@ -77,7 +77,8 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident _lastAggregateStatsReport(0), _lastAggregateAllocation(0), _virtualPathCount(0), - _roundRobinPathAssignmentIdx(0) + _roundRobinPathAssignmentIdx(0), + _pathAssignmentIdx(0) { if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH)) throw ZT_EXCEPTION_INVALID_ARGUMENT; @@ -468,16 +469,18 @@ SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired, int64 _paths[i].p->processBackgroundPathMeasurements(now); } } - // Detect new flows and update existing records - if (_flows.count(flowId)) { - _flows[flowId]->lastSend = now; - } - else { - fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n", - flowId, this->_id.address().toInt(), (_flows.size()+1)); - struct Flow *newFlow = new Flow(flowId, now); - _flows[flowId] = newFlow; - newFlow->assignedPath = nullptr; + if (RR->sw->isFlowAware()) { + // Detect new flows and update existing records + if (_flows.count(flowId)) { + _flows[flowId]->lastSend = now; + } + else { + fprintf(stderr, "new flow %llx detected between this node and %llx (%lu active flow(s))\n", + flowId, this->_id.address().toInt(), (_flows.size()+1)); + struct Flow *newFlow = new Flow(flowId, now); + _flows[flowId] = newFlow; + newFlow->assignedPath = nullptr; + } } // Construct set of virtual paths if needed if (!_virtualPaths.size()) { @@ -532,45 +535,64 @@ SharedPtr Peer::getAppropriatePath(int64_t now, bool includeExpired, int64 if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) { bFoundHotPath = true; _activeBackupPath = _paths[i].p; + _pathAssignmentIdx = i; _activeBackupPath->address().toString(curPathStr); - fprintf(stderr, "selected %s as the primary active-backup path to %llx\n", - curPathStr, this->_id.address().toInt()); + fprintf(stderr, "selected %s as the primary active-backup path to %llx (idx=%d)\n", + curPathStr, this->_id.address().toInt(), _pathAssignmentIdx); + break; } } } - if (!_activeBackupPath) { - return SharedPtr(); - } - if (!bFoundHotPath) { - _activeBackupPath->address().toString(curPathStr); - fprintf(stderr, "no hot paths available to to use as active-backup primary to %llx, selected %s anyway\n", - this->_id.address().toInt(), curPathStr); - } } else { + char what[128]; if ((now - _activeBackupPath->lastIn()) > ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) { - _activeBackupPath->address().toString(curPathStr); - /* Fail-over to the fist path that appears to still be active. - * This will eventually be user-configurable */ - for (int i=0; iaddress().toString(curPathStr); // Record path string for later debug trace + int16_t previousIdx = _pathAssignmentIdx; + SharedPtr nextAlternativePath; + // Search for a hot path, at the same time find the next path in + // a RR sequence that seems viable to use as an alternative + int searchCount = 0; + while (searchCount < ZT_MAX_PEER_NETWORK_PATHS) { + _pathAssignmentIdx++; + if (_pathAssignmentIdx == ZT_MAX_PEER_NETWORK_PATHS) { + _pathAssignmentIdx = 0; + } + searchCount++; + if (_paths[_pathAssignmentIdx].p) { + _paths[_pathAssignmentIdx].p->address().toString(what); + if (_activeBackupPath.ptr() == _paths[_pathAssignmentIdx].p.ptr()) { continue; } - if ((now - _paths[i].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) { + if (!nextAlternativePath) { // Record the first viable alternative in the RR sequence + nextAlternativePath = _paths[_pathAssignmentIdx].p; + } + if ((now - _paths[_pathAssignmentIdx].p->lastIn()) < ZT_MULTIPATH_ACTIVE_BACKUP_RAPID_FAILOVER_PERIOD) { bFoundHotPath = true; - _activeBackupPath->address().toString(curPathStr); // Record path string for later debug trace - _activeBackupPath = _paths[i].p; + _activeBackupPath = _paths[_pathAssignmentIdx].p; _activeBackupPath->address().toString(newPathStr); + fprintf(stderr, "primary active-backup path %s to %llx appears to be dead, switched to %s\n", + curPathStr, this->_id.address().toInt(), newPathStr); + break; } } } - if (bFoundHotPath) { - fprintf(stderr, "primary active-backup path %s to %llx appears to be dead, switched to path %s\n", - curPathStr, this->_id.address().toInt(), newPathStr); + if (!bFoundHotPath) { + if (nextAlternativePath) { + _activeBackupPath = nextAlternativePath; + _activeBackupPath->address().toString(curPathStr); + //fprintf(stderr, "no hot paths found to use as active-backup primary to %llx, using next best: %s\n", + // this->_id.address().toInt(), curPathStr); + } + else { + // No change + } } } } + if (!_activeBackupPath) { + return SharedPtr(); + } return _activeBackupPath; } @@ -866,14 +888,16 @@ inline void Peer::processBackgroundPeerTasks(const int64_t now) } // Remove old flows - std::map::iterator it = _flows.begin(); - while (it != _flows.end()) { - if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) { - fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n", - it->first, this->_id.address().toInt(), _flows.size()); - it = _flows.erase(it); - } else { - it++; + if (RR->sw->isFlowAware()) { + std::map::iterator it = _flows.begin(); + while (it != _flows.end()) { + if ((now - it->second->lastSend) > ZT_MULTIPATH_FLOW_EXPIRATION) { + fprintf(stderr, "forgetting flow %llx between this node and %llx (%lu active flow(s))\n", + it->first, this->_id.address().toInt(), _flows.size()); + it = _flows.erase(it); + } else { + it++; + } } } } diff --git a/node/Peer.hpp b/node/Peer.hpp index 84d7d43a2..dddd8fc01 100644 --- a/node/Peer.hpp +++ b/node/Peer.hpp @@ -727,6 +727,7 @@ private: int16_t _roundRobinPathAssignmentIdx; SharedPtr _activeBackupPath; + int16_t _pathAssignmentIdx; }; } // namespace ZeroTier diff --git a/node/Switch.cpp b/node/Switch.cpp index c2251f23d..51f23f674 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -284,6 +284,14 @@ static bool _ipv6GetPayload(const uint8_t *frameData,unsigned int frameLen,unsig return false; // overflow == invalid } +bool Switch::isFlowAware() +{ + int mode = RR->node->getMultipathMode(); + return (( mode == ZT_MULTIPATH_BALANCE_RR_FLOW) + || (mode == ZT_MULTIPATH_BALANCE_XOR_FLOW) + || (mode == ZT_MULTIPATH_BALANCE_DYNAMIC_FLOW)); +} + void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len) { if (!network->hasConfig()) @@ -309,61 +317,64 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr &network,const * preferred virtual path and will be sent out according to what the multipath logic * deems appropriate. An example of this would be an ICMP packet. */ + int64_t flowId = -1; - if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - int8_t proto = (reinterpret_cast(data)[9]); - const unsigned int headerLen = 4 * (reinterpret_cast(data)[0] & 0xf); - switch(proto) { - case 0x01: // ICMP - flowId = 0x01; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (len > (headerLen + 4)) { - unsigned int pos = headerLen + 0; - srcPort = (reinterpret_cast(data)[pos++]) << 8; - srcPort |= (reinterpret_cast(data)[pos]); - pos++; - dstPort = (reinterpret_cast(data)[pos++]) << 8; - dstPort |= (reinterpret_cast(data)[pos]); - flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto; - } - break; + if (isFlowAware()) { + if (etherType == ZT_ETHERTYPE_IPV4 && (len >= 20)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + int8_t proto = (reinterpret_cast(data)[9]); + const unsigned int headerLen = 4 * (reinterpret_cast(data)[0] & 0xf); + switch(proto) { + case 0x01: // ICMP + flowId = 0x01; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (len > (headerLen + 4)) { + unsigned int pos = headerLen + 0; + srcPort = (reinterpret_cast(data)[pos++]) << 8; + srcPort |= (reinterpret_cast(data)[pos]); + pos++; + dstPort = (reinterpret_cast(data)[pos++]) << 8; + dstPort |= (reinterpret_cast(data)[pos]); + flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto; + } + break; + } } - } - if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) { - uint16_t srcPort = 0; - uint16_t dstPort = 0; - unsigned int pos; - unsigned int proto; - _ipv6GetPayload((const uint8_t *)data, len, pos, proto); - switch(proto) { - case 0x3A: // ICMPv6 - flowId = 0x3A; - break; - // All these start with 16-bit source and destination port in that order - case 0x06: // TCP - case 0x11: // UDP - case 0x84: // SCTP - case 0x88: // UDPLite - if (len > (pos + 4)) { - srcPort = (reinterpret_cast(data)[pos++]) << 8; - srcPort |= (reinterpret_cast(data)[pos]); - pos++; - dstPort = (reinterpret_cast(data)[pos++]) << 8; - dstPort |= (reinterpret_cast(data)[pos]); - flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto; - } - break; - default: - break; + if (etherType == ZT_ETHERTYPE_IPV6 && (len >= 40)) { + uint16_t srcPort = 0; + uint16_t dstPort = 0; + unsigned int pos; + unsigned int proto; + _ipv6GetPayload((const uint8_t *)data, len, pos, proto); + switch(proto) { + case 0x3A: // ICMPv6 + flowId = 0x3A; + break; + // All these start with 16-bit source and destination port in that order + case 0x06: // TCP + case 0x11: // UDP + case 0x84: // SCTP + case 0x88: // UDPLite + if (len > (pos + 4)) { + srcPort = (reinterpret_cast(data)[pos++]) << 8; + srcPort |= (reinterpret_cast(data)[pos]); + pos++; + dstPort = (reinterpret_cast(data)[pos++]) << 8; + dstPort |= (reinterpret_cast(data)[pos]); + flowId = ((int64_t)srcPort << 48) | ((int64_t)dstPort << 32) | proto; + } + break; + default: + break; + } } } diff --git a/node/Switch.hpp b/node/Switch.hpp index 388e1ccf1..666ee0531 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -91,6 +91,11 @@ public: */ void onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddress &fromAddr,const void *data,unsigned int len); + /** + * Returns whether our bonding or balancing policy is aware of flows. + */ + bool isFlowAware(); + /** * Called when a packet comes from a local Ethernet tap *