This commit is contained in:
Adam Ierymenko 2022-02-01 16:03:04 -05:00
commit 25e14e2164
5 changed files with 70 additions and 40 deletions

View File

@ -123,12 +123,12 @@ extern "C" {
/** /**
* Maximum number of pushed routes on a network * Maximum number of pushed routes on a network
*/ */
#define ZT_MAX_NETWORK_ROUTES 32 #define ZT_MAX_NETWORK_ROUTES 128
/** /**
* Maximum number of statically assigned IP addresses per network endpoint using ZT address management (not DHCP) * Maximum number of statically assigned IP addresses per network endpoint using ZT address management (not DHCP)
*/ */
#define ZT_MAX_ZT_ASSIGNED_ADDRESSES 16 #define ZT_MAX_ZT_ASSIGNED_ADDRESSES 32
/** /**
* Maximum number of "specialists" on a network -- bridges, relays, etc. * Maximum number of "specialists" on a network -- bridges, relays, etc.

View File

@ -99,21 +99,21 @@ SharedPtr<Bond> Bond::createTransportTriggeredBond(const RuntimeEnvironment* ren
if (! _policyTemplateAssignments.count(identity)) { if (! _policyTemplateAssignments.count(identity)) {
if (_defaultPolicy) { if (_defaultPolicy) {
bond = new Bond(renv, _defaultPolicy, peer); bond = new Bond(renv, _defaultPolicy, peer);
bond->log("new default bond"); bond->debug("new default bond");
} }
if (! _defaultPolicy && _defaultPolicyStr.length()) { if (! _defaultPolicy && _defaultPolicyStr.length()) {
bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_defaultPolicyStr].ptr(), peer);
bond->log("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str()); bond->debug("new default custom bond (based on %s)", bond->getPolicyStrByCode(bond->policy()).c_str());
} }
} }
else { else {
if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) { if (! _bondPolicyTemplates[_policyTemplateAssignments[identity]]) {
bond = new Bond(renv, _defaultPolicy, peer); bond = new Bond(renv, _defaultPolicy, peer);
bond->log("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str()); bond->debug("peer-specific bond, was specified as %s but the bond definition was not found, using default %s", _policyTemplateAssignments[identity].c_str(), getPolicyStrByCode(_defaultPolicy).c_str());
} }
else { else {
bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer); bond = new Bond(renv, _bondPolicyTemplates[_policyTemplateAssignments[identity]].ptr(), peer);
bond->log("new default bond"); bond->debug("new default bond");
} }
} }
} }
@ -453,7 +453,7 @@ void Bond::receivedQoS(const SharedPtr<Path>& path, int64_t now, int count, uint
if (pathIdx == ZT_MAX_PEER_NETWORK_PATHS) { if (pathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
return; return;
} }
// log("received QoS packet (sampling %d frames) via %s", count, pathToStr(path).c_str()); // debug("received QoS packet (sampling %d frames) via %s", count, pathToStr(path).c_str());
// Look up egress times and compute latency values for each record // Look up egress times and compute latency values for each record
std::map<uint64_t, uint64_t>::iterator it; std::map<uint64_t, uint64_t>::iterator it;
for (int j = 0; j < count; j++) { for (int j = 0; j < count; j++) {
@ -488,10 +488,10 @@ int32_t Bond::generateQoSPacket(int pathIdx, int64_t now, char* qosBuffer)
bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now) bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now)
{ {
if (! _numBondedPaths) { if (! _numBondedPaths) {
log("unable to assign flow %x (bond has no links)\n", flow->id); debug("unable to assign flow %x (bond has no links)\n", flow->id);
return false; return false;
} }
unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS; unsigned int idx = ZT_MAX_PEER_NETWORK_PATHS;
if (_policy == ZT_BOND_POLICY_BALANCE_XOR) { if (_policy == ZT_BOND_POLICY_BALANCE_XOR) {
idx = abs((int)(flow->id % (_numBondedPaths))); idx = abs((int)(flow->id % (_numBondedPaths)));
@ -530,29 +530,29 @@ bool Bond::assignFlowToBondedPath(SharedPtr<Flow>& flow, int64_t now)
++(_paths[idx].assignedFlowCount); ++(_paths[idx].assignedFlowCount);
} }
else { else {
log("unable to assign out-flow %x (unknown reason)", flow->id); debug("unable to assign out-flow %x (unknown reason)", flow->id);
return false; return false;
} }
} }
if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) { if (_policy == ZT_BOND_POLICY_ACTIVE_BACKUP) {
if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) { if (_abPathIdx == ZT_MAX_PEER_NETWORK_PATHS) {
log("unable to assign out-flow %x (no active backup link)", flow->id); debug("unable to assign out-flow %x (no active backup link)", flow->id);
} }
flow->assignPath(_abPathIdx, now); flow->assignPath(_abPathIdx, now);
} }
SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[flow->assignedPath].p->localSocket()); SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[flow->assignedPath].p->localSocket());
log("assign out-flow %04x to link %s (%lu / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, (unsigned long)_flows.size()); debug("assign out-flow %04x to link %s (%lu / %lu flows)", flow->id, pathToStr(_paths[flow->assignedPath].p).c_str(), _paths[flow->assignedPath].assignedFlowCount, (unsigned long)_flows.size());
return true; return true;
} }
SharedPtr<Bond::Flow> Bond::createFlow(int pathIdx, int32_t flowId, unsigned char entropy, int64_t now) SharedPtr<Bond::Flow> Bond::createFlow(int pathIdx, int32_t flowId, unsigned char entropy, int64_t now)
{ {
if (! _numBondedPaths) { if (! _numBondedPaths) {
log("unable to assign flow %x (bond has no links)\n", flowId); debug("unable to assign flow %x (bond has no links)\n", flowId);
return SharedPtr<Flow>(); return SharedPtr<Flow>();
} }
if (_flows.size() >= ZT_FLOW_MAX_COUNT) { if (_flows.size() >= ZT_FLOW_MAX_COUNT) {
log("forget oldest flow (max flows reached: %d)\n", ZT_FLOW_MAX_COUNT); debug("forget oldest flow (max flows reached: %d)\n", ZT_FLOW_MAX_COUNT);
forgetFlowsWhenNecessary(0, true, now); forgetFlowsWhenNecessary(0, true, now);
} }
SharedPtr<Flow> flow = new Flow(flowId, now); SharedPtr<Flow> flow = new Flow(flowId, now);
@ -565,7 +565,7 @@ SharedPtr<Bond::Flow> Bond::createFlow(int pathIdx, int32_t flowId, unsigned cha
if (pathIdx != ZT_MAX_PEER_NETWORK_PATHS) { if (pathIdx != ZT_MAX_PEER_NETWORK_PATHS) {
flow->assignPath(pathIdx, now); flow->assignPath(pathIdx, now);
_paths[pathIdx].assignedFlowCount++; _paths[pathIdx].assignedFlowCount++;
log("assign in-flow %x to link %s (%lu / %lu)", flow->id, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].assignedFlowCount, (unsigned long)_flows.size()); debug("assign in-flow %x to link %s (%lu / %lu)", flow->id, pathToStr(_paths[pathIdx].p).c_str(), _paths[pathIdx].assignedFlowCount, (unsigned long)_flows.size());
} }
/** /**
* Add a flow when no path was provided. This means that it is an outgoing packet * Add a flow when no path was provided. This means that it is an outgoing packet
@ -585,7 +585,7 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
if (age) { // Remove by specific age if (age) { // Remove by specific age
while (it != _flows.end()) { while (it != _flows.end()) {
if (it->second->age(now) > age) { if (it->second->age(now) > age) {
log("forget flow %x (age %llu) (%lu / %lu)", it->first, (unsigned long long)it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (unsigned long)(_flows.size() - 1)); debug("forget flow %x (age %llu) (%lu / %lu)", it->first, (unsigned long long)it->second->age(now), _paths[it->second->assignedPath].assignedFlowCount, (unsigned long)(_flows.size() - 1));
_paths[it->second->assignedPath].assignedFlowCount--; _paths[it->second->assignedPath].assignedFlowCount--;
it = _flows.erase(it); it = _flows.erase(it);
} }
@ -604,7 +604,7 @@ void Bond::forgetFlowsWhenNecessary(uint64_t age, bool oldest, int64_t now)
++it; ++it;
} }
if (oldestFlow != _flows.end()) { if (oldestFlow != _flows.end()) {
log("forget oldest flow %x (age %llu) (total flows: %lu)", oldestFlow->first, (unsigned long long)oldestFlow->second->age(now), (unsigned long)(_flows.size() - 1)); debug("forget oldest flow %x (age %llu) (total flows: %lu)", oldestFlow->first, (unsigned long long)oldestFlow->second->age(now), (unsigned long)(_flows.size() - 1));
_paths[oldestFlow->second->assignedPath].assignedFlowCount--; _paths[oldestFlow->second->assignedPath].assignedFlowCount--;
_flows.erase(oldestFlow); _flows.erase(oldestFlow);
} }
@ -629,20 +629,20 @@ void Bond::processIncomingPathNegotiationRequest(uint64_t now, SharedPtr<Path>&
SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[pathIdx].p->localSocket()); SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[pathIdx].p->localSocket());
if (remoteUtility > _localUtility) { if (remoteUtility > _localUtility) {
_paths[pathIdx].p->address().toString(pathStr); _paths[pathIdx].p->address().toString(pathStr);
log("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); debug("peer suggests alternate link %s/%s, remote utility (%d) greater than local utility (%d), switching to suggested link\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
_negotiatedPathIdx = pathIdx; _negotiatedPathIdx = pathIdx;
} }
if (remoteUtility < _localUtility) { if (remoteUtility < _localUtility) {
log("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); debug("peer suggests alternate link %s/%s, remote utility (%d) less than local utility (%d), not switching\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
} }
if (remoteUtility == _localUtility) { if (remoteUtility == _localUtility) {
log("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility); debug("peer suggests alternate link %s/%s, remote utility (%d) equal to local utility (%d)\n", link->ifname().c_str(), pathStr, remoteUtility, _localUtility);
if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) { if (_peer->_id.address().toInt() > RR->node->identity().address().toInt()) {
log("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); debug("agree with peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
_negotiatedPathIdx = pathIdx; _negotiatedPathIdx = pathIdx;
} }
else { else {
log("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr); debug("ignore petition from peer to use alternate link %s/%s\n", link->ifname().c_str(), pathStr);
} }
} }
} }
@ -704,7 +704,7 @@ void Bond::pathNegotiationCheck(void* tPtr, int64_t now)
void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, int pathIdx) void Bond::sendPATH_NEGOTIATION_REQUEST(void* tPtr, int pathIdx)
{ {
log("send link negotiation request to peer via link %s, local utility is %d", pathToStr(_paths[pathIdx].p).c_str(), _localUtility); debug("send link negotiation request to peer via link %s, local utility is %d", pathToStr(_paths[pathIdx].p).c_str(), _localUtility);
if (_abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) { if (_abLinkSelectMethod != ZT_BOND_RESELECTION_POLICY_OPTIMIZE) {
return; return;
} }
@ -735,7 +735,7 @@ void Bond::sendQOS_MEASUREMENT(void* tPtr, int pathIdx, int64_t localSocket, con
_paths[pathIdx].packetsReceivedSinceLastQoS = 0; _paths[pathIdx].packetsReceivedSinceLastQoS = 0;
_paths[pathIdx].lastQoSMeasurement = now; _paths[pathIdx].lastQoSMeasurement = now;
} }
// log("send QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len); // debug("send QOS via link %s (len=%d)", pathToStr(_paths[pathIdx].p).c_str(), len);
} }
void Bond::processBackgroundBondTasks(void* tPtr, int64_t now) void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
@ -762,9 +762,9 @@ void Bond::processBackgroundBondTasks(void* tPtr, int64_t now)
outp.armor(_peer->key(), true, _peer->aesKeysIfSupported()); outp.armor(_peer->key(), true, _peer->aesKeysIfSupported());
RR->node->expectReplyTo(outp.packetId()); RR->node->expectReplyTo(outp.packetId());
RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size()); RR->node->putPacket(tPtr, _paths[i].p->localSocket(), _paths[i].p->address(), outp.data(), outp.size());
_paths[i].p->_lastOut = now; _paths[i].p->_lastOut = now;
_overheadBytes += outp.size(); _overheadBytes += outp.size();
log("sent ECHO via link %s", pathToStr(_paths[i].p).c_str()); debug("sent ECHO via link %s", pathToStr(_paths[i].p).c_str());
} }
} }
// QOS // QOS
@ -833,7 +833,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
// currEligibility = false; // currEligibility = false;
//} //}
currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay) || inTrial); currEligibility = _paths[i].allowed() && ((acceptableAge && satisfiedUpDelay) || inTrial);
// log("[%d] allowed=%d, acceptableAge=%d, satisfiedUpDelay=%d, inTrial=%d ==== %d", i, _paths[i].allowed(), acceptableAge, satisfiedUpDelay, inTrial, currEligibility); // debug("[%d] allowed=%d, acceptableAge=%d, satisfiedUpDelay=%d, inTrial=%d ==== %d", i, _paths[i].allowed(), acceptableAge, satisfiedUpDelay, inTrial, currEligibility);
/** /**
* Note eligibility state change (if any) and take appropriate action * Note eligibility state change (if any) and take appropriate action
@ -853,7 +853,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
_paths[i].adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility); _paths[i].adjustRefractoryPeriod(now, _defaultPathRefractoryPeriod, ! currEligibility);
if (_paths[i].bonded) { if (_paths[i].bonded) {
if (_allowFlowHashing) { if (_allowFlowHashing) {
log("link %s was bonded, flow reallocation will occur soon", pathToStr(_paths[i].p).c_str()); debug("link %s was bonded, flow reallocation will occur soon", pathToStr(_paths[i].p).c_str());
rebuildBond = true; rebuildBond = true;
_paths[i].shouldReallocateFlows = _paths[i].bonded; _paths[i].shouldReallocateFlows = _paths[i].bonded;
} }
@ -909,7 +909,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
rebuildBond = true; rebuildBond = true;
} }
if (rebuildBond) { if (rebuildBond) {
log("rebuilding bond"); debug("rebuilding bond");
int updatedBondedPathCount = 0; int updatedBondedPathCount = 0;
// Build map associating paths with local physical links. Will be selected from in next step // Build map associating paths with local physical links. Will be selected from in next step
std::map<SharedPtr<Link>, std::vector<int> > linkMap; std::map<SharedPtr<Link>, std::vector<int> > linkMap;
@ -934,7 +934,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
} }
addPathToBond(idx, updatedBondedPathCount); addPathToBond(idx, updatedBondedPathCount);
++updatedBondedPathCount; ++updatedBondedPathCount;
log("add %s (no user addr preference)", pathToStr(_paths[idx].p).c_str()); debug("add %s (no user addr preference)", pathToStr(_paths[idx].p).c_str());
} }
} }
// If the user prefers to only use one address type (IPv4 or IPv6) // If the user prefers to only use one address type (IPv4 or IPv6)
@ -945,12 +945,12 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
continue; continue;
} }
if (! _paths[idx].allowed()) { if (! _paths[idx].allowed()) {
log("did not add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref); debug("did not add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref);
continue; continue;
} }
addPathToBond(idx, updatedBondedPathCount); addPathToBond(idx, updatedBondedPathCount);
++updatedBondedPathCount; ++updatedBondedPathCount;
log("add path %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref); debug("add path %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref);
} }
} }
// If the users prefers one address type to another, try to find at least // If the users prefers one address type to another, try to find at least
@ -966,13 +966,13 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
if (_paths[idx].preferred()) { if (_paths[idx].preferred()) {
addPathToBond(idx, updatedBondedPathCount); addPathToBond(idx, updatedBondedPathCount);
++updatedBondedPathCount; ++updatedBondedPathCount;
log("add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref); debug("add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref);
foundPreferredPath = true; foundPreferredPath = true;
} }
} }
// Unable to find a path that matches user preference, settle for another address type // Unable to find a path that matches user preference, settle for another address type
if (! foundPreferredPath) { if (! foundPreferredPath) {
log("did not find first-choice path type on link %s (user preference %d)", link->ifname().c_str(), ipvPref); debug("did not find first-choice path type on link %s (user preference %d)", link->ifname().c_str(), ipvPref);
for (int j = 0; j < it->second.size(); j++) { for (int j = 0; j < it->second.size(); j++) {
int idx = it->second.at(j); int idx = it->second.at(j);
if (! _paths[idx].p || ! _paths[idx].eligible) { if (! _paths[idx].p || ! _paths[idx].eligible) {
@ -980,7 +980,7 @@ void Bond::curateBond(int64_t now, bool rebuildBond)
} }
addPathToBond(idx, updatedBondedPathCount); addPathToBond(idx, updatedBondedPathCount);
++updatedBondedPathCount; ++updatedBondedPathCount;
log("add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref); debug("add %s (user addr preference %d)", pathToStr(_paths[idx].p).c_str(), ipvPref);
foundPreferredPath = true; foundPreferredPath = true;
} }
} }
@ -1234,7 +1234,7 @@ void Bond::processActiveBackupTasks(void* tPtr, int64_t now)
* simply find the next eligible path. * simply find the next eligible path.
*/ */
if (! userHasSpecifiedLinks()) { if (! userHasSpecifiedLinks()) {
log("no user-specified links"); debug("no user-specified links");
for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) { for (int i = 0; i < ZT_MAX_PEER_NETWORK_PATHS; ++i) {
if (_paths[i].p && _paths[i].eligible) { if (_paths[i].p && _paths[i].eligible) {
SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket()); SharedPtr<Link> link = RR->bc->getLinkBySocket(_policyAlias, _paths[i].p->localSocket());

View File

@ -1159,6 +1159,36 @@ class Bond {
va_end(args); va_end(args);
RR->t->bondStateMessage(NULL, traceMsg); RR->t->bondStateMessage(NULL, traceMsg);
#undef MAX_MSG_LEN #undef MAX_MSG_LEN
#endif
}
/**
* Emit message to tracing system but with added timestamp and subsystem info
*
* TODO: Will be replaced when better logging facilities exist in Trace.hpp
*/
void debug(const char* fmt, ...)
{
#ifdef ZT_DEBUG
time_t rawtime;
struct tm* timeinfo;
char timestamp[80];
time(&rawtime);
timeinfo = localtime(&rawtime);
strftime(timestamp, 80, "%F %T", timeinfo);
#define MAX_BOND_MSG_LEN 1024
char traceMsg[MAX_BOND_MSG_LEN];
char userMsg[MAX_BOND_MSG_LEN];
va_list args;
va_start(args, fmt);
if (vsnprintf(userMsg, sizeof(userMsg), fmt, args) < 0) {
fprintf(stderr, "Encountered format encoding error while writing to trace log\n");
return;
}
snprintf(traceMsg, MAX_BOND_MSG_LEN, "%s (%llx/%s) %s", timestamp, _peerId, _policyAlias.c_str(), userMsg);
va_end(args);
RR->t->bondStateMessage(NULL, traceMsg);
#undef MAX_MSG_LEN
#endif #endif
} }

View File

@ -523,7 +523,7 @@
/** /**
* Maximum number of VERB_NETWORK_CREDENTIALS within cutoff time * Maximum number of VERB_NETWORK_CREDENTIALS within cutoff time
*/ */
#define ZT_PEER_CREDEITIALS_CUTOFF_LIMIT 15 #define ZT_PEER_CREDENTIALS_CUTOFF_LIMIT 15
/** /**
* WHOIS rate limit (we allow these to be pretty fast) * WHOIS rate limit (we allow these to be pretty fast)

View File

@ -391,7 +391,7 @@ public:
++_credentialsCutoffCount; ++_credentialsCutoffCount;
else _credentialsCutoffCount = 0; else _credentialsCutoffCount = 0;
_lastCredentialsReceived = now; _lastCredentialsReceived = now;
return (_directPathPushCutoffCount < ZT_PEER_CREDEITIALS_CUTOFF_LIMIT); return (_credentialsCutoffCount < ZT_PEER_CREDENTIALS_CUTOFF_LIMIT);
} }
/** /**