From ecfac0601a9a725ab653b76017889a6c06ff19bf Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Mon, 23 Nov 2020 09:59:28 -0800 Subject: [PATCH] Add new bond control commands to CLI --- node/Bond.cpp | 43 ++++++++- node/Bond.hpp | 25 +++++ node/BondController.cpp | 6 ++ node/BondController.hpp | 8 ++ node/Path.hpp | 26 ++++++ one.cpp | 198 +++++++++++++++++++++++++++++++++++++++- service/OneService.cpp | 131 +++++++++++++++++++------- 7 files changed, 395 insertions(+), 42 deletions(-) diff --git a/node/Bond.cpp b/node/Bond.cpp index f504effe5..d6a16ad86 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -414,6 +414,16 @@ bool Bond::assignFlowToBondedPath(SharedPtr &flow, int64_t now) return false; } } + if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) { + if (_abOverflowEnabled) { + flow->assignPath(_abPath, now); + } else { + sprintf(traceMsg, "%s (bond) Unable to assign outgoing flow %x to peer %llx, no active overflow link", + OSUtils::humanReadableTimestamp().c_str(), flow->id(), _peer->_id.address().toInt()); + RR->t->bondStateMessage(NULL, traceMsg); + return false; + } + } flow->assignedPath()->address().toString(curPathStr); SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, flow->assignedPath()->localSocket()); sprintf(traceMsg, "%s (bond) Assigned outgoing flow %x to peer %llx to link %s/%s, %lu active flow(s)", @@ -1240,6 +1250,29 @@ void Bond::dequeueNextActiveBackupPath(const uint64_t now) } } +bool Bond::abForciblyRotateLink() +{ + char traceMsg[256]; + char prevPathStr[128]; + char curPathStr[128]; + if (_bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) { + SharedPtr prevPath = _abPath; + _abPath->address().toString(prevPathStr); + dequeueNextActiveBackupPath(RR->node->now()); + _abPath->address().toString(curPathStr); + sprintf(traceMsg, "%s (active-backup) Forcibly rotating peer %llx link from %s/%s to %s/%s", + OSUtils::humanReadableTimestamp().c_str(), + _peer->_id.address().toInt(), + getLink(prevPath)->ifname().c_str(), + prevPathStr, + getLink(_abPath)->ifname().c_str(), + curPathStr); + RR->t->bondStateMessage(NULL, traceMsg); + return true; + } + return false; +} + void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) { char traceMsg[256]; @@ -1302,9 +1335,9 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) } } /** - * [Manual mode] - * The user has specified links or failover rules that the bonding policy should adhere to. - */ + * [Manual mode] + * The user has specified links or failover rules that the bonding policy should adhere to. + */ else if (userHasSpecifiedLinks()) { if (userHasSpecifiedPrimaryLink()) { //sprintf(traceMsg, "%s (active-backup) Checking local.conf for user-specified primary link\n", OSUtils::humanReadableTimestamp().c_str()); @@ -1507,7 +1540,7 @@ void Bond::processActiveBackupTasks(void *tPtr, const int64_t now) if (!bFoundPathInQueue) { _abFailoverQueue.push_front(_paths[i]); _paths[i]->address().toString(curPathStr); - sprintf(traceMsg, "%s (active-backup) Added link %s/%s to peer %llx, there are %zu links in the queue", + sprintf(traceMsg, "%s (active-backup) Added link %s/%s to peer %llx to failover queue, there are %zu links in the queue", OSUtils::humanReadableTimestamp().c_str(), getLink(_paths[i])->ifname().c_str(), curPathStr, _peer->_id.address().toInt(), _abFailoverQueue.size()); RR->t->bondStateMessage(NULL, traceMsg); } @@ -1747,7 +1780,7 @@ void Bond::setReasonableDefaults(int policy, SharedPtr templateBond, bool them onto the defaults that were previously set */ if (useTemplate) { _policyAlias = templateBond->_policyAlias; - _failoverInterval = templateBond->_failoverInterval; + _failoverInterval = templateBond->_failoverInterval >= 250 ? templateBond->_failoverInterval : _failoverInterval; _downDelay = templateBond->_downDelay; _upDelay = templateBond->_upDelay; if (templateBond->_linkMonitorStrategy == ZT_MULTIPATH_SLAVE_MONITOR_STRATEGY_PASSIVE diff --git a/node/Bond.hpp b/node/Bond.hpp index 763089427..697aee676 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -436,6 +436,11 @@ public: */ inline void setFailoverInterval(uint32_t interval) { _failoverInterval = interval; } + /** + * @param interval Maximum amount of time user expects a failover to take on this bond. + */ + inline uint32_t getFailoverInterval() { return _failoverInterval; } + /** * @param strategy Strategy that the bond uses to re-assign protocol flows. */ @@ -446,6 +451,11 @@ public: */ inline void setLinkMonitorStrategy(uint8_t strategy) { _linkMonitorStrategy = strategy; } + /** + * @param abOverflowEnabled Whether "overflow" mode is enabled for this active-backup bond + */ + inline void setOverflowMode(bool abOverflowEnabled) { _abOverflowEnabled = abOverflowEnabled; } + /** * @return the current up delay parameter */ @@ -520,6 +530,11 @@ public: */ inline void setPacketsPerLink(int packetsPerLink) { _packetsPerLink = packetsPerLink; } + /** + * @return Number of packets to be sent on each interface in a balance-rr bond + */ + inline int getPacketsPerLink() { return _packetsPerLink; } + /** * * @param linkSelectMethod @@ -544,6 +559,15 @@ public: */ inline bool allowPathNegotiation() { return _allowPathNegotiation; } + /** + * Forcibly rotates the currently active link used in an active-backup bond to the next link in the failover queue + * + * @return True if this operation succeeded, false if otherwise + */ + bool abForciblyRotateLink(); + + SharedPtr getPeer() { return _peer; } + private: const RuntimeEnvironment *RR; @@ -587,6 +611,7 @@ private: SharedPtr _abPath; // current active path std::list > _abFailoverQueue; uint8_t _abLinkSelectMethod; // link re-selection policy for the primary link in active-backup + bool _abOverflowEnabled; // balance-rr uint8_t _rrIdx; // index to path currently in use during Round Robin operation diff --git a/node/BondController.cpp b/node/BondController.cpp index 7c97ef7a4..5692390c1 100644 --- a/node/BondController.cpp +++ b/node/BondController.cpp @@ -76,6 +76,12 @@ bool BondController::assignBondingPolicyToPeer(int64_t identity, const std::stri return false; } +SharedPtr BondController::getBondByPeerId(int64_t identity) +{ + Mutex::Lock _l(_bonds_m); + return _bonds.count(identity) ? _bonds[identity] : SharedPtr(); +} + SharedPtr BondController::createTransportTriggeredBond(const RuntimeEnvironment *renv, const SharedPtr& peer) { Mutex::Lock _l(_bonds_m); diff --git a/node/BondController.hpp b/node/BondController.hpp index 892da2a43..7d8e8e69b 100644 --- a/node/BondController.hpp +++ b/node/BondController.hpp @@ -127,6 +127,14 @@ public: */ bool assignBondingPolicyToPeer(int64_t identity, const std::string& policyAlias); + /** + * Get pointer to bond by a given peer ID + * + * @param peer Remote peer ID + * @return A pointer to the Bond + */ + SharedPtr getBondByPeerId(int64_t identity); + /** * Add a new bond to the bond controller. * diff --git a/node/Path.hpp b/node/Path.hpp index dfafaccfb..0839158af 100644 --- a/node/Path.hpp +++ b/node/Path.hpp @@ -467,6 +467,32 @@ public: _packetsOut = 0; } + + /** + * The mean latency (computed from a sliding window.) + */ + float latencyMean() { return _latencyMean; } + + /** + * Packet delay variance (computed from a sliding window.) + */ + float latencyVariance() { return _latencyVariance; } + + /** + * The ratio of lost packets to received packets. + */ + float packetLossRatio() { return _packetLossRatio; } + + /** + * The ratio of packets that failed their MAC/CRC checks to those that did not. + */ + float packetErrorRatio() { return _packetErrorRatio; } + + /** + * + */ + uint8_t allocation() { return _allocation; } + private: volatile int64_t _lastOut; diff --git a/one.cpp b/one.cpp index a685ecf20..f09ad5df1 100644 --- a/one.cpp +++ b/one.cpp @@ -491,6 +491,198 @@ static int cli(int argc,char **argv) printf("%u %s %s" ZT_EOL_S,scode,command.c_str(),responseBody.c_str()); return 1; } + } else if (command == "bond") { + /* zerotier-cli bond */ + if (arg1.empty()) { + printf("(bond) command is missing required arugments" ZT_EOL_S); + return 2; + } + /* zerotier-cli bond list */ + if (arg1 == "list") { + fprintf(stderr, "zerotier-cli bond list\n"); + const unsigned int scode = Http::GET(1024 * 1024 * 16,60000,(const struct sockaddr *)&addr,"/bonds",requestHeaders,responseHeaders,responseBody); + if (scode == 0) { + printf("Error connecting to the ZeroTier service: %s\n\nPlease check that the service is running and that TCP port 9993 can be contacted via 127.0.0.1." ZT_EOL_S, responseBody.c_str()); + return 1; + } + nlohmann::json j; + try { + j = OSUtils::jsonParse(responseBody); + } catch (std::exception &exc) { + printf("%u %s invalid JSON response (%s)" ZT_EOL_S,scode,command.c_str(),exc.what()); + return 1; + } catch ( ... ) { + printf("%u %s invalid JSON response (unknown exception)" ZT_EOL_S,scode,command.c_str()); + return 1; + } + if (scode == 200) { + if (json) { + printf("%s" ZT_EOL_S,OSUtils::jsonDump(j).c_str()); + } else { + bool bFoundBond = false; + printf(" " ZT_EOL_S); + if (j.is_array()) { + for(unsigned long k=0;k= ZT_BONDING_POLICY_NONE && bondingPolicy <= ZT_BONDING_POLICY_BALANCE_AWARE) { + policyStr = BondController::getPolicyStrByCode(bondingPolicy); + } + printf("%10s %32s %8s %d/%d" ZT_EOL_S, + OSUtils::jsonString(p ["address"],"-").c_str(), + policyStr.c_str(), + healthStr.c_str(), + numAliveLinks, + numTotalLinks); + } + } + } + if (!bFoundBond) { + printf(" NONE\t\t\t\tNONE\t NONE NONE" ZT_EOL_S); + } + } + return 0; + } else { + printf("%u %s %s" ZT_EOL_S,scode,command.c_str(),responseBody.c_str()); + return 1; + } + } + else if (arg1.length() == 10) { /* zerotier-cli bond enable */ + if (arg2 == "enable") { + fprintf(stderr, "zerotier-cli bond enable\n"); + return 0; + } + if (arg2 == "rotate") { /* zerotier-cli bond rotate */ + fprintf(stderr, "zerotier-cli bond rotate\n"); + requestHeaders["Content-Type"] = "application/json"; + requestHeaders["Content-Length"] = "2"; + unsigned int scode = Http::POST( + 1024 * 1024 * 16, + 60000, + (const struct sockaddr *)&addr, + (std::string("/bond/") + arg2 + "/" + arg1).c_str(), + requestHeaders, + "{}", + 2, + responseHeaders, + responseBody); + if (scode == 200) { + if (json) { + printf("%s",cliFixJsonCRs(responseBody).c_str()); + } else { + printf("200 bond OK" ZT_EOL_S); + } + return 0; + } else { + printf("%u %s %s" ZT_EOL_S,scode,command.c_str(),responseBody.c_str()); + return 1; + } + return 0; + } + if (arg2 == "show") { + //fprintf(stderr, "zerotier-cli bond show\n"); + const unsigned int scode = Http::GET( + 1024 * 1024 * 16,60000, + (const struct sockaddr *)&addr,(std::string("/bond/") + arg2 + "/" + arg1).c_str(), + requestHeaders, + responseHeaders, + responseBody); + if (scode == 0) { + printf("Error connecting to the ZeroTier service: %s\n\nPlease check that the service is running and that TCP port 9993 can be contacted via 127.0.0.1." ZT_EOL_S, responseBody.c_str()); + return 1; + } + nlohmann::json j; + try { + j = OSUtils::jsonParse(responseBody); + } catch (std::exception &exc) { + printf("%u %s invalid JSON response (%s)" ZT_EOL_S,scode,command.c_str(),exc.what()); + return 1; + } catch ( ... ) { + printf("%u %s invalid JSON response (unknown exception)" ZT_EOL_S,scode,command.c_str()); + return 1; + } + if (scode == 200) { + if (json) { + printf("%s" ZT_EOL_S,OSUtils::jsonDump(j).c_str()); + } else { + bool bFoundBond = false; + std::string healthStr; + if (OSUtils::jsonInt(j["isHealthy"],0)) { + healthStr = "Healthy"; + } else { + healthStr = "Degraded"; + } + int numAliveLinks = OSUtils::jsonInt(j["numAliveLinks"],0); + int numTotalLinks = OSUtils::jsonInt(j["numTotalLinks"],0); + printf("Peer : %s\n", arg1.c_str()); + printf("Bond : %s\n", OSUtils::jsonString(j["bondingPolicy"],"-").c_str()); + //if (bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) { + printf("Link Select Method : %d\n", OSUtils::jsonInt(j["linkSelectMethod"],0)); + //} + printf("Status : %s\n", healthStr.c_str()); + printf("Links : %d/%d\n", numAliveLinks, numTotalLinks); + printf("Failover Interval : %d (ms)\n", OSUtils::jsonInt(j["failoverInterval"],0)); + printf("Up Delay : %d (ms)\n", OSUtils::jsonInt(j["upDelay"],0)); + printf("Down Delay : %d (ms)\n", OSUtils::jsonInt(j["downDelay"],0)); + printf("Packets Per Link : %d (ms)\n", OSUtils::jsonInt(j["packetsPerLink"],0)); + nlohmann::json &p = j["links"]; + if (p.is_array()) { + printf("\n Interface Name\t\t\t\t\t Path\t Alive\n"); + for(int i=0; i<80; i++) { printf("-"); } + printf("\n"); + for (int i=0; i= ZT_BONDING_POLICY_NONE && bondingPolicy <= ZT_BONDING_POLICY_BALANCE_AWARE) { @@ -540,7 +732,7 @@ static int cli(int argc,char **argv) } printf("%10s %32s %8s %d/%d" ZT_EOL_S, - OSUtils::jsonString(p ["address"],"-").c_str(), + OSUtils::jsonString(p["address"],"-").c_str(), policyStr.c_str(), healthStr.c_str(), numAliveLinks, diff --git a/service/OneService.cpp b/service/OneService.cpp index 4c977c6f7..fb1d65c9a 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -296,40 +296,48 @@ static void _peerToJson(nlohmann::json &pj,const ZT_Peer *peer) pj["paths"] = pa; } -static void _peerBondToJson(nlohmann::json &pj,const ZT_Peer *peer) +static void _bondToJson(nlohmann::json &pj, SharedPtr &bond) { char tmp[256]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.10llx",peer->address); - //pj["aggregateLinkLatency"] = peer->latency; - std::string policyStr = BondController::getPolicyStrByCode(peer->bondingPolicy); - pj["policy"] = policyStr; + uint64_t now = OSUtils::now(); + + int bondingPolicy = bond->getPolicy(); + pj["bondingPolicy"] = BondController::getPolicyStrByCode(bondingPolicy); + if (bondingPolicy == ZT_BONDING_POLICY_NONE) { + return; + } + + pj["isHealthy"] = bond->isHealthy(); + pj["numAliveLinks"] = bond->getNumAliveLinks(); + pj["numTotalLinks"] = bond->getNumTotalLinks(); + pj["failoverInterval"] = bond->getFailoverInterval(); + pj["downDelay"] = bond->getDownDelay(); + pj["upDelay"] = bond->getUpDelay(); + if (bondingPolicy == ZT_BONDING_POLICY_BALANCE_RR) { + pj["packetsPerLink"] = bond->getPacketsPerLink(); + } + if (bondingPolicy == ZT_BONDING_POLICY_ACTIVE_BACKUP) { + pj["linkSelectMethod"] = bond->getLinkSelectMethod(); + } nlohmann::json pa = nlohmann::json::array(); - for(unsigned int i=0;ipathCount;++i) { - int64_t lastSend = peer->paths[i].lastSend; - int64_t lastReceive = peer->paths[i].lastReceive; - nlohmann::json j; - j["ifname"] = std::string(peer->paths[i].ifname); - j["path"] = reinterpret_cast(&(peer->paths[i].address))->toString(tmp); - j["lastTX"] = (lastSend < 0) ? 0 : lastSend; - j["lastRX"] = (lastReceive < 0) ? 0 : lastReceive; - j["lat"] = peer->paths[i].latencyMean; - j["pdv"] = peer->paths[i].latencyVariance; + std::vector< SharedPtr > paths = bond->getPeer()->paths(now); - //j["trustedPathId"] = peer->paths[i].trustedPathId; - //j["active"] = (bool)(peer->paths[i].expired == 0); - //j["expired"] = (bool)(peer->paths[i].expired != 0); - //j["preferred"] = (bool)(peer->paths[i].preferred != 0); - //j["ltm"] = peer->paths[i].latencyMax; - //j["plr"] = peer->paths[i].packetLossRatio; - //j["per"] = peer->paths[i].packetErrorRatio; - //j["thr"] = peer->paths[i].throughputMean; - //j["thm"] = peer->paths[i].throughputMax; - //j["thv"] = peer->paths[i].throughputVariance; - //j["avl"] = peer->paths[i].availability; - //j["age"] = peer->paths[i].age; - //j["alloc"] = peer->paths[i].allocation; - //j["ifname"] = peer->paths[i].ifname; + for(unsigned int i=0;iaddress().toString(pathStr); + + nlohmann::json j; + j["ifname"] = bond->getLink(paths[i])->ifname(); + j["path"] = pathStr; + j["alive"] = paths[i]->alive(now,true); + j["bonded"] = paths[i]->bonded(); + j["latencyMean"] = paths[i]->latencyMean(); + j["latencyVariance"] = paths[i]->latencyVariance(); + j["packetLossRatio"] = paths[i]->packetLossRatio(); + j["packetErrorRatio"] = paths[i]->packetErrorRatio(); + j["givenLinkSpeed"] = 1000; + j["allocation"] = paths[i]->allocation(); pa.push_back(j); } pj["links"] = pa; @@ -1104,6 +1112,7 @@ public: virtual void terminate() { _run_m.lock(); + _run = false; _run_m.unlock(); _phy.whack(); @@ -1230,9 +1239,34 @@ public: } } #endif - if (httpMethod == HTTP_GET) { if (isAuth) { + if (ps[0] == "bond") { + if (_node->bondController()->inUse()) { + if (ps.size() == 3) { + //fprintf(stderr, "ps[0]=%s\nps[1]=%s\nps[2]=%s\n", ps[0].c_str(), ps[1].c_str(), ps[2].c_str()); + if (ps[2].length() == 10) { + // check if hex string + const uint64_t id = Utils::hexStrToU64(ps[2].c_str()); + if (ps[1] == "show") { + SharedPtr bond = _node->bondController()->getBondByPeerId(id); + if (bond) { + _bondToJson(res,bond); + scode = 200; + } else { + fprintf(stderr, "unable to find bond to peer %llx\n", id); + scode = 400; + } + } + if (ps[1] == "flows") { + fprintf(stderr, "displaying flows\n"); + } + } + } + } else { + scode = 400; /* bond controller is not enabled */ + } + } if (ps[0] == "status") { ZT_NodeStatus status; _node->status(&status); @@ -1257,7 +1291,7 @@ public: json &settings = res["config"]["settings"]; settings["primaryPort"] = OSUtils::jsonInt(settings["primaryPort"],(uint64_t)_primaryPort) & 0xffff; settings["allowTcpFallbackRelay"] = OSUtils::jsonBool(settings["allowTcpFallbackRelay"],_allowTcpFallbackRelay); - +/* if (_node->bondController()->inUse()) { json &multipathConfig = res["bonds"]; ZT_PeerList *pl = _node->peers(); @@ -1266,13 +1300,14 @@ public: for(unsigned long i=0;ipeerCount;++i) { if (pl->peers[i].isBonded) { nlohmann::json pj; - _peerBondToJson(pj,&(pl->peers[i])); + _bondToJson(pj,&(pl->peers[i])); OSUtils::ztsnprintf(peerAddrStr,sizeof(peerAddrStr),"%.10llx",pl->peers[i].address); multipathConfig[peerAddrStr] = (pj); } } } } +*/ #ifdef ZT_USE_MINIUPNPC settings["portMappingEnabled"] = OSUtils::jsonBool(settings["portMappingEnabled"],true); @@ -1413,8 +1448,32 @@ public: } else scode = 401; // isAuth == false } else if ((httpMethod == HTTP_POST)||(httpMethod == HTTP_PUT)) { - if (isAuth) { - + if (isAuth) { + if (ps[0] == "bond") { + if (_node->bondController()->inUse()) { + if (ps.size() == 3) { + //fprintf(stderr, "ps[0]=%s\nps[1]=%s\nps[2]=%s\n", ps[0].c_str(), ps[1].c_str(), ps[2].c_str()); + if (ps[2].length() == 10) { + // check if hex string + const uint64_t id = Utils::hexStrToU64(ps[2].c_str()); + if (ps[1] == "rotate") { + SharedPtr bond = _node->bondController()->getBondByPeerId(id); + if (bond) { + scode = bond->abForciblyRotateLink() ? 200 : 400; + } else { + fprintf(stderr, "unable to find bond to peer %llx\n", id); + scode = 400; + } + } + if (ps[1] == "enable") { + fprintf(stderr, "enabling bond\n"); + } + } + } + } else { + scode = 400; /* bond controller is not enabled */ + } + } if (ps[0] == "moon") { if (ps.size() == 2) { @@ -1644,6 +1703,9 @@ public: std::string customPolicyStr(policyItr.key()); json &customPolicy = policyItr.value(); std::string basePolicyStr(OSUtils::jsonString(customPolicy["basePolicy"],"")); + if (basePolicyStr.empty()) { + fprintf(stderr, "error: no base policy was specified for custom policy (%s)\n", customPolicyStr.c_str()); + } if (_node->bondController()->getPolicyCodeByStr(basePolicyStr) == ZT_BONDING_POLICY_NONE) { fprintf(stderr, "error: custom policy (%s) is invalid, unknown base policy (%s).\n", customPolicyStr.c_str(), basePolicyStr.c_str()); @@ -1677,6 +1739,7 @@ public: newTemplateBond->setUserQualityWeights(weights,ZT_QOS_WEIGHT_SIZE); } // Bond-specific properties + newTemplateBond->setOverflowMode(OSUtils::jsonInt(customPolicy["overflow"],false)); newTemplateBond->setUpDelay(OSUtils::jsonInt(customPolicy["upDelay"],-1)); newTemplateBond->setDownDelay(OSUtils::jsonInt(customPolicy["downDelay"],-1)); newTemplateBond->setFlowRebalanceStrategy(OSUtils::jsonInt(customPolicy["flowRebalanceStrategy"],(uint64_t)0));