From 29e5880d8b7efc05443a467de93d2ab46df76ab2 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Mon, 3 May 2021 17:59:31 -0700 Subject: [PATCH] Match formatting of Bond-related sources to ZeroTier standard (no functional changes) --- node/Bond.cpp | 965 ++++++++++++++++++++++------------------ node/Bond.hpp | 265 +++++++---- node/BondController.cpp | 61 +-- node/BondController.hpp | 117 +++-- node/Flow.hpp | 64 ++- osdep/Link.hpp | 153 ++++--- 6 files changed, 971 insertions(+), 654 deletions(-) diff --git a/node/Bond.cpp b/node/Bond.cpp index 70088df7f..0b10474b6 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -11,90 +11,85 @@ */ /****/ -#include +#include "Bond.hpp" #include "../osdep/OSUtils.hpp" - -#include "Peer.hpp" -#include "Bond.hpp" #include "Switch.hpp" -#include "Flow.hpp" -#include "Path.hpp" + +#include namespace ZeroTier { -Bond::Bond(const RuntimeEnvironment *renv, int policy, const SharedPtr& peer) : - RR(renv), - _peer(peer), - _qosCutoffCount(0), - _ackCutoffCount(0), - _lastAckRateCheck(0), - _lastQoSRateCheck(0), - _lastQualityEstimation(0), - _lastCheckUserPreferences(0), - _lastBackgroundTaskCheck(0), - _lastBondStatusLog(0), - _lastPathNegotiationReceived(0), - _lastPathNegotiationCheck(0), - _lastSentPathNegotiationRequest(0), - _lastFlowStatReset(0), - _lastFlowExpirationCheck(0), - _lastFlowRebalance(0), - _lastFrame(0), - _lastActiveBackupPathChange(0) +Bond::Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr& peer) + : RR(renv) + , _peer(peer) + , _qosCutoffCount(0) + , _ackCutoffCount(0) + , _lastAckRateCheck(0) + , _lastQoSRateCheck(0) + , _lastQualityEstimation(0) + , _lastCheckUserPreferences(0) + , _lastBackgroundTaskCheck(0) + , _lastBondStatusLog(0) + , _lastPathNegotiationReceived(0) + , _lastPathNegotiationCheck(0) + , _lastSentPathNegotiationRequest(0) + , _lastFlowStatReset(0) + , _lastFlowExpirationCheck(0) + , _lastFlowRebalance(0) + , _lastFrame(0) + , _lastActiveBackupPathChange(0) { setReasonableDefaults(policy, SharedPtr(), false); _policyAlias = BondController::getPolicyStrByCode(policy); } -Bond::Bond(const RuntimeEnvironment *renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr& peer) : - RR(renv), - _policyAlias(policyAlias), - _peer(peer) +Bond::Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr& peer) : RR(renv), _policyAlias(policyAlias), _peer(peer) { setReasonableDefaults(BondController::getPolicyCodeByStr(basePolicy), SharedPtr(), false); } -Bond::Bond(const RuntimeEnvironment *renv, SharedPtr originalBond, const SharedPtr& peer) : - RR(renv), - _peer(peer), - _lastAckRateCheck(0), - _lastQoSRateCheck(0), - _lastQualityEstimation(0), - _lastCheckUserPreferences(0), - _lastBackgroundTaskCheck(0), - _lastBondStatusLog(0), - _lastPathNegotiationReceived(0), - _lastPathNegotiationCheck(0), - _lastFlowStatReset(0), - _lastFlowExpirationCheck(0), - _lastFlowRebalance(0), - _lastFrame(0) +Bond::Bond(const RuntimeEnvironment* renv, SharedPtr originalBond, const SharedPtr& peer) + : RR(renv) + , _peer(peer) + , _lastAckRateCheck(0) + , _lastQoSRateCheck(0) + , _lastQualityEstimation(0) + , _lastCheckUserPreferences(0) + , _lastBackgroundTaskCheck(0) + , _lastBondStatusLog(0) + , _lastPathNegotiationReceived(0) + , _lastPathNegotiationCheck(0) + , _lastFlowStatReset(0) + , _lastFlowExpirationCheck(0) + , _lastFlowRebalance(0) + , _lastFrame(0) { setReasonableDefaults(originalBond->_bondingPolicy, originalBond, true); } void Bond::nominatePath(const SharedPtr& path, int64_t now) { - char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + char traceMsg[256]; + char pathStr[128]; + path->address().toString(pathStr); Mutex::Lock _l(_paths_m); - if (!RR->bc->linkAllowed(_policyAlias, getLink(path))) { + if (! RR->bc->linkAllowed(_policyAlias, getLink(path))) { return; } bool alreadyPresent = false; - for (int i=0; iifname().c_str(), pathStr, _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (bond) Nominating link %s/%s to peer %llx. It has now entered its trial period", OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); _paths[i]->startTrial(now); break; @@ -120,19 +115,19 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) * broadcast */ if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) { - return SharedPtr(); // Handled in Switch::_trySend() + return SharedPtr(); // Handled in Switch::_trySend() } - if (!_numBondedPaths) { - return SharedPtr(); // No paths assigned to bond yet, cannot balance traffic + if (! _numBondedPaths) { + return SharedPtr(); // No paths assigned to bond yet, cannot balance traffic } /** * balance-rr */ if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) { - if (!_allowFlowHashing) { + if (! _allowFlowHashing) { if (_packetsPerLink == 0) { // Randomly select a path - return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize + return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize } if (_rrPacketsSentOnCurrLink < _packetsPerLink) { // Continue to use this link @@ -146,10 +141,10 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) } else { int _tempIdx = _rrIdx; - for (int searchCount = 0; searchCount < (_numBondedPaths-1); searchCount++) { - _tempIdx = (_tempIdx == (_numBondedPaths-1)) ? 0 : _tempIdx+1; + for (int searchCount = 0; searchCount < (_numBondedPaths - 1); searchCount++) { + _tempIdx = (_tempIdx == (_numBondedPaths - 1)) ? 0 : _tempIdx + 1; if (_bondedIdx[_tempIdx] != ZT_MAX_PEER_NETWORK_PATHS) { - if (_paths[_bondedIdx[_tempIdx]] && _paths[_bondedIdx[_tempIdx]]->eligible(now,_ackSendInterval)) { + if (_paths[_bondedIdx[_tempIdx]] && _paths[_bondedIdx[_tempIdx]]->eligible(now, _ackSendInterval)) { _rrIdx = _tempIdx; break; } @@ -165,9 +160,9 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) * balance-xor */ if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { - if (!_allowFlowHashing || flowId == -1) { + if (! _allowFlowHashing || flowId == -1) { // No specific path required for unclassified traffic, send on anything - return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize + return _paths[_bondedIdx[_freeRandomByte % _numBondedPaths]]; // TODO: Optimize } else if (_allowFlowHashing) { // TODO: Optimize @@ -192,38 +187,35 @@ SharedPtr Bond::getAppropriatePath(int64_t now, int32_t flowId) void Bond::recordIncomingInvalidPacket(const SharedPtr& path) { - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (qos) Invalid packet on link %s/%s from peer %llx", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (qos) Invalid packet on link %s/%s from peer %llx", // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, _peer->_id.address().toInt()); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); Mutex::Lock _l(_paths_m); - for (int i=0; ipacketValiditySamples.push(false); } } } -void Bond::recordOutgoingPacket(const SharedPtr &path, const uint64_t packetId, - uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now) +void Bond::recordOutgoingPacket(const SharedPtr& path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, const int32_t flowId, int64_t now) { - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (bond) Outgoing packet on link %s/%s to peer %llx", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (bond) Outgoing packet on link %s/%s to peer %llx", // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, _peer->_id.address().toInt()); - //RR->t->bondStateMessage(NULL, traceMsg); - _freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic - if (!_shouldCollectPathStatistics) { + // RR->t->bondStateMessage(NULL, traceMsg); + _freeRandomByte += (unsigned char)(packetId >> 8); // Grab entropy to use in path selection logic + if (! _shouldCollectPathStatistics) { return; } bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME); - bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) - && (verb != Packet::VERB_ACK) - && (verb != Packet::VERB_QOS_MEASUREMENT)); + bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT)); if (isFrame || shouldRecord) { Mutex::Lock _l(_paths_m); if (isFrame) { ++(path->_packetsOut); - _lastFrame=now; + _lastFrame = now; } if (shouldRecord) { path->_unackedBytes += payloadLength; @@ -241,22 +233,19 @@ void Bond::recordOutgoingPacket(const SharedPtr &path, const uint64_t pack } } -void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, uint16_t payloadLength, - Packet::Verb verb, int32_t flowId, int64_t now) +void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now) { - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (bond) Incoming packet on link %s/%s from peer %llx [id=%llx, len=%d, verb=%d, flowId=%x]", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (bond) Incoming packet on link %s/%s from peer %llx [id=%llx, len=%d, verb=%d, flowId=%x]", // OSUtils::humanReadableTimestamp().c_str(), getLink(path)->ifname().c_str(), pathStr, _peer->_id.address().toInt(), packetId, payloadLength, verb, flowId); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); bool isFrame = (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME); - bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) - && (verb != Packet::VERB_ACK) - && (verb != Packet::VERB_QOS_MEASUREMENT)); + bool shouldRecord = (packetId & (ZT_QOS_ACK_DIVISOR - 1) && (verb != Packet::VERB_ACK) && (verb != Packet::VERB_QOS_MEASUREMENT)); if (isFrame || shouldRecord) { Mutex::Lock _l(_paths_m); if (isFrame) { ++(path->_packetsIn); - _lastFrame=now; + _lastFrame = now; } if (shouldRecord) { path->ackStatsIn[packetId] = payloadLength; @@ -271,15 +260,13 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, * that the next time we send a packet out that is part of a flow we know * which path to use. */ - if ((flowId != ZT_QOS_NO_FLOW) - && (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR - || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR - || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) { + if ((flowId != ZT_QOS_NO_FLOW) && (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) { Mutex::Lock _l(_flows_m); SharedPtr flow; - if (!_flows.count(flowId)) { + if (! _flows.count(flowId)) { flow = createFlow(path, flowId, 0, now); - } else { + } + else { flow = _flows[flowId]; } if (flow) { @@ -288,16 +275,16 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, } } -void Bond::receivedQoS(const SharedPtr& path, int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts) +void Bond::receivedQoS(const SharedPtr& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts) { Mutex::Lock _l(_paths_m); - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (qos) Received QoS packet sampling %d frames from peer %llx via %s/%s", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (qos) Received QoS packet sampling %d frames from peer %llx via %s/%s", // OSUtils::humanReadableTimestamp().c_str(), count, _peer->_id.address().toInt(), getLink(path)->ifname().c_str(), pathStr); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); // Look up egress times and compute latency values for each record - std::map::iterator it; - for (int j=0; j::iterator it; + for (int j = 0; j < count; j++) { it = path->qosStatsOut.find(rx_id[j]); if (it != path->qosStatsOut.end()) { path->latencySamples.push(((uint16_t)(now - it->second) - rx_ts[j]) / 2); @@ -310,10 +297,10 @@ void Bond::receivedQoS(const SharedPtr& path, int64_t now, int count, uint void Bond::receivedAck(const SharedPtr& path, int64_t now, int32_t ackedBytes) { Mutex::Lock _l(_paths_m); - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (qos) Received ACK packet for %d bytes from peer %llx via %s/%s", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (qos) Received ACK packet for %d bytes from peer %llx via %s/%s", // OSUtils::humanReadableTimestamp().c_str(), ackedBytes, _peer->_id.address().toInt(), getLink(path)->ifname().c_str(), pathStr); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); path->_lastAckReceived = now; path->_unackedBytes = (ackedBytes > path->_unackedBytes) ? 0 : path->_unackedBytes - ackedBytes; int64_t timeSinceThroughputEstimate = (now - path->_lastThroughputEstimation); @@ -327,32 +314,33 @@ void Bond::receivedAck(const SharedPtr& path, int64_t now, int32_t ackedBy } path->_lastThroughputEstimation = now; path->_bytesAckedSinceLastThroughputEstimation = 0; - } else { + } + else { path->_bytesAckedSinceLastThroughputEstimation += ackedBytes; } } -int32_t Bond::generateQoSPacket(const SharedPtr& path, int64_t now, char *qosBuffer) +int32_t Bond::generateQoSPacket(const SharedPtr& path, int64_t now, char* qosBuffer) { int32_t len = 0; - std::map::iterator it = path->qosStatsIn.begin(); - int i=0; - int numRecords = std::min(path->_packetsReceivedSinceLastQoS,ZT_QOS_TABLE_SIZE); - while (iqosStatsIn.end()) { + std::map::iterator it = path->qosStatsIn.begin(); + int i = 0; + int numRecords = std::min(path->_packetsReceivedSinceLastQoS, ZT_QOS_TABLE_SIZE); + while (i < numRecords && it != path->qosStatsIn.end()) { uint64_t id = it->first; memcpy(qosBuffer, &id, sizeof(uint64_t)); - qosBuffer+=sizeof(uint64_t); + qosBuffer += sizeof(uint64_t); uint16_t holdingTime = (uint16_t)(now - it->second); memcpy(qosBuffer, &holdingTime, sizeof(uint16_t)); - qosBuffer+=sizeof(uint16_t); - len+=sizeof(uint64_t)+sizeof(uint16_t); + qosBuffer += sizeof(uint16_t); + len += sizeof(uint64_t) + sizeof(uint16_t); path->qosStatsIn.erase(it++); ++i; } return len; } -bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) +bool Bond::assignFlowToBondedPath(SharedPtr& flow, int64_t now) { char traceMsg[256]; char curPathStr[128]; @@ -361,10 +349,17 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) idx = abs((int)(flow->id() % (_numBondedPaths))); SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[_bondedIdx[idx]]->localSocket()); _paths[_bondedIdx[idx]]->address().toString(curPathStr); - sprintf(traceMsg, "%s (balance-xor) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)", - OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt(), link->ifname().c_str(), curPathStr, _flows.size()); + sprintf( + traceMsg, + "%s (balance-xor) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)", + OSUtils::humanReadableTimestamp().c_str(), + flow->id(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + curPathStr, + _flows.size()); RR->t->bondStateMessage(NULL, traceMsg); - flow->assignPath(_paths[_bondedIdx[idx]],now); + flow->assignPath(_paths[_bondedIdx[idx]], now); ++(_paths[_bondedIdx[idx]]->_assignedFlowCount); } if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { @@ -373,9 +368,8 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) if (_totalBondUnderload) { entropy %= _totalBondUnderload; } - if (!_numBondedPaths) { - sprintf(traceMsg, "%s (balance-aware) There are no bonded paths, cannot assign flow %x\n", - OSUtils::humanReadableTimestamp().c_str(), flow->id()); + if (! _numBondedPaths) { + sprintf(traceMsg, "%s (balance-aware) There are no bonded paths, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), flow->id()); RR->t->bondStateMessage(NULL, traceMsg); return false; } @@ -384,13 +378,13 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) value that we use to randomly assign among the surviving paths, otherwise we risk not being able to find a path to assign this flow to. */ int totalIncompleteAllocation = 0; - for(unsigned int i=0;ibonded()) { totalIncompleteAllocation += _paths[i]->_allocation; } } entropy %= totalIncompleteAllocation; - for(unsigned int i=0;ibonded()) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); _paths[i]->address().toString(curPathStr); @@ -406,46 +400,52 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) if (flow->_assignedPath) { flow->_previouslyAssignedPath = flow->_assignedPath; } - flow->assignPath(_paths[idx],now); + flow->assignPath(_paths[idx], now); ++(_paths[idx]->_assignedFlowCount); } else { - fprintf(stderr, "could not assign flow?\n"); exit(0); // TODO: Remove for production + fprintf(stderr, "could not assign flow?\n"); + exit(0); // TODO: Remove for production return false; } } if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) { if (_abOverflowEnabled) { - flow->assignPath(_abPath, now); - } else { - sprintf(traceMsg, "%s (bond) Unable to assign outgoing flow %x to peer %llx, no active overflow link", - OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt()); + flow->assignPath(_abPath, now); + } + else { + sprintf(traceMsg, "%s (bond) Unable to assign outgoing flow %x to peer %llx, no active overflow link", OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); return false; } } flow->assignedPath()->address().toString(curPathStr); SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket()); - sprintf(traceMsg, "%s (bond) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)", - OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt(), link->ifname().c_str(), curPathStr, _flows.size()); + sprintf( + traceMsg, + "%s (bond) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)", + OSUtils::humanReadableTimestamp().c_str(), + flow->id(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + curPathStr, + _flows.size()); RR->t->bondStateMessage(NULL, traceMsg); return true; } -SharedPtr Bond::createFlow(const SharedPtr &path, int32_t flowId, unsigned char entropy, int64_t now) +SharedPtr Bond::createFlow(const SharedPtr& path, int32_t flowId, unsigned char entropy, int64_t now) { char traceMsg[256]; char curPathStr[128]; // --- - if (!_numBondedPaths) { - sprintf(traceMsg, "%s (bond) There are no bonded paths to peer %llx, cannot assign flow %x\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), flowId); + if (! _numBondedPaths) { + sprintf(traceMsg, "%s (bond) There are no bonded paths to peer %llx, cannot assign flow %x\n", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), flowId); RR->t->bondStateMessage(NULL, traceMsg); return SharedPtr(); } if (_flows.size() >= ZT_FLOW_MAX_COUNT) { - sprintf(traceMsg, "%s (bond) Maximum number of flows on bond to peer %llx reached (%d), forcibly forgetting oldest flow\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), ZT_FLOW_MAX_COUNT); + sprintf(traceMsg, "%s (bond) Maximum number of flows on bond to peer %llx reached (%d), forcibly forgetting oldest flow\n", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), ZT_FLOW_MAX_COUNT); RR->t->bondStateMessage(NULL, traceMsg); forgetFlowsWhenNecessary(0, true, now); } @@ -457,19 +457,26 @@ SharedPtr Bond::createFlow(const SharedPtr &path, int32_t flowId, un * that the remote peer chose for us. */ if (path) { - flow->assignPath(path,now); + flow->assignPath(path, now); path->address().toString(curPathStr); path->_assignedFlowCount++; SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket()); - sprintf(traceMsg, "%s (bond) Assigned incoming flow %x from peer %llx to link %s/%s, %lu active flow(s)", - OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt(), link->ifname().c_str(), curPathStr, _flows.size()); + sprintf( + traceMsg, + "%s (bond) Assigned incoming flow %x from peer %llx to link %s/%s, %lu active flow(s)", + OSUtils::humanReadableTimestamp().c_str(), + flow->id(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + curPathStr, + _flows.size()); RR->t->bondStateMessage(NULL, traceMsg); } /** * Add a flow when no path was provided. This means that it is an outgoing packet * and that it is up to the local peer to decide how to load-balance its transmission. */ - else if (!path) { + else if (! path) { assignFlowToBondedPath(flow, now); } return flow; @@ -478,23 +485,23 @@ SharedPtr Bond::createFlow(const SharedPtr &path, int32_t flowId, un void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) { char traceMsg[256]; - std::map >::iterator it = _flows.begin(); - std::map >::iterator oldestFlow = _flows.end(); + std::map >::iterator it = _flows.begin(); + std::map >::iterator oldestFlow = _flows.end(); SharedPtr expiredFlow; - if (age) { // Remove by specific age + if (age) { // Remove by specific age while (it != _flows.end()) { if (it->second->age(now) > age) { - sprintf(traceMsg, "%s (bond) Forgetting flow %x between this node and peer %llx, %lu active flow(s)", - OSUtils::humanReadableTimestamp().c_str(), it->first, _peer->_id.address().toInt(), (_flows.size()-1)); + sprintf(traceMsg, "%s (bond) Forgetting flow %x between this node and peer %llx, %lu active flow(s)", OSUtils::humanReadableTimestamp().c_str(), it->first, _peer->_id.address().toInt(), (_flows.size() - 1)); RR->t->bondStateMessage(NULL, traceMsg); it->second->assignedPath()->_assignedFlowCount--; it = _flows.erase(it); - } else { + } + else { ++it; } } } - else if (oldest) { // Remove single oldest by natural expiration + else if (oldest) { // Remove single oldest by natural expiration uint64_t maxAge = 0; while (it != _flows.end()) { if (it->second->age(now) > maxAge) { @@ -504,8 +511,14 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) ++it; } if (oldestFlow != _flows.end()) { - sprintf(traceMsg, "%s (bond) Forgetting oldest flow %x (of age %llu) between this node and peer %llx, %lu active flow(s)", - OSUtils::humanReadableTimestamp().c_str(), oldestFlow->first, oldestFlow->second->age(now), _peer->_id.address().toInt(), (_flows.size()-1)); + sprintf( + traceMsg, + "%s (bond) Forgetting oldest flow %x (of age %llu) between this node and peer %llx, %lu active flow(s)", + OSUtils::humanReadableTimestamp().c_str(), + oldestFlow->first, + oldestFlow->second->age(now), + _peer->_id.address().toInt(), + (_flows.size() - 1)); RR->t->bondStateMessage(NULL, traceMsg); oldestFlow->second->assignedPath()->_assignedFlowCount--; _flows.erase(oldestFlow); @@ -513,7 +526,7 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now) } } -void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr &path, int16_t remoteUtility) +void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr& path, int16_t remoteUtility) { char traceMsg[256]; if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) { @@ -522,48 +535,69 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr & Mutex::Lock _l(_paths_m); char pathStr[128]; path->address().toString(pathStr); - if (!_lastPathNegotiationCheck) { + if (! _lastPathNegotiationCheck) { return; } SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, path->localSocket()); if (remoteUtility > _localUtility) { - char pathStr[128]; path->address().toString(pathStr); - sprintf(traceMsg, "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is GREATER than local utility (%d), switching to said link\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr, remoteUtility, _localUtility); + char pathStr[128]; + path->address().toString(pathStr); + sprintf( + traceMsg, + "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is GREATER than local utility (%d), switching to said link\n", + OSUtils::humanReadableTimestamp().c_str(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + pathStr, + remoteUtility, + _localUtility); RR->t->bondStateMessage(NULL, traceMsg); negotiatedPath = path; } if (remoteUtility < _localUtility) { - sprintf(traceMsg, "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is LESS than local utility (%d), not switching\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr, remoteUtility, _localUtility); + sprintf( + traceMsg, + "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is LESS than local utility (%d), not switching\n", + OSUtils::humanReadableTimestamp().c_str(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + pathStr, + remoteUtility, + _localUtility); RR->t->bondStateMessage(NULL, traceMsg); } if (remoteUtility == _localUtility) { - sprintf(traceMsg, "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is equal to local utility (%d)\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr, remoteUtility, _localUtility); + sprintf( + traceMsg, + "%s (bond) Peer %llx suggests using alternate link %s/%s. Remote utility (%d) is equal to local utility (%d)\n", + OSUtils::humanReadableTimestamp().c_str(), + _peer->_id.address().toInt(), + link->ifname().c_str(), + pathStr, + remoteUtility, + _localUtility); RR->t->bondStateMessage(NULL, traceMsg); if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { - sprintf(traceMsg, "%s (bond) Agreeing with peer %llx to use alternate link %s/%s\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr); + sprintf(traceMsg, "%s (bond) Agreeing with peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr); RR->t->bondStateMessage(NULL, traceMsg); negotiatedPath = path; - } else { - sprintf(traceMsg, "%s (bond) Ignoring petition from peer %llx to use alternate link %s/%s\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr); + } + else { + sprintf(traceMsg, "%s (bond) Ignoring petition from peer %llx to use alternate link %s/%s\n", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), link->ifname().c_str(), pathStr); RR->t->bondStateMessage(NULL, traceMsg); } } } -void Bond::pathNegotiationCheck(void *tPtr, const int64_t now) +void Bond::pathNegotiationCheck(void* tPtr, const int64_t now) { char pathStr[128]; int maxInPathIdx = ZT_MAX_PEER_NETWORK_PATHS; int maxOutPathIdx = ZT_MAX_PEER_NETWORK_PATHS; uint64_t maxInCount = 0; uint64_t maxOutCount = 0; - for(unsigned int i=0;i_packetsIn > maxInCount) { @@ -576,30 +610,28 @@ void Bond::pathNegotiationCheck(void *tPtr, const int64_t now) } _paths[i]->resetPacketCounts(); } - bool _peerLinksSynchronized = ((maxInPathIdx != ZT_MAX_PEER_NETWORK_PATHS) - && (maxOutPathIdx != ZT_MAX_PEER_NETWORK_PATHS) - && (maxInPathIdx != maxOutPathIdx)) ? false : true; + bool _peerLinksSynchronized = ((maxInPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxOutPathIdx != ZT_MAX_PEER_NETWORK_PATHS) && (maxInPathIdx != maxOutPathIdx)) ? false : true; /** * Determine utility and attempt to petition remote peer to switch to our chosen path */ - if (!_peerLinksSynchronized) { + if (! _peerLinksSynchronized) { _localUtility = _paths[maxOutPathIdx]->_failoverScore - _paths[maxInPathIdx]->_failoverScore; if (_paths[maxOutPathIdx]->_negotiated) { _localUtility -= ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED; } if ((now - _lastSentPathNegotiationRequest) > ZT_PATH_NEGOTIATION_CUTOFF_TIME) { - //fprintf(stderr, "BT: (sync) it's been long enough, sending more requests.\n"); + // fprintf(stderr, "BT: (sync) it's been long enough, sending more requests.\n"); _numSentPathNegotiationRequests = 0; } if (_numSentPathNegotiationRequests < ZT_PATH_NEGOTIATION_TRY_COUNT) { if (_localUtility >= 0) { - //fprintf(stderr, "BT: (sync) paths appear to be out of sync (utility=%d)\n", _localUtility); + // fprintf(stderr, "BT: (sync) paths appear to be out of sync (utility=%d)\n", _localUtility); sendPATH_NEGOTIATION_REQUEST(tPtr, _paths[maxOutPathIdx]); ++_numSentPathNegotiationRequests; _lastSentPathNegotiationRequest = now; _paths[maxOutPathIdx]->address().toString(pathStr); - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[maxOutPathIdx]->localSocket()); - //fprintf(stderr, "sending request to use %s on %s, ls=%llx, utility=%d\n", pathStr, link->ifname().c_str(), _paths[maxOutPathIdx]->localSocket(), _localUtility); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[maxOutPathIdx]->localSocket()); + // fprintf(stderr, "sending request to use %s on %s, ls=%llx, utility=%d\n", pathStr, link->ifname().c_str(), _paths[maxOutPathIdx]->localSocket(), _localUtility); } } /** @@ -608,90 +640,98 @@ void Bond::pathNegotiationCheck(void *tPtr, const int64_t now) else if ((now - _lastSentPathNegotiationRequest) > (2 * ZT_PATH_NEGOTIATION_CHECK_INTERVAL)) { if (_localUtility == 0) { // There's no loss to us, just switch without sending a another request - //fprintf(stderr, "BT: (sync) giving up, switching to remote peer's path.\n"); + // fprintf(stderr, "BT: (sync) giving up, switching to remote peer's path.\n"); negotiatedPath = _paths[maxInPathIdx]; } } } } -void Bond::sendPATH_NEGOTIATION_REQUEST(void *tPtr, const SharedPtr &path) +void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, const SharedPtr& path) { - char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - sprintf(traceMsg, "%s (bond) Sending link negotiation request to peer %llx via link %s/%s, local utility is %d", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(path)->ifname().c_str(), pathStr, _localUtility); + char traceMsg[256]; + char pathStr[128]; + path->address().toString(pathStr); + sprintf( + traceMsg, + "%s (bond) Sending link negotiation request to peer %llx via link %s/%s, local utility is %d", + OSUtils::humanReadableTimestamp().c_str(), + _peer->_id.address().toInt(), + getLink(path)->ifname().c_str(), + pathStr, + _localUtility); RR->t->bondStateMessage(NULL, traceMsg); if (_abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) { return; } - Packet outp(_peer->_id.address(),RR->identity.address(),Packet::VERB_PATH_NEGOTIATION_REQUEST); + Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_PATH_NEGOTIATION_REQUEST); outp.append(_localUtility); if (path->address()) { - outp.armor(_peer->key(),false,_peer->aesKeysIfSupported()); - RR->node->putPacket(tPtr,path->localSocket(),path->address(),outp.data(),outp.size()); + outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); + RR->node->putPacket(tPtr, path->localSocket(), path->address(), outp.data(), outp.size()); } } -void Bond::sendACK(void *tPtr, const SharedPtr &path,const int64_t localSocket, - const InetAddress &atAddress,int64_t now) +void Bond::sendACK(void* tPtr, const SharedPtr& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now) { - Packet outp(_peer->_id.address(),RR->identity.address(),Packet::VERB_ACK); + Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_ACK); int32_t bytesToAck = 0; - std::map::iterator it = path->ackStatsIn.begin(); + std::map::iterator it = path->ackStatsIn.begin(); while (it != path->ackStatsIn.end()) { bytesToAck += it->second; ++it; } - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (qos) Sending ACK packet for %d bytes to peer %llx via link %s/%s", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (qos) Sending ACK packet for %d bytes to peer %llx via link %s/%s", // OSUtils::humanReadableTimestamp().c_str(), bytesToAck, _peer->_id.address().toInt(), getLink(path)->ifname().c_str(), pathStr); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); outp.append(bytesToAck); if (atAddress) { - outp.armor(_peer->key(),false,_peer->aesKeysIfSupported()); - RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); - } else { - RR->sw->send(tPtr,outp,false); + outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); + RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); + } + else { + RR->sw->send(tPtr, outp, false); } path->ackStatsIn.clear(); path->_packetsReceivedSinceLastAck = 0; path->_lastAckSent = now; } -void Bond::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr &path,const int64_t localSocket, - const InetAddress &atAddress,int64_t now) +void Bond::sendQOS_MEASUREMENT(void* tPtr, const SharedPtr& path, const int64_t localSocket, const InetAddress& atAddress, int64_t now) { - //char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); - //sprintf(traceMsg, "%s (qos) Sending QoS packet to peer %llx via link %s/%s", + // char traceMsg[256]; char pathStr[128]; path->address().toString(pathStr); + // sprintf(traceMsg, "%s (qos) Sending QoS packet to peer %llx via link %s/%s", // OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(path)->ifname().c_str(), pathStr); - //RR->t->bondStateMessage(NULL, traceMsg); + // RR->t->bondStateMessage(NULL, traceMsg); const int64_t _now = RR->node->now(); - Packet outp(_peer->_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT); + Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT); char qosData[ZT_QOS_MAX_PACKET_SIZE]; - int16_t len = generateQoSPacket(path, _now,qosData); - outp.append(qosData,len); + int16_t len = generateQoSPacket(path, _now, qosData); + outp.append(qosData, len); if (atAddress) { - outp.armor(_peer->key(),false,_peer->aesKeysIfSupported()); - RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size()); - } else { - RR->sw->send(tPtr,outp,false); + outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); + RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); + } + else { + RR->sw->send(tPtr, outp, false); } // Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers. path->_packetsReceivedSinceLastQoS = 0; path->_lastQoSMeasurement = now; } -void Bond::processBackgroundTasks(void *tPtr, const int64_t now) +void Bond::processBackgroundTasks(void* tPtr, const int64_t now) { Mutex::Lock _l(_paths_m); - if (!_peer->_canUseMultipath || (now - _lastBackgroundTaskCheck) < ZT_BOND_BACKGROUND_TASK_MIN_INTERVAL) { + if (! _peer->_canUseMultipath || (now - _lastBackgroundTaskCheck) < ZT_BOND_BACKGROUND_TASK_MIN_INTERVAL) { return; } _lastBackgroundTaskCheck = now; // Compute dynamic path monitor timer interval if (_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_DYNAMIC) { - int suggestedMonitorInterval = (now - _lastFrame) / 100; + int suggestedMonitorInterval = (now - _lastFrame) / 100; _dynamicPathMonitorInterval = std::min(ZT_PATH_HEARTBEAT_PERIOD, ((suggestedMonitorInterval > _bondMonitorInterval) ? suggestedMonitorInterval : _bondMonitorInterval)); } // TODO: Clarify and generalize this logic @@ -719,7 +759,7 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now) applyUserPrefs(); } - curateBond(now,false); + curateBond(now, false); if ((now - _lastQualityEstimation) > _qualityEstimationInterval) { _lastQualityEstimation = now; estimatePathQuality(now); @@ -728,20 +768,19 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now) // Send QOS/ACK packets as needed if (_shouldCollectPathStatistics) { - for(unsigned int i=0;iallowed()) { - if (_paths[i]->needsToSendQoS(now,_qosSendInterval)) { + if (_paths[i]->needsToSendQoS(now, _qosSendInterval)) { sendQOS_MEASUREMENT(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now); } - if (_paths[i]->needsToSendAck(now,_ackSendInterval)) { + if (_paths[i]->needsToSendAck(now, _ackSendInterval)) { sendACK(tPtr, _paths[i], _paths[i]->localSocket(), _paths[i]->address(), now); } } } } // Perform periodic background tasks unique to each bonding policy - switch (_bondingPolicy) - { + switch (_bondingPolicy) { case ZT_BONDING_POLICY_ACTIVE_BACKUP: processActiveBackupTasks(tPtr, now); break; @@ -764,19 +803,19 @@ void Bond::processBackgroundTasks(void *tPtr, const int64_t now) void Bond::applyUserPrefs() { - for(unsigned int i=0;i sl = getLink(_paths[i]); if (sl) { - if (sl->monitorInterval() == 0) { // If no interval was specified for this link, use more generic bond-wide interval + if (sl->monitorInterval() == 0) { // If no interval was specified for this link, use more generic bond-wide interval sl->setMonitorInterval(_bondMonitorInterval); } RR->bc->setMinReqPathMonitorInterval((sl->monitorInterval() < RR->bc->minReqPathMonitorInterval()) ? sl->monitorInterval() : RR->bc->minReqPathMonitorInterval()); bool bFoundCommonLink = false; - SharedPtr commonLink =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); - for(unsigned int j=0;j commonLink = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + for (unsigned int j = 0; j < ZT_MAX_PEER_NETWORK_PATHS; ++j) { if (_paths[j] && _paths[j].ptr() != _paths[i].ptr()) { if (RR->bc->getLinkBySocket(_policyAlias, _paths[j]->localSocket()) == commonLink) { bFoundCommonLink = true; @@ -789,7 +828,7 @@ void Bond::applyUserPrefs() _paths[i]->_ipvPref = sl->ipvPref(); _paths[i]->_mode = sl->mode(); _paths[i]->_enabled = sl->enabled(); - _paths[i]->_onlyPathOnLink = !bFoundCommonLink; + _paths[i]->_onlyPathOnLink = ! bFoundCommonLink; } } if (_peer) { @@ -807,37 +846,51 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) /** * Update path states */ - for(unsigned int i=0;ialive(now, true)) { tmpNumAliveLinks++; } - bool currEligibility = _paths[i]->eligible(now,_ackSendInterval); + bool currEligibility = _paths[i]->eligible(now, _ackSendInterval); if (currEligibility != _paths[i]->_lastEligibilityState) { _paths[i]->address().toString(pathStr); - char traceMsg[256]; _paths[i]->address().toString(pathStr); - sprintf(traceMsg, "%s (bond) Eligibility of link %s/%s to peer %llx has changed from %d to %d", - OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), pathStr, _peer->_id.address().toInt(), _paths[i]->_lastEligibilityState, currEligibility); + char traceMsg[256]; + _paths[i]->address().toString(pathStr); + sprintf( + traceMsg, + "%s (bond) Eligibility of link %s/%s to peer %llx has changed from %d to %d", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_paths[i])->ifname().c_str(), + pathStr, + _peer->_id.address().toInt(), + _paths[i]->_lastEligibilityState, + currEligibility); RR->t->bondStateMessage(NULL, traceMsg); if (currEligibility) { rebuildBond = true; } - if (!currEligibility) { - _paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, !currEligibility); + if (! currEligibility) { + _paths[i]->adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility); if (_paths[i]->bonded()) { - char pathStr[128]; _paths[i]->address().toString(pathStr); - sprintf(traceMsg, "%s (bond) Link %s/%s to peer %llx was bonded, reallocation of its flows will occur soon", - OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), pathStr, _peer->_id.address().toInt()); + char pathStr[128]; + _paths[i]->address().toString(pathStr); + sprintf( + traceMsg, + "%s (bond) Link %s/%s to peer %llx was bonded, reallocation of its flows will occur soon", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_paths[i])->ifname().c_str(), + pathStr, + _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); rebuildBond = true; _paths[i]->_shouldReallocateFlows = _paths[i]->bonded(); _paths[i]->setBonded(false); - } else { - sprintf(traceMsg, "%s (bond) Link %s/%s to peer %llx was not bonded, no allocation consequences", - OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), pathStr, _peer->_id.address().toInt()); + } + else { + sprintf(traceMsg, "%s (bond) Link %s/%s to peer %llx was not bonded, no allocation consequences", OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), pathStr, _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -862,25 +915,25 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) } if (_bondingPolicy == ZT_BONDING_POLICY_BROADCAST) { if (_numAliveLinks < 1) { - // Considerd healthy if we're able to send frames at all + // Considered healthy if we're able to send frames at all tmpHealthStatus = false; } } if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) { if (_numAliveLinks < _numTotalLinks) { - // Considerd healthy if all known paths are alive, this should be refined to account for user bond config settings + // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings tmpHealthStatus = false; } } if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) { if (_numAliveLinks < _numTotalLinks) { - // Considerd healthy if all known paths are alive, this should be refined to account for user bond config settings + // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings tmpHealthStatus = false; } } if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { if (_numAliveLinks < _numTotalLinks) { - // Considerd healthy if all known paths are alive, this should be refined to account for user bond config settings + // Considered healthy if all known paths are alive, this should be refined to account for user bond config settings tmpHealthStatus = false; } } @@ -888,11 +941,11 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) std::string healthStatusStr; if (tmpHealthStatus == true) { healthStatusStr = "HEALTHY"; - } else { + } + else { healthStatusStr = "DEGRADED"; } - sprintf(traceMsg, "%s (bond) Bond to peer %llx is in a %s state (%d/%d links)", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), healthStatusStr.c_str(), _numAliveLinks, _numTotalLinks); + sprintf(traceMsg, "%s (bond) Bond to peer %llx is in a %s state (%d/%d links)", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), healthStatusStr.c_str(), _numAliveLinks, _numTotalLinks); RR->t->bondStateMessage(NULL, traceMsg); } @@ -902,27 +955,25 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) * Curate the set of paths that are part of the bond proper. Selects a single path * per logical link according to eligibility and user-specified constraints. */ - if ((_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) - || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) - || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) { - if (!_numBondedPaths) { + if ((_bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR) || (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE)) { + if (! _numBondedPaths) { rebuildBond = true; } // TODO: Optimize if (rebuildBond) { int updatedBondedPathCount = 0; - std::map,int> linkMap; - for (int i=0;iallowed() && (_paths[i]->eligible(now,_ackSendInterval) || !_numBondedPaths)) { - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); - if (!linkMap.count(link)) { + std::map, int> linkMap; + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i] && _paths[i]->allowed() && (_paths[i]->eligible(now, _ackSendInterval) || ! _numBondedPaths)) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + if (! linkMap.count(link)) { linkMap[link] = i; } else { bool overriden = false; _paths[i]->address().toString(pathStr); - //fprintf(stderr, " link representative path already exists! (%s %s)\n", getLink(_paths[i])->ifname().c_str(), pathStr); - if (_paths[i]->preferred() && !_paths[linkMap[link]]->preferred()) { + // fprintf(stderr, " link representative path already exists! (%s %s)\n", getLink(_paths[i])->ifname().c_str(), pathStr); + if (_paths[i]->preferred() && ! _paths[linkMap[link]]->preferred()) { // Override previous choice if preferred if (_paths[linkMap[link]]->_assignedFlowCount) { _paths[linkMap[link]]->_deprecated = true; @@ -934,8 +985,7 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) linkMap[link] = i; overriden = true; } - if ((_paths[i]->preferred() && _paths[linkMap[link]]->preferred()) - || (!_paths[i]->preferred() && !_paths[linkMap[link]]->preferred())) { + if ((_paths[i]->preferred() && _paths[linkMap[link]]->preferred()) || (! _paths[i]->preferred() && ! _paths[linkMap[link]]->preferred())) { if (_paths[i]->preferenceRank() > _paths[linkMap[link]]->preferenceRank()) { // Override if higher preference if (_paths[linkMap[link]]->_assignedFlowCount) { @@ -951,9 +1001,9 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) } } } - std::map,int>::iterator it = linkMap.begin(); - for (int i=0; i, int>::iterator it = linkMap.begin(); + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i]) { continue; } _bondedIdx[i] = ZT_MAX_PEER_NETWORK_PATHS; @@ -963,7 +1013,7 @@ void Bond::curateBond(const int64_t now, bool rebuildBond) ++it; ++updatedBondedPathCount; _paths[_bondedIdx[i]]->address().toString(pathStr); - //fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getLink(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr); + // fprintf(stderr, "setting i=%d, _bondedIdx[%d]=%d to bonded (%s %s)\n", i, i, _bondedIdx[i], getLink(_paths[_bondedIdx[i]])->ifname().c_str(), pathStr); } } _numBondedPaths = updatedBondedPathCount; @@ -979,17 +1029,17 @@ void Bond::estimatePathQuality(const int64_t now) { char pathStr[128]; uint32_t totUserSpecifiedLinkSpeed = 0; - if (_numBondedPaths) { // Compute relative user-specified speeds of links - for(unsigned int i=0;i<_numBondedPaths;++i) { - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + if (_numBondedPaths) { // Compute relative user-specified speeds of links + for (unsigned int i = 0; i < _numBondedPaths; ++i) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); if (_paths[i] && _paths[i]->allowed()) { totUserSpecifiedLinkSpeed += link->speed(); } } - for(unsigned int i=0;i<_numBondedPaths;++i) { - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + for (unsigned int i = 0; i < _numBondedPaths; ++i) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); if (_paths[i] && _paths[i]->allowed()) { - link->setRelativeSpeed(round( ((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255)); + link->setRelativeSpeed(round(((float)link->speed() / (float)totUserSpecifiedLinkSpeed) * 255)); } } } @@ -1017,8 +1067,8 @@ void Bond::estimatePathQuality(const int64_t now) memset(&alloc, 0, sizeof(alloc)); // Compute initial summary statistics - for(unsigned int i=0;iallowed()) { + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i] || ! _paths[i]->allowed()) { continue; } // Compute/Smooth average of real-world observations @@ -1028,31 +1078,34 @@ void Bond::estimatePathQuality(const int64_t now) if (userHasSpecifiedLinkSpeeds()) { // Use user-reported metrics - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); if (link) { _paths[i]->_throughputMean = link->speed(); _paths[i]->_throughputVariance = 0; } } // Drain unacknowledged QoS records - std::map::iterator it = _paths[i]->qosStatsOut.begin(); + std::map::iterator it = _paths[i]->qosStatsOut.begin(); uint64_t currentLostRecords = 0; while (it != _paths[i]->qosStatsOut.end()) { - int qosRecordTimeout = 5000; //_paths[i]->monitorInterval() * ZT_MULTIPATH_QOS_ACK_INTERVAL_MULTIPLIER * 8; + int qosRecordTimeout = 5000; //_paths[i]->monitorInterval() * ZT_MULTIPATH_QOS_ACK_INTERVAL_MULTIPLIER * 8; if ((now - it->second) >= qosRecordTimeout) { // Packet was lost it = _paths[i]->qosStatsOut.erase(it); ++currentLostRecords; - } else { ++it; } + } + else { + ++it; + } } - quality[i]=0; - totQuality=0; + quality[i] = 0; + totQuality = 0; // Normalize raw observations according to sane limits and/or user specified values - lat[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1)); - pdv[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1)); - plr[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1)); - per[i] = 1.0 / expf(4*Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1)); + lat[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyMean, 0, _maxAcceptableLatency, 0, 1)); + pdv[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_latencyVariance, 0, _maxAcceptablePacketDelayVariance, 0, 1)); + plr[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetLossRatio, 0, _maxAcceptablePacketLossRatio, 0, 1)); + per[i] = 1.0 / expf(4 * Utils::normalize(_paths[i]->_packetErrorRatio, 0, _maxAcceptablePacketErrorRatio, 0, 1)); // Record bond-wide maximums to determine relative values maxLAT = lat[i] > maxLAT ? lat[i] : maxLAT; maxPDV = pdv[i] > maxPDV ? pdv[i] : maxPDV; @@ -1060,7 +1113,7 @@ void Bond::estimatePathQuality(const int64_t now) maxPER = per[i] > maxPER ? per[i] : maxPER; } // Convert metrics to relative quantities and apply contribution weights - for(unsigned int i=0;ibonded()) { quality[i] += ((maxLAT > 0.0f ? lat[i] / maxLAT : 0.0f) * _qualityWeights[ZT_QOS_LAT_IDX]); quality[i] += ((maxPDV > 0.0f ? pdv[i] / maxPDV : 0.0f) * _qualityWeights[ZT_QOS_PDV_IDX]); @@ -1070,7 +1123,7 @@ void Bond::estimatePathQuality(const int64_t now) } } // Normalize to 8-bit allocation values - for(unsigned int i=0;ibonded()) { alloc[i] = std::ceil((quality[i] / totQuality) * (float)255); _paths[i]->_allocation = alloc[i]; @@ -1083,12 +1136,12 @@ void Bond::processBalanceTasks(const int64_t now) char curPathStr[128]; // TODO: Generalize int totalAllocation = 0; - for (int i=0;ibonded() && _paths[i]->eligible(now,_ackSendInterval)) { - totalAllocation+=_paths[i]->_allocation; + if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval)) { + totalAllocation += _paths[i]->_allocation; } } unsigned char minimumAllocationValue = 0.33 * ((float)totalAllocation / (float)_numBondedPaths); @@ -1099,13 +1152,13 @@ void Bond::processBalanceTasks(const int64_t now) */ if ((now - _lastFlowExpirationCheck) > ZT_MULTIPATH_FLOW_CHECK_INTERVAL) { Mutex::Lock _l(_flows_m); - forgetFlowsWhenNecessary(ZT_MULTIPATH_FLOW_EXPIRATION_INTERVAL,false,now); + forgetFlowsWhenNecessary(ZT_MULTIPATH_FLOW_EXPIRATION_INTERVAL, false, now); _lastFlowExpirationCheck = now; } if ((now - _lastFlowStatReset) > ZT_FLOW_STATS_RESET_INTERVAL) { Mutex::Lock _l(_flows_m); _lastFlowStatReset = now; - std::map >::iterator it = _flows.begin(); + std::map >::iterator it = _flows.begin(); while (it != _flows.end()) { it->second->resetByteCounts(); ++it; @@ -1116,19 +1169,26 @@ void Bond::processBalanceTasks(const int64_t now) */ if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { Mutex::Lock _l(_flows_m); - for (int i=0;ieligible(now,_ackSendInterval) && _paths[i]->_shouldReallocateFlows) { - char traceMsg[256]; char pathStr[128]; _paths[i]->address().toString(pathStr); - sprintf(traceMsg, "%s (balance-*) Reallocating flows to peer %llx from dead link %s/%s to surviving links", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(_paths[i])->ifname().c_str(), pathStr); + if (! _paths[i]->eligible(now, _ackSendInterval) && _paths[i]->_shouldReallocateFlows) { + char traceMsg[256]; + char pathStr[128]; + _paths[i]->address().toString(pathStr); + sprintf( + traceMsg, + "%s (balance-*) Reallocating flows to peer %llx from dead link %s/%s to surviving links", + OSUtils::humanReadableTimestamp().c_str(), + (unsigned long long)(_peer->_id.address().toInt()), + getLink(_paths[i])->ifname().c_str(), + pathStr); RR->t->bondStateMessage(NULL, traceMsg); - std::map >::iterator flow_it = _flows.begin(); + std::map >::iterator flow_it = _flows.begin(); while (flow_it != _flows.end()) { if (flow_it->second->assignedPath() == _paths[i]) { - if(assignFlowToBondedPath(flow_it->second, now)) { + if (assignFlowToBondedPath(flow_it->second, now)) { _paths[i]->_assignedFlowCount--; } } @@ -1144,20 +1204,27 @@ void Bond::processBalanceTasks(const int64_t now) */ if (_bondingPolicy == ZT_BONDING_POLICY_BALANCE_XOR || _bondingPolicy == ZT_BONDING_POLICY_BALANCE_AWARE) { Mutex::Lock _l(_flows_m); - for (int i=0;ibonded() && _paths[i]->eligible(now,_ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) { + if (_paths[i] && _paths[i]->bonded() && _paths[i]->eligible(now, _ackSendInterval) && (_paths[i]->_allocation < minimumAllocationValue) && _paths[i]->_assignedFlowCount) { _paths[i]->address().toString(curPathStr); - char traceMsg[256]; char pathStr[128]; _paths[i]->address().toString(pathStr); - sprintf(traceMsg, "%s (balance-*) Reallocating flows to peer %llx from under-performing link %s/%s\n", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(_paths[i])->ifname().c_str(), pathStr); + char traceMsg[256]; + char pathStr[128]; + _paths[i]->address().toString(pathStr); + sprintf( + traceMsg, + "%s (balance-*) Reallocating flows to peer %llx from under-performing link %s/%s\n", + OSUtils::humanReadableTimestamp().c_str(), + (unsigned long long)(_peer->_id.address().toInt()), + getLink(_paths[i])->ifname().c_str(), + pathStr); RR->t->bondStateMessage(NULL, traceMsg); - std::map >::iterator flow_it = _flows.begin(); + std::map >::iterator flow_it = _flows.begin(); while (flow_it != _flows.end()) { if (flow_it->second->assignedPath() == _paths[i]) { - if(assignFlowToBondedPath(flow_it->second, now)) { + if (assignFlowToBondedPath(flow_it->second, now)) { _paths[i]->_assignedFlowCount--; } } @@ -1197,13 +1264,12 @@ void Bond::processBalanceTasks(const int64_t now) * Return flows to the original path if it has once again become available */ if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) { - std::map >::iterator flow_it = _flows.begin(); + std::map >::iterator flow_it = _flows.begin(); while (flow_it != _flows.end()) { - if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) - && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { - //fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n"); + if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { + // fprintf(stderr, "moving flow back onto its previous path assignment (based on eligibility)\n"); (flow_it->second->_assignedPath->_assignedFlowCount)--; - flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now); + flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now); (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++; } ++flow_it; @@ -1214,13 +1280,12 @@ void Bond::processBalanceTasks(const int64_t now) * Return flows to the original path if it has once again become (performant) */ if ((now - _lastFlowRebalance) > ZT_FLOW_REBALANCE_INTERVAL) { - std::map >::iterator flow_it = _flows.begin(); + std::map >::iterator flow_it = _flows.begin(); while (flow_it != _flows.end()) { - if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) - && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { - //fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n"); + if (flow_it->second->_previouslyAssignedPath && flow_it->second->_previouslyAssignedPath->eligible(now, _ackSendInterval) && (flow_it->second->_previouslyAssignedPath->_allocation >= (minimumAllocationValue * 2))) { + // fprintf(stderr, "moving flow back onto its previous path assignment (based on performance)\n"); (flow_it->second->_assignedPath->_assignedFlowCount)--; - flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath,now); + flow_it->second->assignPath(flow_it->second->_previouslyAssignedPath, now); (flow_it->second->_previouslyAssignedPath->_assignedFlowCount)++; } ++flow_it; @@ -1229,7 +1294,7 @@ void Bond::processBalanceTasks(const int64_t now) } } } - else if (!_allowFlowHashing) { + else if (! _allowFlowHashing) { // Nothing } } @@ -1243,7 +1308,7 @@ void Bond::dequeueNextActiveBackupPath(const uint64_t now) _abPath = _abFailoverQueue.front(); _abFailoverQueue.pop_front(); _lastActiveBackupPathChange = now; - for (int i=0; iresetPacketCounts(); } @@ -1260,9 +1325,11 @@ bool Bond::abForciblyRotateLink() _abPath->address().toString(prevPathStr); dequeueNextActiveBackupPath(RR->node->now()); _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Forcibly rotating peer %llx link from %s/%s to %s/%s", + sprintf( + traceMsg, + "%s (active-backup) Forcibly rotating peer %llx link from %s/%s to %s/%s", OSUtils::humanReadableTimestamp().c_str(), - _peer->_id.address().toInt(), + (unsigned long long)(_peer->_id.address().toInt()), getLink(prevPath)->ifname().c_str(), prevPathStr, getLink(_abPath)->ifname().c_str(), @@ -1273,7 +1340,7 @@ bool Bond::abForciblyRotateLink() return false; } -void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) +void Bond::processActiveBackupTasks(void* tPtr, const int64_t now) { char traceMsg[256]; char pathStr[128]; @@ -1285,30 +1352,35 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) bool bFoundPrimaryLink = false; /** - * Generate periodic statuc report + * Generate periodic status report */ if ((now - _lastBondStatusLog) > ZT_MULTIPATH_BOND_STATUS_INTERVAL) { _lastBondStatusLog = now; if (_abPath) { _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Active link to peer %llx is %s/%s, failover queue size is %zu", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(_abPath)->ifname().c_str(), curPathStr, _abFailoverQueue.size()); + sprintf( + traceMsg, + "%s (active-backup) Active link to peer %llx is %s/%s, failover queue size is %zu", + OSUtils::humanReadableTimestamp().c_str(), + (unsigned long long)(_peer->_id.address().toInt()), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); - } else { - sprintf(traceMsg, "%s (active-backup) No active link to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + } + else { + sprintf(traceMsg, "%s (active-backup) No active link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } if (_abFailoverQueue.empty()) { - sprintf(traceMsg, "%s (active-backup) Failover queue is empty, bond to peer %llx is NOT currently fault-tolerant", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); - RR->t->bondStateMessage(NULL, traceMsg); + sprintf(traceMsg, "%s (active-backup) Failover queue is empty, bond to peer %llx is NOT currently fault-tolerant", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + RR->t->bondStateMessage(NULL, traceMsg); } } /** * Select initial "active" active-backup link */ - if (!_abPath) { + if (! _abPath) { /** * [Automatic mode] * The user has not explicitly specified links or their failover schedule, @@ -1317,16 +1389,15 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) * policy will assign it as the new active backup path. If the path fails it will * simply find the next eligible path. */ - if (!userHasSpecifiedLinks()) { - sprintf(traceMsg, "%s (active-backup) No links to peer %llx specified. Searching...", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); - for (int i=0; ieligible(now,_ackSendInterval)) { + if (! userHasSpecifiedLinks()) { + sprintf(traceMsg, "%s (active-backup) No links to peer %llx specified. Searching...", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + RR->t->bondStateMessage(NULL, traceMsg); + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) { _paths[i]->address().toString(curPathStr); - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); if (link) { - sprintf(traceMsg, "%s (active-backup) Found eligible link %s/%s to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (active-backup) Found eligible link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } _abPath = _paths[i]; @@ -1340,14 +1411,14 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) */ else if (userHasSpecifiedLinks()) { if (userHasSpecifiedPrimaryLink()) { - //sprintf(traceMsg, "%s (active-backup) Checking local.conf for user-specified primary link\n", OSUtils::humanReadableTimestamp().c_str()); - for (int i=0; i link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); - if (_paths[i]->eligible(now,_ackSendInterval) && link->primary()) { - if (!_paths[i]->preferred()) { + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + if (_paths[i]->eligible(now, _ackSendInterval) && link->primary()) { + if (! _paths[i]->preferred()) { _paths[i]->address().toString(curPathStr); // Found path on primary link, take note in case we don't find a preferred path nonPreferredPath = _paths[i]; @@ -1358,41 +1429,37 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) _abPath->address().toString(curPathStr); SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); bFoundPrimaryLink = true; - break; // Found preferred path %s on primary link + break; // Found preferred path %s on primary link } } } if (_abPath) { _abPath->address().toString(curPathStr); - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket()); if (link) { - sprintf(traceMsg, "%s (active-backup) Found preferred primary link %s/%s to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (active-backup) Found preferred primary link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } else { if (bFoundPrimaryLink && nonPreferredPath) { - sprintf(traceMsg, "%s (active-backup) Found non-preferred primary link to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (active-backup) Found non-preferred primary link to peer %llx", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); _abPath = nonPreferredPath; } } - if (!_abPath) { - sprintf(traceMsg, "%s (active-backup) Designated primary link to peer %llx is not yet ready", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + if (! _abPath) { + sprintf(traceMsg, "%s (active-backup) Designated primary link to peer %llx is not yet ready", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); - // TODO: Should wait for some time (failover interval?) and then swtich to spare link + // TODO: Should wait for some time (failover interval?) and then switch to spare link } } - else if (!userHasSpecifiedPrimaryLink()) { + else if (! userHasSpecifiedPrimaryLink()) { int _abIdx = ZT_MAX_PEER_NETWORK_PATHS; - sprintf(traceMsg, "%s (active-backup) User did not specify a primary link to peer %llx, selecting first available link", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (active-backup) User did not specify a primary link to peer %llx, selecting first available link", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); - for (int i=0; ieligible(now,_ackSendInterval)) { + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (_paths[i] && _paths[i]->eligible(now, _ackSendInterval)) { _abIdx = i; break; } @@ -1402,11 +1469,10 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) } else { _abPath = _paths[_abIdx]; - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _abPath->localSocket()); if (link) { _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Selected non-primary link %s/%s to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf(traceMsg, "%s (active-backup) Selected non-primary link %s/%s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -1419,17 +1485,24 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) if (_abPath) { // Don't worry about the failover queue until we have an active link // Remove ineligible paths from the failover link queue - for (std::list >::iterator it(_abFailoverQueue.begin()); it!=_abFailoverQueue.end();) { - if ((*it) && !(*it)->eligible(now,_ackSendInterval)) { + for (std::list >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end();) { + if ((*it) && ! (*it)->eligible(now, _ackSendInterval)) { (*it)->address().toString(curPathStr); - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, (*it)->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, (*it)->localSocket()); it = _abFailoverQueue.erase(it); if (link) { - sprintf(traceMsg, "%s (active-backup) Link %s/%s to peer %llx is now ineligible, removing from failover queue, there are %zu links in the queue", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt(), _abFailoverQueue.size()); + sprintf( + traceMsg, + "%s (active-backup) Link %s/%s to peer %llx is now ineligible, removing from failover queue, there are %zu links in the queue", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt(), + _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); } - } else { + } + else { ++it; } } @@ -1441,17 +1514,17 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) /** * Clear failover scores */ - for (int i=0; i_failoverScore = 0; } } // Follow user-specified failover instructions - for (int i=0; iallowed() || !_paths[i]->eligible(now,_ackSendInterval)) { + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) { continue; } - SharedPtr link =RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); + SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[i]->localSocket()); _paths[i]->address().toString(pathStr); int failoverScoreHandicap = _paths[i]->_failoverScore; @@ -1462,7 +1535,7 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) // If using "optimize" primary reselect mode, ignore user link designations failoverScoreHandicap += ZT_MULTIPATH_FAILOVER_HANDICAP_PRIMARY; } - if (!_paths[i]->_failoverScore) { + if (! _paths[i]->_failoverScore) { // If we didn't inherit a failover score from a "parent" that wants to use this path as a failover int newHandicap = failoverScoreHandicap ? failoverScoreHandicap : _paths[i]->_allocation; _paths[i]->_failoverScore = newHandicap; @@ -1472,12 +1545,12 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) failoverLink = RR->bc->getLinkByName(_policyAlias, link->failoverToLink()); } if (failoverLink) { - for (int j=0; jaddress().toString(pathStr); int inheritedHandicap = failoverScoreHandicap - 10; int newHandicap = _paths[j]->_failoverScore > inheritedHandicap ? _paths[j]->_failoverScore : inheritedHandicap; - if (!_paths[j]->preferred()) { + if (! _paths[j]->preferred()) { newHandicap--; } _paths[j]->_failoverScore = newHandicap; @@ -1486,15 +1559,22 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) } if (_paths[i].ptr() != _abPath.ptr()) { bool bFoundPathInQueue = false; - for (std::list >::iterator it(_abFailoverQueue.begin()); it!=_abFailoverQueue.end();++it) { + for (std::list >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) { if (_paths[i].ptr() == (*it).ptr()) { bFoundPathInQueue = true; } } - if (!bFoundPathInQueue) { + if (! bFoundPathInQueue) { _abFailoverQueue.push_front(_paths[i]); - _paths[i]->address().toString(curPathStr); sprintf(traceMsg, "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt(), _abFailoverQueue.size()); + _paths[i]->address().toString(curPathStr); + sprintf( + traceMsg, + "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt(), + _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -1504,11 +1584,9 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) * No failover instructions provided by user, build queue according to performance * and IPv preference. */ - else if (!userHasSpecifiedFailoverInstructions()) { - for (int i=0; iallowed() - || !_paths[i]->eligible(now,_ackSendInterval)) { + else if (! userHasSpecifiedFailoverInstructions()) { + for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i] || ! _paths[i]->allowed() || ! _paths[i]->eligible(now, _ackSendInterval)) { continue; } int failoverScoreHandicap = 0; @@ -1516,7 +1594,7 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_PREFERRED; } bool includeRefractoryPeriod = true; - if (!_paths[i]->eligible(now,includeRefractoryPeriod)) { + if (! _paths[i]->eligible(now, includeRefractoryPeriod)) { failoverScoreHandicap = -10000; } if (getLink(_paths[i])->primary() && _abLinkSelectMethod != ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE) { @@ -1526,22 +1604,29 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) if (_paths[i].ptr() == negotiatedPath.ptr()) { _paths[i]->_negotiated = true; failoverScoreHandicap = ZT_MULTIPATH_FAILOVER_HANDICAP_NEGOTIATED; - } else { + } + else { _paths[i]->_negotiated = false; } _paths[i]->_failoverScore = _paths[i]->_allocation + failoverScoreHandicap; if (_paths[i].ptr() != _abPath.ptr()) { bool bFoundPathInQueue = false; - for (std::list >::iterator it(_abFailoverQueue.begin()); it!=_abFailoverQueue.end();++it) { + for (std::list >::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end(); ++it) { if (_paths[i].ptr() == (*it).ptr()) { bFoundPathInQueue = true; } } - if (!bFoundPathInQueue) { + if (! bFoundPathInQueue) { _abFailoverQueue.push_front(_paths[i]); _paths[i]->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue", - OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), curPathStr, _peer->_id.address().toInt(), _abFailoverQueue.size()); + sprintf( + traceMsg, + "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_paths[i])->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt(), + _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -1558,20 +1643,25 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) /** * Fulfill primary reselect obligations */ - if (_abPath && !_abPath->eligible(now,_ackSendInterval)) { // Implicit ZT_MULTIPATH_RESELECTION_POLICY_FAILURE + if (_abPath && ! _abPath->eligible(now, _ackSendInterval)) { // Implicit ZT_MULTIPATH_RESELECTION_POLICY_FAILURE _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Link %s/%s to peer %llx has failed. Selecting new link from failover queue, there are %zu links in the queue", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt(), _abFailoverQueue.size()); + sprintf( + traceMsg, + "%s (active-backup) Link %s/%s to peer %llx has failed. Selecting new link from failover queue, there are %zu links in the queue", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt(), + _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); - if (!_abFailoverQueue.empty()) { + if (! _abFailoverQueue.empty()) { dequeueNextActiveBackupPath(now); _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Active link to peer %llx has been switched to %s/%s", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(_abPath)->ifname().c_str(), curPathStr); + sprintf(traceMsg, "%s (active-backup) Active link to peer %llx has been switched to %s/%s", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt(), getLink(_abPath)->ifname().c_str(), curPathStr); RR->t->bondStateMessage(NULL, traceMsg); - } else { - sprintf(traceMsg, "%s (active-backup) Failover queue is empty. No links to peer %llx to choose from", - OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); + } + else { + sprintf(traceMsg, "%s (active-backup) Failover queue is empty. No links to peer %llx to choose from", OSUtils::humanReadableTimestamp().c_str(), _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -1582,29 +1672,37 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) _lastActiveBackupPathChange = now; } if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_ALWAYS) { - if (_abPath && !getLink(_abPath)->primary() - && getLink(_abFailoverQueue.front())->primary()) { + if (_abPath && ! getLink(_abPath)->primary() && getLink(_abFailoverQueue.front())->primary()) { dequeueNextActiveBackupPath(now); _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Switching back to available primary link %s/%s to peer %llx [linkSelectionMethod = always]", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf( + traceMsg, + "%s (active-backup) Switching back to available primary link %s/%s to peer %llx [linkSelectionMethod = always]", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_BETTER) { - if (_abPath && !getLink(_abPath)->primary()) { + if (_abPath && ! getLink(_abPath)->primary()) { // Active backup has switched to "better" primary link according to re-select policy. - if (getLink(_abFailoverQueue.front())->primary() - && (_abFailoverQueue.front()->_failoverScore > _abPath->_failoverScore)) { + if (getLink(_abFailoverQueue.front())->primary() && (_abFailoverQueue.front()->_failoverScore > _abPath->_failoverScore)) { dequeueNextActiveBackupPath(now); _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Switching back to user-defined primary link %s/%s to peer %llx [linkSelectionMethod = better]", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf( + traceMsg, + "%s (active-backup) Switching back to user-defined primary link %s/%s to peer %llx [linkSelectionMethod = better]", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } } - if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE && !_abFailoverQueue.empty()) { + if (_abLinkSelectMethod == ZT_MULTIPATH_RESELECTION_POLICY_OPTIMIZE && ! _abFailoverQueue.empty()) { /** * Implement link negotiation that was previously-decided */ @@ -1613,14 +1711,19 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) _abPath->address().toString(prevPathStr); _lastPathNegotiationCheck = now; _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Switching negotiated link %s/%s to peer %llx [linkSelectionMethod = optimize]", - OSUtils::humanReadableTimestamp().c_str(), getLink(_abPath)->ifname().c_str(), curPathStr, _peer->_id.address().toInt()); + sprintf( + traceMsg, + "%s (active-backup) Switching negotiated link %s/%s to peer %llx [linkSelectionMethod = optimize]", + OSUtils::humanReadableTimestamp().c_str(), + getLink(_abPath)->ifname().c_str(), + curPathStr, + _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } else { // Try to find a better path and automatically switch to it -- not too often, though. if ((now - _lastActiveBackupPathChange) > ZT_MULTIPATH_MIN_ACTIVE_BACKUP_AUTOFLOP_INTERVAL) { - if (!_abFailoverQueue.empty()) { + if (! _abFailoverQueue.empty()) { int newFScore = _abFailoverQueue.front()->_failoverScore; int prevFScore = _abPath->_failoverScore; // Establish a minimum switch threshold to prevent flapping @@ -1631,8 +1734,17 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) _abPath->address().toString(prevPathStr); dequeueNextActiveBackupPath(now); _abPath->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Switching from %s/%s (fscore=%d) to better link %s/%s (fscore=%d) for peer %llx [linkSelectionMethod = optimize]", - OSUtils::humanReadableTimestamp().c_str(), getLink(oldPath)->ifname().c_str(), prevPathStr, prevFScore, getLink(_abPath)->ifname().c_str(), curPathStr, newFScore, _peer->_id.address().toInt()); + sprintf( + traceMsg, + "%s (active-backup) Switching from %s/%s (fscore=%d) to better link %s/%s (fscore=%d) for peer %llx [linkSelectionMethod = optimize]", + OSUtils::humanReadableTimestamp().c_str(), + getLink(oldPath)->ifname().c_str(), + prevPathStr, + prevFScore, + getLink(_abPath)->ifname().c_str(), + curPathStr, + newFScore, + _peer->_id.address().toInt()); RR->t->bondStateMessage(NULL, traceMsg); } } @@ -1648,11 +1760,12 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool if (policy <= ZT_BONDING_POLICY_NONE || policy > ZT_BONDING_POLICY_BALANCE_AWARE) { // If no default set, use NONE (effectively disabling this bond) if (_defaultBondingPolicy < ZT_BONDING_POLICY_NONE || _defaultBondingPolicy > ZT_BONDING_POLICY_BALANCE_AWARE) { - _bondingPolicy= ZT_BONDING_POLICY_NONE; + _bondingPolicy = ZT_BONDING_POLICY_NONE; } - _bondingPolicy= _defaultBondingPolicy; - } else { - _bondingPolicy= policy; + _bondingPolicy = _defaultBondingPolicy; + } + else { + _bondingPolicy = policy; } _freeRandomByte = 0; @@ -1666,18 +1779,18 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _downDelay = 0; _upDelay = 0; - _allowFlowHashing=false; - _bondMonitorInterval=0; - _shouldCollectPathStatistics=false; + _allowFlowHashing = false; + _bondMonitorInterval = 0; + _shouldCollectPathStatistics = false; // Path negotiation - _allowPathNegotiation=false; - _pathNegotiationCutoffCount=0; - _localUtility=0; + _allowPathNegotiation = false; + _pathNegotiationCutoffCount = 0; + _localUtility = 0; - _numBondedPaths=0; - _rrPacketsSentOnCurrLink=0; - _rrIdx=0; + _numBondedPaths = 0; + _rrPacketsSentOnCurrLink = 0; + _rrIdx = 0; _totalBondUnderload = 0; @@ -1685,7 +1798,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _maxAcceptablePacketDelayVariance = 50; _maxAcceptablePacketLossRatio = 0.10; _maxAcceptablePacketErrorRatio = 0.10; - _userHasSpecifiedLinkSpeeds=0; + _userHasSpecifiedLinkSpeeds = 0; /* ZT_MULTIPATH_FLOW_REBALANCE_STRATEGY_PASSIVE is the most conservative strategy and is least likely to cause unexpected behavior */ @@ -1783,9 +1896,8 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool _failoverInterval = templateBond->_failoverInterval >= 250 ? templateBond->_failoverInterval : _failoverInterval; _downDelay = templateBond->_downDelay; _upDelay = templateBond->_upDelay; - if (templateBond->_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE - && templateBond->_failoverInterval != 0) { - //fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n"); + if (templateBond->_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE && templateBond->_failoverInterval != 0) { + // fprintf(stderr, "warning: passive path monitoring was specified, this will prevent failovers from happening in a timely manner.\n"); } _abLinkSelectMethod = templateBond->_abLinkSelectMethod; memcpy(_qualityWeights, templateBond->_qualityWeights, ZT_QOS_WEIGHT_SIZE * sizeof(float)); @@ -1807,7 +1919,7 @@ void Bond::setUserQualityWeights(float weights[], int len) { if (len == ZT_QOS_WEIGHT_SIZE) { float weightTotal = 0.0; - for (unsigned int i=0; i 0.99 && weightTotal < 1.01) { @@ -1816,7 +1928,8 @@ void Bond::setUserQualityWeights(float weights[], int len) } } -bool Bond::relevant() { +bool Bond::relevant() +{ return false; } @@ -1830,4 +1943,4 @@ void Bond::dumpInfo(const int64_t now) // Omitted } -} // namespace ZeroTier +} // namespace ZeroTier diff --git a/node/Bond.hpp b/node/Bond.hpp index 50f217d2f..3920b5804 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -14,37 +14,36 @@ #ifndef ZT_BOND_HPP #define ZT_BOND_HPP -#include - +#include "Flow.hpp" #include "Path.hpp" #include "Peer.hpp" -#include "../osdep/Link.hpp" -#include "Flow.hpp" +#include "Packet.hpp" + +#include +#include namespace ZeroTier { class RuntimeEnvironment; class Link; +class Peer; -class Bond -{ +class Bond { friend class SharedPtr; friend class Peer; friend class BondController; - struct PathQualityComparator - { - bool operator ()(const SharedPtr & a, const SharedPtr & b) + struct PathQualityComparator { + bool operator()(const SharedPtr& a, const SharedPtr& b) { - if(a->_failoverScore == b->_failoverScore) { + if (a->_failoverScore == b->_failoverScore) { return a < b; } return a->_failoverScore > b->_failoverScore; } }; -public: - + public: // TODO: Remove bool _header; int64_t _lastLogTS; @@ -61,7 +60,7 @@ public: * @param policy Bonding policy * @param peer */ - Bond(const RuntimeEnvironment *renv, int policy, const SharedPtr& peer); + Bond(const RuntimeEnvironment* renv, int policy, const SharedPtr& peer); /** * Constructor. For use when user intends to manually specify parameters @@ -70,7 +69,7 @@ public: * @param policyAlias * @param peer */ - Bond(const RuntimeEnvironment *renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr& peer); + Bond(const RuntimeEnvironment* renv, std::string& basePolicy, std::string& policyAlias, const SharedPtr& peer); /** * Constructor. Creates a bond based off of a user-defined bond template @@ -79,12 +78,15 @@ public: * @param original * @param peer */ - Bond(const RuntimeEnvironment *renv, SharedPtr originalBond, const SharedPtr& peer); + Bond(const RuntimeEnvironment* renv, SharedPtr originalBond, const SharedPtr& peer); /** * @return The human-readable name of the bonding policy */ - std::string policyAlias() { return _policyAlias; } + std::string policyAlias() + { + return _policyAlias; + } /** * Inform the bond about the path that its peer (owning object) just learned about. @@ -135,8 +137,7 @@ public: * @param flowId Flow ID * @param now Current time */ - void recordOutgoingPacket(const SharedPtr &path, uint64_t packetId, - uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now); + void recordOutgoingPacket(const SharedPtr& path, uint64_t packetId, uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now); /** * Process the contents of an inbound VERB_QOS_MEASUREMENT to gather path quality observations. @@ -146,7 +147,7 @@ public: * @param rx_id table of packet IDs * @param rx_ts table of holding times */ - void receivedQoS(const SharedPtr& path, int64_t now, int count, uint64_t *rx_id, uint16_t *rx_ts); + void receivedQoS(const SharedPtr& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts); /** * Process the contents of an inbound VERB_ACK to gather path quality observations. @@ -164,7 +165,7 @@ public: * @param qosBuffer destination buffer * @return Size of payload */ - int32_t generateQoSPacket(const SharedPtr& path, int64_t now, char *qosBuffer); + int32_t generateQoSPacket(const SharedPtr& path, int64_t now, char* qosBuffer); /** * Record statistics for an inbound packet. @@ -176,8 +177,7 @@ public: * @param flowId Flow ID * @param now Current time */ - void recordIncomingPacket(const SharedPtr& path, uint64_t packetId, uint16_t payloadLength, - Packet::Verb verb, int32_t flowId, int64_t now); + void recordIncomingPacket(const SharedPtr& path, uint64_t packetId, uint16_t payloadLength, Packet::Verb verb, int32_t flowId, int64_t now); /** * Determines the most appropriate path for packet and flow egress. This decision is made by @@ -198,7 +198,7 @@ public: * @param now Current time * @return Pointer to newly-created Flow */ - SharedPtr createFlow(const SharedPtr &path, int32_t flowId, unsigned char entropy, int64_t now); + SharedPtr createFlow(const SharedPtr& path, int32_t flowId, unsigned char entropy, int64_t now); /** * Removes flow records that are past a certain age limit. @@ -215,7 +215,7 @@ public: * @param flow Flow to be assigned * @param now Current time */ - bool assignFlowToBondedPath(SharedPtr &flow, int64_t now); + bool assignFlowToBondedPath(SharedPtr& flow, int64_t now); /** * Determine whether a path change should occur given the remote peer's reported utility and our @@ -226,7 +226,7 @@ public: * @param path Path over which the negotiation request was received * @param remoteUtility How much utility the remote peer claims to gain by using the declared path */ - void processIncomingPathNegotiationRequest(uint64_t now, SharedPtr &path, int16_t remoteUtility); + void processIncomingPathNegotiationRequest(uint64_t now, SharedPtr& path, int16_t remoteUtility); /** * Determine state of path synchronization and whether a negotiation request @@ -235,7 +235,7 @@ public: * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param now Current time */ - void pathNegotiationCheck(void *tPtr, const int64_t now); + void pathNegotiationCheck(void* tPtr, const int64_t now); /** * Sends a VERB_ACK to the remote peer. @@ -246,8 +246,7 @@ public: * @param atAddress * @param now Current time */ - void sendACK(void *tPtr, const SharedPtr &path,int64_t localSocket, - const InetAddress &atAddress,int64_t now); + void sendACK(void* tPtr, const SharedPtr& path, int64_t localSocket, const InetAddress& atAddress, int64_t now); /** * Sends a VERB_QOS_MEASUREMENT to the remote peer. @@ -258,8 +257,7 @@ public: * @param atAddress * @param now Current time */ - void sendQOS_MEASUREMENT(void *tPtr,const SharedPtr &path,int64_t localSocket, - const InetAddress &atAddress,int64_t now); + void sendQOS_MEASUREMENT(void* tPtr, const SharedPtr& path, int64_t localSocket, const InetAddress& atAddress, int64_t now); /** * Sends a VERB_PATH_NEGOTIATION_REQUEST to the remote peer. @@ -267,7 +265,7 @@ public: * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param path Path over which packet should be sent */ - void sendPATH_NEGOTIATION_REQUEST(void *tPtr, const SharedPtr &path); + void sendPATH_NEGOTIATION_REQUEST(void* tPtr, const SharedPtr& path); /** * @@ -281,7 +279,7 @@ public: * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param now Current time */ - void processActiveBackupTasks(void *tPtr, int64_t now); + void processActiveBackupTasks(void* tPtr, int64_t now); /** * Switches the active link in an active-backup scenario to the next best during @@ -311,64 +309,82 @@ public: /** * @param latencyInMilliseconds Maximum acceptable latency. */ - void setMaxAcceptableLatency(int16_t latencyInMilliseconds) { + void setMaxAcceptableLatency(int16_t latencyInMilliseconds) + { _maxAcceptableLatency = latencyInMilliseconds; } /** * @param latencyInMilliseconds Maximum acceptable (mean) latency. */ - void setMaxAcceptableMeanLatency(int16_t latencyInMilliseconds) { + void setMaxAcceptableMeanLatency(int16_t latencyInMilliseconds) + { _maxAcceptableMeanLatency = latencyInMilliseconds; } /** * @param latencyVarianceInMilliseconds Maximum acceptable packet delay variance (jitter). */ - void setMaxAcceptablePacketDelayVariance(int16_t latencyVarianceInMilliseconds) { + void setMaxAcceptablePacketDelayVariance(int16_t latencyVarianceInMilliseconds) + { _maxAcceptablePacketDelayVariance = latencyVarianceInMilliseconds; } /** * @param lossRatio Maximum acceptable packet loss ratio (PLR). */ - void setMaxAcceptablePacketLossRatio(float lossRatio) { + void setMaxAcceptablePacketLossRatio(float lossRatio) + { _maxAcceptablePacketLossRatio = lossRatio; } /** * @param errorRatio Maximum acceptable packet error ratio (PER). */ - void setMaxAcceptablePacketErrorRatio(float errorRatio) { + void setMaxAcceptablePacketErrorRatio(float errorRatio) + { _maxAcceptablePacketErrorRatio = errorRatio; } /** * @param errorRatio Maximum acceptable packet error ratio (PER). */ - void setMinAcceptableAllocation(float minAlloc) { + void setMinAcceptableAllocation(float minAlloc) + { _minAcceptableAllocation = minAlloc * 255; } /** * @return Whether the user has defined links for use on this bond */ - inline bool userHasSpecifiedLinks() { return _userHasSpecifiedLinks; } + inline bool userHasSpecifiedLinks() + { + return _userHasSpecifiedLinks; + } /** * @return Whether the user has defined a set of failover link(s) for this bond */ - inline bool userHasSpecifiedFailoverInstructions() { return _userHasSpecifiedFailoverInstructions; }; + inline bool userHasSpecifiedFailoverInstructions() + { + return _userHasSpecifiedFailoverInstructions; + }; /** * @return Whether the user has specified a primary link */ - inline bool userHasSpecifiedPrimaryLink() { return _userHasSpecifiedPrimaryLink; } + inline bool userHasSpecifiedPrimaryLink() + { + return _userHasSpecifiedPrimaryLink; + } /** * @return Whether the user has specified link speeds */ - inline bool userHasSpecifiedLinkSpeeds() { return _userHasSpecifiedLinkSpeeds; } + inline bool userHasSpecifiedLinkSpeeds() + { + return _userHasSpecifiedLinkSpeeds; + } /** * Periodically perform maintenance tasks for each active bond. @@ -376,7 +392,7 @@ public: * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param now Current time */ - void processBackgroundTasks(void *tPtr, int64_t now); + void processBackgroundTasks(void* tPtr, int64_t now); /** * Rate limit gate for VERB_ACK @@ -387,11 +403,12 @@ public: inline bool rateGateACK(const int64_t now) { _ackCutoffCount++; - int numToDrain = _lastAckRateCheck ? (now - _lastAckRateCheck) / ZT_ACK_DRAINAGE_DIVISOR : _ackCutoffCount; + int numToDrain = _lastAckRateCheck ? (now - _lastAckRateCheck) / ZT_ACK_DRAINAGE_DIVISOR : _ackCutoffCount; _lastAckRateCheck = now; if (_ackCutoffCount > numToDrain) { - _ackCutoffCount-=numToDrain; - } else { + _ackCutoffCount -= numToDrain; + } + else { _ackCutoffCount = 0; } return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT); @@ -409,8 +426,9 @@ public: int numToDrain = (now - _lastQoSRateCheck) / ZT_QOS_DRAINAGE_DIVISOR; _lastQoSRateCheck = now; if (_qosCutoffCount > numToDrain) { - _qosCutoffCount-=numToDrain; - } else { + _qosCutoffCount -= numToDrain; + } + else { _qosCutoffCount = 0; } return (_qosCutoffCount < ZT_QOS_CUTOFF_LIMIT); @@ -426,7 +444,8 @@ public: { if ((now - _lastPathNegotiationReceived) <= ZT_PATH_NEGOTIATION_CUTOFF_TIME) ++_pathNegotiationCutoffCount; - else _pathNegotiationCutoffCount = 0; + else + _pathNegotiationCutoffCount = 0; _lastPathNegotiationReceived = now; return (_pathNegotiationCutoffCount < ZT_PATH_NEGOTIATION_CUTOFF_LIMIT); } @@ -434,130 +453,206 @@ public: /** * @param interval Maximum amount of time user expects a failover to take on this bond. */ - inline void setFailoverInterval(uint32_t interval) { _failoverInterval = interval; } + inline void setFailoverInterval(uint32_t interval) + { + _failoverInterval = interval; + } /** * @param interval Maximum amount of time user expects a failover to take on this bond. */ - inline uint32_t getFailoverInterval() { return _failoverInterval; } + inline uint32_t getFailoverInterval() + { + return _failoverInterval; + } /** * @param strategy Strategy that the bond uses to re-assign protocol flows. */ - inline void setFlowRebalanceStrategy(uint32_t strategy) { _flowRebalanceStrategy = strategy; } + inline void setFlowRebalanceStrategy(uint32_t strategy) + { + _flowRebalanceStrategy = strategy; + } /** * @param strategy Strategy that the bond uses to prob for path aliveness and quality */ - inline void setLinkMonitorStrategy(uint8_t strategy) { _linkMonitorStrategy = strategy; } + inline void setLinkMonitorStrategy(uint8_t strategy) + { + _linkMonitorStrategy = strategy; + } /** * @param abOverflowEnabled Whether "overflow" mode is enabled for this active-backup bond */ - inline void setOverflowMode(bool abOverflowEnabled) { _abOverflowEnabled = abOverflowEnabled; } + inline void setOverflowMode(bool abOverflowEnabled) + { + _abOverflowEnabled = abOverflowEnabled; + } /** * @return the current up delay parameter */ - inline uint16_t getUpDelay() { return _upDelay; } + inline uint16_t getUpDelay() + { + return _upDelay; + } /** * @param upDelay Length of time before a newly-discovered path is admitted to the bond */ - inline void setUpDelay(int upDelay) { if (upDelay >= 0) { _upDelay = upDelay; } } + inline void setUpDelay(int upDelay) + { + if (upDelay >= 0) { + _upDelay = upDelay; + } + } /** * @return Length of time before a newly-failed path is removed from the bond */ - inline uint16_t getDownDelay() { return _downDelay; } + inline uint16_t getDownDelay() + { + return _downDelay; + } /** * @param downDelay Length of time before a newly-failed path is removed from the bond */ - inline void setDownDelay(int downDelay) { if (downDelay >= 0) { _downDelay = downDelay; } } + inline void setDownDelay(int downDelay) + { + if (downDelay >= 0) { + _downDelay = downDelay; + } + } /** * @return the current monitoring interval for the bond (can be overridden with intervals specific to certain links.) */ - inline uint16_t getBondMonitorInterval() { return _bondMonitorInterval; } + inline uint16_t getBondMonitorInterval() + { + return _bondMonitorInterval; + } /** * Set the current monitoring interval for the bond (can be overridden with intervals specific to certain links.) * * @param monitorInterval How often gratuitous VERB_HELLO(s) are sent to remote peer. */ - inline void setBondMonitorInterval(uint16_t interval) { _bondMonitorInterval = interval; } + inline void setBondMonitorInterval(uint16_t interval) + { + _bondMonitorInterval = interval; + } /** * @param policy Bonding policy for this bond */ - inline void setPolicy(uint8_t policy) { _bondingPolicy = policy; } + inline void setPolicy(uint8_t policy) + { + _bondingPolicy = policy; + } /** * @return the current bonding policy */ - inline uint8_t getPolicy() { return _bondingPolicy; } + inline uint8_t getPolicy() + { + return _bondingPolicy; + } /** * @return the health status of the bond */ - inline bool isHealthy() { return _isHealthy; } + inline bool isHealthy() + { + return _isHealthy; + } /** * @return the number of links comprising this bond which are considered alive */ - inline uint8_t getNumAliveLinks() { return _numAliveLinks; }; + inline uint8_t getNumAliveLinks() + { + return _numAliveLinks; + }; /** * @return the number of links comprising this bond */ - inline uint8_t getNumTotalLinks() { return _numTotalLinks; } + inline uint8_t getNumTotalLinks() + { + return _numTotalLinks; + } /** * * @param allowFlowHashing */ - inline void setFlowHashing(bool allowFlowHashing) { _allowFlowHashing = allowFlowHashing; } + inline void setFlowHashing(bool allowFlowHashing) + { + _allowFlowHashing = allowFlowHashing; + } /** * @return Whether flow-hashing is currently enabled for this bond. */ - bool flowHashingEnabled() { return _allowFlowHashing; } + bool flowHashingEnabled() + { + return _allowFlowHashing; + } /** * * @param packetsPerLink */ - inline void setPacketsPerLink(int packetsPerLink) { _packetsPerLink = packetsPerLink; } + inline void setPacketsPerLink(int packetsPerLink) + { + _packetsPerLink = packetsPerLink; + } /** * @return Number of packets to be sent on each interface in a balance-rr bond */ - inline int getPacketsPerLink() { return _packetsPerLink; } + inline int getPacketsPerLink() + { + return _packetsPerLink; + } /** * * @param linkSelectMethod */ - inline void setLinkSelectMethod(uint8_t method) { _abLinkSelectMethod = method; } + inline void setLinkSelectMethod(uint8_t method) + { + _abLinkSelectMethod = method; + } /** * * @return */ - inline uint8_t getLinkSelectMethod() { return _abLinkSelectMethod; } + inline uint8_t getLinkSelectMethod() + { + return _abLinkSelectMethod; + } /** * * @param allowPathNegotiation */ - inline void setAllowPathNegotiation(bool allowPathNegotiation) { _allowPathNegotiation = allowPathNegotiation; } + inline void setAllowPathNegotiation(bool allowPathNegotiation) + { + _allowPathNegotiation = allowPathNegotiation; + } /** * * @return */ - inline bool allowPathNegotiation() { return _allowPathNegotiation; } + inline bool allowPathNegotiation() + { + return _allowPathNegotiation; + } /** * Forcibly rotates the currently active link used in an active-backup bond to the next link in the failover queue @@ -566,11 +661,13 @@ public: */ bool abForciblyRotateLink(); - SharedPtr getPeer() { return _peer; } + SharedPtr getPeer() + { + return _peer; + } -private: - - const RuntimeEnvironment *RR; + private: + const RuntimeEnvironment* RR; AtomicCounter __refCount; /** @@ -599,23 +696,23 @@ private: /** * Flows hashed according to port and protocol */ - std::map > _flows; + std::map > _flows; - float _qualityWeights[ZT_QOS_WEIGHT_SIZE]; // How much each factor contributes to the "quality" score of a path. + float _qualityWeights[ZT_QOS_WEIGHT_SIZE]; // How much each factor contributes to the "quality" score of a path. uint8_t _bondingPolicy; uint32_t _upDelay; uint32_t _downDelay; // active-backup - SharedPtr _abPath; // current active path + SharedPtr _abPath; // current active path std::list > _abFailoverQueue; - uint8_t _abLinkSelectMethod; // link re-selection policy for the primary link in active-backup + uint8_t _abLinkSelectMethod; // link re-selection policy for the primary link in active-backup bool _abOverflowEnabled; // balance-rr - uint8_t _rrIdx; // index to path currently in use during Round Robin operation - uint16_t _rrPacketsSentOnCurrLink; // number of packets sent on this link since the most recent path switch. + uint8_t _rrIdx; // index to path currently in use during Round Robin operation + uint16_t _rrPacketsSentOnCurrLink; // number of packets sent on this link since the most recent path switch. /** * How many packets will be sent on a path before moving to the next path * in the round-robin sequence. A value of zero will cause a random path @@ -745,6 +842,6 @@ private: bool _allowFlowHashing; }; -} // namespace ZeroTier +} // namespace ZeroTier #endif diff --git a/node/BondController.cpp b/node/BondController.cpp index cdd74d3d1..3b310ac35 100644 --- a/node/BondController.cpp +++ b/node/BondController.cpp @@ -11,25 +11,25 @@ */ /****/ -#include "../osdep/OSUtils.hpp" - -#include "Constants.hpp" #include "BondController.hpp" -#include "Peer.hpp" + +#include "../osdep/OSUtils.hpp" +#include "Bond.hpp" +#include "Node.hpp" +#include "RuntimeEnvironment.hpp" namespace ZeroTier { int BondController::_minReqPathMonitorInterval; uint8_t BondController::_defaultBondingPolicy; -BondController::BondController(const RuntimeEnvironment *renv) : - RR(renv) +BondController::BondController(const RuntimeEnvironment* renv) : RR(renv) { bondStartTime = RR->node->now(); _defaultBondingPolicy = ZT_BONDING_POLICY_NONE; } -bool BondController::linkAllowed(std::string &policyAlias, SharedPtr link) +bool BondController::linkAllowed(std::string& policyAlias, SharedPtr link) { bool foundInDefinitions = false; if (_linkDefinitions.count(policyAlias)) { @@ -52,14 +52,14 @@ void BondController::addCustomLink(std::string& policyAlias, SharedPtr lin auto search = _interfaceToLinkMap[policyAlias].find(link->ifname()); if (search == _interfaceToLinkMap[policyAlias].end()) { link->setAsUserSpecified(true); - _interfaceToLinkMap[policyAlias].insert(std::pair>(link->ifname(), link)); + _interfaceToLinkMap[policyAlias].insert(std::pair >(link->ifname(), link)); } } bool BondController::addCustomPolicy(const SharedPtr& newBond) { Mutex::Lock _l(_bonds_m); - if (!_bondPolicyTemplates.count(newBond->policyAlias())) { + if (! _bondPolicyTemplates.count(newBond->policyAlias())) { _bondPolicyTemplates[newBond->policyAlias()] = newBond; return true; } @@ -69,7 +69,7 @@ bool BondController::addCustomPolicy(const SharedPtr& newBond) bool BondController::assignBondingPolicyToPeer(int64_t identity, const std::string& policyAlias) { Mutex::Lock _l(_bonds_m); - if (!_policyTemplateAssignments.count(identity)) { + if (! _policyTemplateAssignments.count(identity)) { _policyTemplateAssignments[identity] = policyAlias; return true; } @@ -82,37 +82,40 @@ SharedPtr BondController::getBondByPeerId(int64_t identity) return _bonds.count(identity) ? _bonds[identity] : SharedPtr(); } -SharedPtr BondController::createTransportTriggeredBond(const RuntimeEnvironment *renv, const SharedPtr& peer) +SharedPtr BondController::createTransportTriggeredBond(const RuntimeEnvironment* renv, const SharedPtr& peer) { Mutex::Lock _l(_bonds_m); int64_t identity = peer->identity().address().toInt(); - Bond *bond = nullptr; + Bond* bond = nullptr; char traceMsg[128]; - if (!_bonds.count(identity)) { + if (! _bonds.count(identity)) { std::string policyAlias; - if (!_policyTemplateAssignments.count(identity)) { + if (! _policyTemplateAssignments.count(identity)) { if (_defaultBondingPolicy) { - sprintf(traceMsg, "%s (bond) Creating new default %s bond to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), getPolicyStrByCode(_defaultBondingPolicy).c_str(), identity); RR->t->bondStateMessage(NULL, traceMsg); + sprintf(traceMsg, "%s (bond) Creating new default %s bond to peer %llx", OSUtils::humanReadableTimestamp().c_str(), getPolicyStrByCode(_defaultBondingPolicy).c_str(), (unsigned long long)identity); + RR->t->bondStateMessage(NULL, traceMsg); bond = new Bond(renv, _defaultBondingPolicy, peer); } - if (!_defaultBondingPolicy && _defaultBondingPolicyStr.length()) { - sprintf(traceMsg, "%s (bond) Creating new default custom %s bond to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), identity); + if (! _defaultBondingPolicy && _defaultBondingPolicyStr.length()) { + sprintf(traceMsg, "%s (bond) Creating new default custom %s bond to peer %llx", OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), (unsigned long long)identity); RR->t->bondStateMessage(NULL, traceMsg); bond = new Bond(renv, _bondPolicyTemplates[_defaultBondingPolicyStr].ptr(), peer); } } else { - if (!_bondPolicyTemplates[_policyTemplateAssignments[identity]]) { - sprintf(traceMsg, "%s (bond) Creating new bond. Assignment for peer %llx was specified as %s but the bond definition was not found. Using default %s", - OSUtils::humanReadableTimestamp().c_str(), identity, _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultBondingPolicy).c_str()); + if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) { + sprintf( + traceMsg, + "%s (bond) Creating new bond. Assignment for peer %llx was specified as %s but the bond definition was not found. Using default %s", + OSUtils::humanReadableTimestamp().c_str(), + (unsigned long long)identity, + _policyTemplateAssignments[identity].c_str(), + getPolicyStrByCode(_defaultBondingPolicy).c_str()); RR->t->bondStateMessage(NULL, traceMsg); bond = new Bond(renv, _defaultBondingPolicy, peer); } else { - sprintf(traceMsg, "%s (bond) Creating new default bond %s to peer %llx", - OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), identity); + sprintf(traceMsg, "%s (bond) Creating new default bond %s to peer %llx", OSUtils::humanReadableTimestamp().c_str(), _defaultBondingPolicyStr.c_str(), (unsigned long long)identity); RR->t->bondStateMessage(NULL, traceMsg); bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer); } @@ -150,12 +153,12 @@ SharedPtr BondController::getLinkBySocket(const std::string& policyAlias, { Mutex::Lock _l(_links_m); char ifname[16]; - _phy->getIfName((PhySocket *) ((uintptr_t)localSocket), ifname, 16); + _phy->getIfName((PhySocket*)((uintptr_t)localSocket), ifname, 16); std::string ifnameStr(ifname); auto search = _interfaceToLinkMap[policyAlias].find(ifnameStr); if (search == _interfaceToLinkMap[policyAlias].end()) { SharedPtr s = new Link(ifnameStr, 0, 0, 0, 0, 0, true, ZT_MULTIPATH_SLAVE_MODE_SPARE, "", 0.0); - _interfaceToLinkMap[policyAlias].insert(std::pair >(ifnameStr, s)); + _interfaceToLinkMap[policyAlias].insert(std::pair >(ifnameStr, s)); return s; } else { @@ -199,14 +202,14 @@ bool BondController::allowedToBind(const std::string& ifname) */ } -void BondController::processBackgroundTasks(void *tPtr, const int64_t now) +void BondController::processBackgroundTasks(void* tPtr, const int64_t now) { Mutex::Lock _l(_bonds_m); - std::map >::iterator bondItr = _bonds.begin(); + std::map >::iterator bondItr = _bonds.begin(); while (bondItr != _bonds.end()) { bondItr->second->processBackgroundTasks(tPtr, now); ++bondItr; } } -} // namespace ZeroTier +} // namespace ZeroTier diff --git a/node/BondController.hpp b/node/BondController.hpp index 640a8f663..1e96a0f40 100644 --- a/node/BondController.hpp +++ b/node/BondController.hpp @@ -14,46 +14,54 @@ #ifndef ZT_BONDCONTROLLER_HPP #define ZT_BONDCONTROLLER_HPP +#include "../osdep/Link.hpp" +#include "../osdep/Phy.hpp" +#include "SharedPtr.hpp" + #include #include -#include "SharedPtr.hpp" -#include "../osdep/Phy.hpp" -#include "../osdep/Link.hpp" - namespace ZeroTier { class RuntimeEnvironment; class Bond; class Peer; +class Mutex; -class BondController -{ +class BondController { friend class Bond; -public: - - BondController(const RuntimeEnvironment *renv); + public: + BondController(const RuntimeEnvironment* renv); /** * @return Whether this link is permitted to become a member of a bond. */ - bool linkAllowed(std::string &policyAlias, SharedPtr link); + bool linkAllowed(std::string& policyAlias, SharedPtr link); /** * @return The minimum interval required to poll the active bonds to fulfill all active monitoring timing requirements. */ - int minReqPathMonitorInterval() { return _minReqPathMonitorInterval; } + int minReqPathMonitorInterval() + { + return _minReqPathMonitorInterval; + } /** * @param minReqPathMonitorInterval The minimum interval required to poll the active bonds to fulfill all active monitoring timing requirements. */ - static void setMinReqPathMonitorInterval(int minReqPathMonitorInterval) { _minReqPathMonitorInterval = minReqPathMonitorInterval; } + static void setMinReqPathMonitorInterval(int minReqPathMonitorInterval) + { + _minReqPathMonitorInterval = minReqPathMonitorInterval; + } /** * @return Whether the bonding layer is currently set up to be used. */ - bool inUse() { return !_bondPolicyTemplates.empty() || _defaultBondingPolicy; } + bool inUse() + { + return ! _bondPolicyTemplates.empty() || _defaultBondingPolicy; + } /** * @param basePolicyName Bonding policy name (See ZeroTierOne.h) @@ -61,12 +69,22 @@ public: */ static int getPolicyCodeByStr(const std::string& basePolicyName) { - if (basePolicyName == "active-backup") { return 1; } - if (basePolicyName == "broadcast") { return 2; } - if (basePolicyName == "balance-rr") { return 3; } - if (basePolicyName == "balance-xor") { return 4; } - if (basePolicyName == "balance-aware") { return 5; } - return 0; // "none" + if (basePolicyName == "active-backup") { + return 1; + } + if (basePolicyName == "broadcast") { + return 2; + } + if (basePolicyName == "balance-rr") { + return 3; + } + if (basePolicyName == "balance-xor") { + return 4; + } + if (basePolicyName == "balance-aware") { + return 5; + } + return 0; // "none" } /** @@ -75,11 +93,21 @@ public: */ static std::string getPolicyStrByCode(int policy) { - if (policy == 1) { return "active-backup"; } - if (policy == 2) { return "broadcast"; } - if (policy == 3) { return "balance-rr"; } - if (policy == 4) { return "balance-xor"; } - if (policy == 5) { return "balance-aware"; } + if (policy == 1) { + return "active-backup"; + } + if (policy == 2) { + return "broadcast"; + } + if (policy == 3) { + return "balance-rr"; + } + if (policy == 4) { + return "balance-xor"; + } + if (policy == 5) { + return "balance-aware"; + } return "none"; } @@ -88,19 +116,28 @@ public: * * @param bp Bonding policy */ - void setBondingLayerDefaultPolicy(uint8_t bp) { _defaultBondingPolicy = bp; } + void setBondingLayerDefaultPolicy(uint8_t bp) + { + _defaultBondingPolicy = bp; + } /** * Sets the default (custom) bonding policy for new or undefined bonds. * * @param alias Human-readable string alias for bonding policy */ - void setBondingLayerDefaultPolicyStr(std::string alias) { _defaultBondingPolicyStr = alias; } + void setBondingLayerDefaultPolicyStr(std::string alias) + { + _defaultBondingPolicyStr = alias; + } /** * @return The default bonding policy */ - static int defaultBondingPolicy() { return _defaultBondingPolicy; } + static int defaultBondingPolicy() + { + return _defaultBondingPolicy; + } /** * Add a user-defined link to a given bonding policy. @@ -142,7 +179,7 @@ public: * @param peer Remote peer that this bond services * @return A pointer to the newly created Bond */ - SharedPtr createTransportTriggeredBond(const RuntimeEnvironment *renv, const SharedPtr& peer); + SharedPtr createTransportTriggeredBond(const RuntimeEnvironment* renv, const SharedPtr& peer); /** * Periodically perform maintenance tasks for the bonding layer. @@ -150,7 +187,7 @@ public: * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call * @param now Current time */ - void processBackgroundTasks(void *tPtr, int64_t now); + void processBackgroundTasks(void* tPtr, int64_t now); /** * Gets a reference to a physical link definition given a policy alias and a local socket. @@ -175,12 +212,14 @@ public: */ bool allowedToBind(const std::string& ifname); - uint64_t getBondStartTime() { return bondStartTime; } + uint64_t getBondStartTime() + { + return bondStartTime; + } -private: - - Phy *_phy; - const RuntimeEnvironment *RR; + private: + Phy* _phy; + const RuntimeEnvironment* RR; Mutex _bonds_m; Mutex _links_m; @@ -208,22 +247,22 @@ private: /** * All currently active bonds. */ - std::map > _bonds; + std::map > _bonds; /** * Map of peers to custom bonding policies */ - std::map _policyTemplateAssignments; + std::map _policyTemplateAssignments; /** * User-defined bonding policies (can be assigned to a peer) */ - std::map > _bondPolicyTemplates; + std::map > _bondPolicyTemplates; /** * Set of links defined for a given bonding policy */ - std::map > > _linkDefinitions; + std::map > > _linkDefinitions; /** * Set of link objects mapped to their physical interfaces @@ -234,6 +273,6 @@ private: uint64_t bondStartTime; }; -} // namespace ZeroTier +} // namespace ZeroTier #endif diff --git a/node/Flow.hpp b/node/Flow.hpp index 77a4b207f..91986ddee 100644 --- a/node/Flow.hpp +++ b/node/Flow.hpp @@ -22,20 +22,14 @@ namespace ZeroTier { /** * A protocol flow that is identified by the origin and destination port. */ -struct Flow -{ +struct Flow { /** * @param flowId Given flow ID * @param now Current time */ - Flow(int32_t flowId, int64_t now) : - _flowId(flowId), - _bytesInPerUnitTime(0), - _bytesOutPerUnitTime(0), - _lastActivity(now), - _lastPathReassignment(0), - _assignedPath(SharedPtr()) - {} + Flow(int32_t flowId, int64_t now) : _flowId(flowId), _bytesInPerUnitTime(0), _bytesOutPerUnitTime(0), _lastActivity(now), _lastPathReassignment(0), _assignedPath(SharedPtr()) + { + } /** * Reset flow statistics @@ -49,36 +43,54 @@ struct Flow /** * @return The Flow's ID */ - int32_t id() { return _flowId; } + int32_t id() + { + return _flowId; + } /** * @return Number of incoming bytes processed on this flow per unit time */ - int64_t bytesInPerUnitTime() { return _bytesInPerUnitTime; } + int64_t bytesInPerUnitTime() + { + return _bytesInPerUnitTime; + } /** * Record number of incoming bytes on this flow * * @param bytes Number of incoming bytes */ - void recordIncomingBytes(uint64_t bytes) { _bytesInPerUnitTime += bytes; } + void recordIncomingBytes(uint64_t bytes) + { + _bytesInPerUnitTime += bytes; + } /** * @return Number of outgoing bytes processed on this flow per unit time */ - int64_t bytesOutPerUnitTime() { return _bytesOutPerUnitTime; } + int64_t bytesOutPerUnitTime() + { + return _bytesOutPerUnitTime; + } /** * Record number of outgoing bytes on this flow * * @param bytes */ - void recordOutgoingBytes(uint64_t bytes) { _bytesOutPerUnitTime += bytes; } + void recordOutgoingBytes(uint64_t bytes) + { + _bytesOutPerUnitTime += bytes; + } /** * @return The total number of bytes processed on this flow */ - uint64_t totalBytes() { return _bytesInPerUnitTime + _bytesOutPerUnitTime; } + uint64_t totalBytes() + { + return _bytesInPerUnitTime + _bytesOutPerUnitTime; + } /** * How long since a packet was sent or received in this flow @@ -86,24 +98,34 @@ struct Flow * @param now Current time * @return The age of the flow in terms of last recorded activity */ - int64_t age(int64_t now) { return now - _lastActivity; } + int64_t age(int64_t now) + { + return now - _lastActivity; + } /** * Record that traffic was processed on this flow at the given time. * * @param now Current time */ - void updateActivity(int64_t now) { _lastActivity = now; } + void updateActivity(int64_t now) + { + _lastActivity = now; + } /** * @return Path assigned to this flow */ - SharedPtr assignedPath() { return _assignedPath; } + SharedPtr assignedPath() + { + return _assignedPath; + } /** * @param path Assigned path over which this flow should be handled */ - void assignPath(const SharedPtr &path, int64_t now) { + void assignPath(const SharedPtr& path, int64_t now) + { _assignedPath = path; _lastPathReassignment = now; } @@ -119,6 +141,6 @@ struct Flow SharedPtr _previouslyAssignedPath; }; -} // namespace ZeroTier +} // namespace ZeroTier #endif \ No newline at end of file diff --git a/osdep/Link.hpp b/osdep/Link.hpp index 88e24ecaf..80713f23b 100644 --- a/osdep/Link.hpp +++ b/osdep/Link.hpp @@ -14,19 +14,20 @@ #ifndef ZT_LINK_HPP #define ZT_LINK_HPP -#include - #include "../node/AtomicCounter.hpp" +#include "../node/SharedPtr.hpp" + +#include namespace ZeroTier { -class Link -{ +class Link { friend class SharedPtr; -public: - - Link() {} + public: + Link() + { + } /** * @@ -38,123 +39,165 @@ public: * @param failoverToLinkStr * @param userSpecifiedAlloc */ - Link(std::string& ifnameStr, - uint8_t ipvPref, - uint32_t speed, - uint32_t linkMonitorInterval, - uint32_t upDelay, - uint32_t downDelay, - bool enabled, - uint8_t mode, - std::string failoverToLinkStr, - float userSpecifiedAlloc) : - _ifnameStr(ifnameStr), - _ipvPref(ipvPref), - _speed(speed), - _relativeSpeed(0), - _linkMonitorInterval(linkMonitorInterval), - _upDelay(upDelay), - _downDelay(downDelay), - _enabled(enabled), - _mode(mode), - _failoverToLinkStr(failoverToLinkStr), - _userSpecifiedAlloc(userSpecifiedAlloc), - _isUserSpecified(false) - {} + Link(std::string& ifnameStr, uint8_t ipvPref, uint32_t speed, uint32_t linkMonitorInterval, uint32_t upDelay, uint32_t downDelay, bool enabled, uint8_t mode, std::string failoverToLinkStr, float userSpecifiedAlloc) + : _ifnameStr(ifnameStr) + , _ipvPref(ipvPref) + , _speed(speed) + , _relativeSpeed(0) + , _linkMonitorInterval(linkMonitorInterval) + , _upDelay(upDelay) + , _downDelay(downDelay) + , _enabled(enabled) + , _mode(mode) + , _failoverToLinkStr(failoverToLinkStr) + , _userSpecifiedAlloc(userSpecifiedAlloc) + , _isUserSpecified(false) + { + } /** * @return The string representation of this link's underlying interface's system name. */ - inline std::string ifname() { return _ifnameStr; } + inline std::string ifname() + { + return _ifnameStr; + } /** * @return Whether this link is designated as a primary. */ - inline bool primary() { return _mode == ZT_MULTIPATH_SLAVE_MODE_PRIMARY; } + inline bool primary() + { + return _mode == ZT_MULTIPATH_SLAVE_MODE_PRIMARY; + } /** * @return Whether this link is designated as a spare. */ - inline bool spare() { return _mode == ZT_MULTIPATH_SLAVE_MODE_SPARE; } + inline bool spare() + { + return _mode == ZT_MULTIPATH_SLAVE_MODE_SPARE; + } /** * @return The name of the link interface that should be used in the event of a failure. */ - inline std::string failoverToLink() { return _failoverToLinkStr; } + inline std::string failoverToLink() + { + return _failoverToLinkStr; + } /** * @return Whether this link interface was specified by the user or auto-detected. */ - inline bool isUserSpecified() { return _isUserSpecified; } + inline bool isUserSpecified() + { + return _isUserSpecified; + } /** * Signify that this link was specified by the user and not the result of auto-detection. * * @param isUserSpecified */ - inline void setAsUserSpecified(bool isUserSpecified) { _isUserSpecified = isUserSpecified; } + inline void setAsUserSpecified(bool isUserSpecified) + { + _isUserSpecified = isUserSpecified; + } /** * @return Whether or not the user has specified failover instructions. */ - inline bool userHasSpecifiedFailoverInstructions() { return _failoverToLinkStr.length(); } + inline bool userHasSpecifiedFailoverInstructions() + { + return _failoverToLinkStr.length(); + } /** * @return The speed of the link relative to others in the bond. */ - inline uint8_t relativeSpeed() { return _relativeSpeed; } + inline uint8_t relativeSpeed() + { + return _relativeSpeed; + } /** * Sets the speed of the link relative to others in the bond. * * @param relativeSpeed The speed relative to the rest of the link. */ - inline void setRelativeSpeed(uint8_t relativeSpeed) { _relativeSpeed = relativeSpeed; } + inline void setRelativeSpeed(uint8_t relativeSpeed) + { + _relativeSpeed = relativeSpeed; + } /** * Sets the speed of the link relative to others in the bond. * * @param relativeSpeed */ - inline void setMonitorInterval(uint32_t interval) { _linkMonitorInterval = interval; } + inline void setMonitorInterval(uint32_t interval) + { + _linkMonitorInterval = interval; + } /** * @return The absolute speed of the link (as specified by the user.) */ - inline uint32_t monitorInterval() { return _linkMonitorInterval; } + inline uint32_t monitorInterval() + { + return _linkMonitorInterval; + } /** * @return The absolute speed of the link (as specified by the user.) */ - inline uint32_t speed() { return _speed; } + inline uint32_t speed() + { + return _speed; + } /** * @return The address preference for this link (as specified by the user.) */ - inline uint8_t ipvPref() { return _ipvPref; } + inline uint8_t ipvPref() + { + return _ipvPref; + } /** * @return The mode (e.g. primary/spare) for this link (as specified by the user.) */ - inline uint8_t mode() { return _mode; } + inline uint8_t mode() + { + return _mode; + } /** * @return The upDelay parameter for all paths on this link. */ - inline uint32_t upDelay() { return _upDelay; } + inline uint32_t upDelay() + { + return _upDelay; + } /** * @return The downDelay parameter for all paths on this link. */ - inline uint32_t downDelay() { return _downDelay; } + inline uint32_t downDelay() + { + return _downDelay; + } /** * @return Whether this link is enabled or disabled */ - inline uint8_t enabled() { return _enabled; } - -private: + inline uint8_t enabled() + { + return _enabled; + } + private: /** * String representation of underlying interface's system name */ @@ -223,15 +266,15 @@ private: float _userSpecifiedAlloc; /** - * Whether or not this link was created as a result of manual user specification. This is - * important to know because certain policy decisions are dependent on whether the user - * intents to use a specific set of interfaces. - */ + * Whether or not this link was created as a result of manual user specification. This is + * important to know because certain policy decisions are dependent on whether the user + * intents to use a specific set of interfaces. + */ bool _isUserSpecified; AtomicCounter __refCount; }; -} // namespace ZeroTier +} // namespace ZeroTier -#endif \ No newline at end of file +#endif