// ---------------------------------------------------------------------------------------------
// REVIEW NOTES -- reading aid only. This text precedes the first "diff --git" header and is not
// part of any hunk; nothing below the notes has been altered.
//
// What the hunks below show:
//   node/Bond.cpp
//     - recordOutgoingPacket(): adds two commented-out ACK bookkeeping lines and drops the
//       "expecting a VERB_ACK" remark.
//     - recordIncomingPacket(): adds an else-branch that calls debug() when qosStatsIn has
//       reached ZT_QOS_MAX_PENDING_RECORDS; the ACK recording block is added commented out.
//     - Bond::receivedAck() and Bond::sendACK() are added, but each body sits entirely inside a
//       block comment, so both are currently empty functions.
//     - sendQOS_MEASUREMENT(): the commented-out debug line is moved ahead of the `if (len)`.
//     - processBackgroundBondTasks() / estimatePathQuality(): ACK handling added commented out.
//     - abForciblyRotateLink(): now constructs Mutex::Lock on _paths_m at entry.
//     - processActiveBackupTasks(): failover-queue entries whose path pointer is null are erased;
//       the function returns early when _abFailoverQueue is empty, and returns before getLink()
//       when the front entry's path pointer is null.
//     - initTimers() / setBondParameters(): initialise the new ACK rate-limit and interval
//       members; the earlier `_qosCutoffCount = 0` is removed (a later assignment is kept as
//       context in the second setBondParameters hunk).
//     - getLink(): returns an empty SharedPtr for a null path instead of dereferencing it.
//   node/Bond.hpp
//     - declares receivedAck(); adds rateGateACK(), NominatedPath::needsToSendAck(), ackStatsIn,
//       four per-path ACK counters, and _ackCutoffCount / _lastAckRateCheck.
//   node/Constants.hpp
//     - adds ZT_ACK_MAX_PENDING_RECORDS.
//   node/IncomingPacket.cpp / .hpp
//     - dispatches Packet::VERB_ACK to a new _doACK() whose body is commented out (it only
//       returns true); the _doQOS_MEASUREMENT changes shown are whitespace-only.
//
// NOTE(review) -- points to confirm before merging; not verifiable from these hunks alone:
//   - rateGateACK() uses ZT_ACK_DRAINAGE_DIVISOR and ZT_ACK_CUTOFF_LIMIT; neither is defined in
//     the Constants.hpp hunk shown here. Confirm they already exist, otherwise Bond.hpp will not
//     compile.
//   - rateGateACK() returns true while the counter is below the limit, and the commented-out
//     caller drops the packet on false; the "@return ... should be rate-gated" wording reads as
//     the opposite. Consider rewording.
//   - ackStatsIn is annotated "id:now", but the commented-out recorder stores payloadLength and
//     sendACK() sums the mapped values as bytes. The annotation looks copy-pasted from
//     qosStatsIn.
//   - needsToSendAck() tests packetsReceivedSinceLastAck == ZT_QOS_TABLE_SIZE, whereas
//     needsToSendQoS() uses >=. Presumably an ACK-specific constant and >= were intended --
//     confirm.
//   - In the "ineligible" branch the log call prints pathToStr(_paths[_abPathIdx].p), i.e. the
//     active path, not the path of the queue entry that was just erased.
//   - The commented-out _doACK() passes _path to receivedAck(int pathIdx, ...); re-enabling it
//     presumably needs a path-index lookup first -- TODO confirm the type of _path.
//   - abForciblyRotateLink() now holds _paths_m while calling dequeueNextActiveBackupPath();
//     confirm the callee does not take the same mutex.
// ---------------------------------------------------------------------------------------------
diff --git a/node/Bond.cpp b/node/Bond.cpp index 0c4037e75..45b34976f 100644 --- a/node/Bond.cpp +++ b/node/Bond.cpp @@ -407,8 +407,9 @@ void Bond::recordOutgoingPacket(const SharedPtr& path, uint64_t packetId, _lastFrame = now; } if (shouldRecord) { + //_paths[pathIdx].expectingAckAsOf = now; + //_paths[pathIdx].totalBytesSentSinceLastAckRecieved += payloadLength; //_paths[pathIdx].unackedBytes += payloadLength; - // Take note that we're expecting a VERB_ACK on this path as of a specific time if (_paths[pathIdx].qosStatsOut.size() < ZT_QOS_MAX_PENDING_RECORDS) { _paths[pathIdx].qosStatsOut[packetId] = now; } @@ -443,10 +444,24 @@ void Bond::recordIncomingPacket(const SharedPtr& path, uint64_t packetId, } if (shouldRecord) { if (_paths[pathIdx].qosStatsIn.size() < ZT_QOS_MAX_PENDING_RECORDS) { + // debug("Recording QoS information (table size = %d)", _paths[pathIdx].qosStatsIn.size()); _paths[pathIdx].qosStatsIn[packetId] = now; ++(_paths[pathIdx].packetsReceivedSinceLastQoS); //_paths[pathIdx].packetValiditySamples.push(true); } + else { + debug("QoS buffer full, will not record information"); + } + /* + if (_paths[pathIdx].ackStatsIn.size() < ZT_ACK_MAX_PENDING_RECORDS) { + //debug("Recording ACK information (table size = %d)", _paths[pathIdx].ackStatsIn.size()); + _paths[pathIdx].ackStatsIn[packetId] = payloadLength; + ++(_paths[pathIdx].packetsReceivedSinceLastAck); + } + else { + debug("ACK buffer full, will not record information"); + } + */ } } } @@ -491,6 +506,16 @@ void Bond::receivedQoS(const SharedPtr& path, int64_t now, int count, uint _paths[pathIdx].qosRecordSize.push(count); } +void Bond::receivedAck(int pathIdx, int64_t now, int32_t ackedBytes) +{ + /* + Mutex::Lock _l(_paths_m); + debug("received ACK of %d bytes on path %s, there are still %d un-acked bytes", ackedBytes, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].unackedBytes); + _paths[pathIdx].lastAckReceived = now; + _paths[pathIdx].unackedBytes = (ackedBytes > 
_paths[pathIdx].unackedBytes) ? 0 : _paths[pathIdx].unackedBytes - ackedBytes; + */ +} + int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer) { int32_t len = 0; @@ -743,12 +768,38 @@ void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, int pathIdx) } } +void Bond::sendACK(void* tPtr, int pathIdx, int64_t localSocket, const InetAddress& atAddress, int64_t now) +{ + /* + Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_ACK); + int32_t bytesToAck = 0; + std::map::iterator it = _paths[pathIdx].ackStatsIn.begin(); + while (it != _paths[pathIdx].ackStatsIn.end()) { + bytesToAck += it->second; + ++it; + } + debug("sending ACK of %d bytes on path %s (table size = %d)", bytesToAck, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].ackStatsIn.size()); + outp.append(bytesToAck); + if (atAddress) { + outp.armor(_peer->key(), false, _peer->aesKeysIfSupported()); + RR->node->putPacket(tPtr, localSocket, atAddress, outp.data(), outp.size()); + } + else { + RR->sw->send(tPtr, outp, false); + } + _paths[pathIdx].ackStatsIn.clear(); + _paths[pathIdx].packetsReceivedSinceLastAck = 0; + _paths[pathIdx].lastAckSent = now; + */ +} + void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, const InetAddress& atAddress, int64_t now) { int64_t _now = RR->node->now(); Packet outp(_peer->_id.address(), RR->identity.address(), Packet::VERB_QOS_MEASUREMENT); char qosData[ZT_QOS_MAX_PACKET_SIZE]; int16_t len = generateQoSPacket(pathIdx, _now, qosData); + // debug("sending QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len); if (len) { outp.append(qosData, len); if (atAddress) { @@ -762,7 +813,6 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con _paths[pathIdx].lastQoSMeasurement = now; _overheadBytes += outp.size(); } - // debug("send QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len); } void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) @@ -799,6 
+849,12 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) if (_paths[i].needsToSendQoS(now, _qosSendInterval)) { sendQOS_MEASUREMENT(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now); } + // ACK + /* + if (_paths[i].needsToSendAck(now, _ackSendInterval)) { + sendACK(tPtr, i, _paths[i].p->localSocket(), _paths[i].p->address(), now); + } + */ } } } @@ -1095,6 +1151,20 @@ void Bond::estimatePathQuality(int64_t now) log("Dropped %d QOS out-records", numDroppedQosOutRecords); } + /* + for (unsigned int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { + if (! _paths[i].p) { + continue; + } + // if ((now - _paths[i].lastAckReceived) > ackSendInterval) { + // debug("been a while since ACK"); + // if (_paths[i].unackedBytes > 0) { + // _paths[i].unackedBytes / _paths[i].bytesSen + // } + // } + } + */ + it = _paths[i].qosStatsIn.begin(); int numDroppedQosInRecords = 0; while (it != _paths[i].qosStatsIn.end()) { @@ -1238,6 +1308,7 @@ void Bond::dequeueNextActiveBackupPath(uint64_t now) bool Bond::abForciblyRotateLink() { + Mutex::Lock _l(_paths_m); if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { int prevPathIdx = _abPathIdx; dequeueNextActiveBackupPath(RR->node->now()); @@ -1366,12 +1437,18 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) // Remove ineligible paths from the failover link queue for (std::deque::iterator it(_abFailoverQueue.begin()); it != _abFailoverQueue.end();) { + if (! _paths[(*it)].p) { + log("link is no longer valid, removing from failover queue (%zu links remain in queue)", _abFailoverQueue.size()); + it = _abFailoverQueue.erase(it); + continue; + } if (_paths[(*it)].p && ! 
_paths[(*it)].eligible) { SharedPtr link = RR->bc->getLinkBySocket(_policyAlias, _paths[(*it)].p->localSocket()); it = _abFailoverQueue.erase(it); if (link) { - log("link %s is ineligible, removing from failover queue (%zu links in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); + log("link %s is ineligible, removing from failover queue (%zu links remain in queue)", pathToStr(_paths[_abPathIdx].p).c_str(), _abFailoverQueue.size()); } + continue; } else { ++it; @@ -1533,8 +1610,17 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now) if (prevActiveBackupPathIdx != _abPathIdx) { _lastActiveBackupPathChange = now; } + if (_abFailoverQueue.empty()) { + return; // No sense in continuing since there are no links to switch to + } + if (_abLinkSelectMethod == ZT_BOND_RESELECTION_POLICY_ALWAYS) { SharedPtr abLink = getLink(_paths[_abPathIdx].p); + if (! _paths[_abFailoverQueue.front()].p) { + log("invalid link. not switching"); + return; + } + SharedPtr abFailoverLink = getLink(_paths[_abFailoverQueue.front()].p); if (abLink && ! 
abLink->primary() && _paths[_abFailoverQueue.front()].p && abFailoverLink && abFailoverLink->primary()) { dequeueNextActiveBackupPath(now); @@ -1589,6 +1675,7 @@ void Bond::initTimers() _lastPathNegotiationCheck = 0; _lastPathNegotiationReceived = 0; _lastQoSRateCheck = 0; + _lastAckRateCheck = 0; _lastQualityEstimation = 0; _lastBondStatusLog = 0; _lastSummaryDump = 0; @@ -1621,10 +1708,6 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _localUtility = 0; _negotiatedPathIdx = 0; - // QOS Verb (and related checks) - - _qosCutoffCount = 0; - // User preferences which may override the default bonding algorithm's behavior _userHasSpecifiedPrimaryLink = false; @@ -1717,7 +1800,9 @@ void Bond::setBondParameters(int policy, SharedPtr templateBond, bool useT _monitorInterval = _failoverInterval / ZT_BOND_ECHOS_PER_FAILOVER_INTERVAL; _qualityEstimationInterval = _failoverInterval * 2; _qosSendInterval = _failoverInterval * 2; + _ackSendInterval = _failoverInterval * 2; _qosCutoffCount = 0; + _ackCutoffCount = 0; _defaultPathRefractoryPeriod = 8000; } @@ -1736,7 +1821,7 @@ void Bond::setUserQualityWeights(float weights[], int len) SharedPtr Bond::getLink(const SharedPtr& path) { - return RR->bc->getLinkBySocket(_policyAlias, path->localSocket()); + return ! path ? SharedPtr() : RR->bc->getLinkBySocket(_policyAlias, path->localSocket()); } std::string Bond::pathToStr(const SharedPtr& path) diff --git a/node/Bond.hpp b/node/Bond.hpp index e8a5898c0..bc7bd55c5 100644 --- a/node/Bond.hpp +++ b/node/Bond.hpp @@ -638,6 +638,15 @@ class Bond { */ void receivedQoS(const SharedPtr& path, int64_t now, int count, uint64_t* rx_id, uint16_t* rx_ts); + /** + * Process the contents of an inbound VERB_ACK to gather path quality observations. 
+ * + * @param pathIdx Path over which packet was received + * @param now Current time + * @param ackedBytes Number of bytes ACKed by this VERB_ACK + */ + void receivedAck(int pathIdx, int64_t now, int32_t ackedBytes); + /** * Generate the contents of a VERB_QOS_MEASUREMENT packet. * @@ -879,6 +888,26 @@ class Bond { */ void processBackgroundBondTasks(void* tPtr, int64_t now); + /** + * Rate limit gate for VERB_ACK + * + * @param now Current time + * @return Whether the incoming packet should be rate-gated + */ + inline bool rateGateACK(const int64_t now) + { + _ackCutoffCount++; + int numToDrain = _lastAckRateCheck ? (now - _lastAckRateCheck) / ZT_ACK_DRAINAGE_DIVISOR : _ackCutoffCount; + _lastAckRateCheck = now; + if (_ackCutoffCount > numToDrain) { + _ackCutoffCount -= numToDrain; + } + else { + _ackCutoffCount = 0; + } + return (_ackCutoffCount < ZT_ACK_CUTOFF_LIMIT); + } + /** * Rate limit gate for VERB_QOS_MEASUREMENT * @@ -1204,7 +1233,11 @@ class Bond { private: struct NominatedPath { NominatedPath() - : lastQoSMeasurement(0) + : lastAckSent(0) + , lastAckReceived(0) + , unackedBytes(0) + , packetsReceivedSinceLastAck(0) + , lastQoSMeasurement(0) , lastThroughputEstimation(0) , lastRefractoryUpdate(0) , lastAliveToggle(0) @@ -1295,6 +1328,15 @@ class Bond { return ((packetsReceivedSinceLastQoS >= ZT_QOS_TABLE_SIZE) || ((now - lastQoSMeasurement) > qosSendInterval)) && packetsReceivedSinceLastQoS; } + /** + * @param now Current time + * @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time + */ + inline bool needsToSendAck(int64_t now, int ackSendInterval) + { + return ((now - lastAckSent) >= ackSendInterval || (packetsReceivedSinceLastAck == ZT_QOS_TABLE_SIZE)) && packetsReceivedSinceLastAck; + } + /** * Reset packet counters */ @@ -1306,6 +1348,7 @@ class Bond { std::map qosStatsOut; // id:egress_time std::map qosStatsIn; // id:now + std::map ackStatsIn; // id:now RingBuffer qosRecordSize; RingBuffer qosRecordLossSamples; @@ -1314,6 
+1357,11 @@ class Bond { RingBuffer throughputVarianceSamples; RingBuffer latencySamples; + uint64_t lastAckSent; + uint64_t lastAckReceived; + uint64_t unackedBytes; + uint64_t packetsReceivedSinceLastAck; + uint64_t lastQoSMeasurement; // Last time that a VERB_QOS_MEASUREMENT was sent out on this path. uint64_t lastThroughputEstimation; // Last time that the path's throughput was estimated. uint64_t lastRefractoryUpdate; // The last time that the refractory period was updated. @@ -1513,7 +1561,9 @@ class Bond { * Rate-limiting */ uint16_t _qosCutoffCount; + uint16_t _ackCutoffCount; uint64_t _lastQoSRateCheck; + uint64_t _lastAckRateCheck; uint16_t _pathNegotiationCutoffCount; uint64_t _lastPathNegotiationReceived; diff --git a/node/Constants.hpp b/node/Constants.hpp index 0be2a881c..6389bdcd3 100644 --- a/node/Constants.hpp +++ b/node/Constants.hpp @@ -372,6 +372,11 @@ */ #define ZT_QOS_TABLE_SIZE ((ZT_QOS_MAX_PACKET_SIZE * 8) / (64 + 16)) +/** + * Maximum number of packets we monitor for ACK information at any given time + */ +#define ZT_ACK_MAX_PENDING_RECORDS (32 * 1024) + /** * Maximum number of packets we monitor for QoS information at any given time */ diff --git a/node/IncomingPacket.cpp b/node/IncomingPacket.cpp index 0d7e606c5..72cd4bde7 100644 --- a/node/IncomingPacket.cpp +++ b/node/IncomingPacket.cpp @@ -88,6 +88,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr,int32_t f peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,false,0,ZT_QOS_NO_FLOW); break; case Packet::VERB_HELLO: r = _doHELLO(RR,tPtr,true); break; + case Packet::VERB_ACK : r = _doACK(RR,tPtr,peer); break; case Packet::VERB_QOS_MEASUREMENT: r = _doQOS_MEASUREMENT(RR,tPtr,peer); break; case Packet::VERB_ERROR: r = _doERROR(RR,tPtr,peer); break; case Packet::VERB_OK: r = _doOK(RR,tPtr,peer); break; @@ -250,28 +251,47 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar return true; } -bool 
IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer) +bool IncomingPacket::_doACK(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr& peer) +{ + /* + SharedPtr bond = peer->bond(); + if (! bond || ! bond->rateGateACK(RR->node->now())) { + return true; + } + int32_t ackedBytes; + if (payloadLength() != sizeof(ackedBytes)) { + return true; // ignore + } + memcpy(&ackedBytes, payload(), sizeof(ackedBytes)); + if (bond) { + bond->receivedAck(_path, RR->node->now(), Utils::ntoh(ackedBytes)); + } + */ + return true; +} + +bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment* RR, void* tPtr, const SharedPtr& peer) { SharedPtr bond = peer->bond(); - if (!bond || !bond->rateGateQoS(RR->node->now(), _path)) { + if (! bond || ! bond->rateGateQoS(RR->node->now(), _path)) { return true; } if (payloadLength() > ZT_QOS_MAX_PACKET_SIZE || payloadLength() < ZT_QOS_MIN_PACKET_SIZE) { - return true; // ignore + return true; // ignore } const int64_t now = RR->node->now(); uint64_t rx_id[ZT_QOS_TABLE_SIZE]; uint16_t rx_ts[ZT_QOS_TABLE_SIZE]; - char *begin = (char *)payload(); - char *ptr = begin; + char* begin = (char*)payload(); + char* ptr = begin; int count = 0; unsigned int len = payloadLength(); // Read packet IDs and latency compensation intervals for each packet tracked by this QoS packet while (ptr < (begin + len) && (count < ZT_QOS_TABLE_SIZE)) { memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t)); - ptr+=sizeof(uint64_t); + ptr += sizeof(uint64_t); memcpy((void*)&rx_ts[count], ptr, sizeof(uint16_t)); - ptr+=sizeof(uint16_t); + ptr += sizeof(uint16_t); count++; } if (bond) { diff --git a/node/IncomingPacket.hpp b/node/IncomingPacket.hpp index 0014ddee4..1e6bb8f15 100644 --- a/node/IncomingPacket.hpp +++ b/node/IncomingPacket.hpp @@ -116,6 +116,7 @@ private: // been authenticated, decrypted, decompressed, and classified. 
bool _doERROR(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer); bool _doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated); + bool _doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer); bool _doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer); bool _doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer); bool _doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr &peer);