Added VERB_ACK and VERB_QOS_MEASUREMENT, refined notion of path quality

This commit is contained in:
Joseph Henry 2018-05-30 17:45:29 -07:00
parent 8199dbd0dc
commit 46a7a2be2e
18 changed files with 954 additions and 720 deletions

View File

@ -86,7 +86,7 @@
#include <android/log.h>
#define ZT_LOG_TAG "ZTSDK"
#endif
#if defined(ZT_TRACE)
#if defined(ZT_DEBUG_TRACE)
#if ZT_MSG_INFO == true
#if defined(__ANDROID__)
#define DEBUG_INFO(fmt, args...) ((void)__android_log_print(ANDROID_LOG_VERBOSE, ZT_LOG_TAG, \

View File

@ -45,6 +45,9 @@ ONE_OBJS+=ext/http-parser/http_parser.o
ifeq ($(ZT_TRACE),1)
override DEFS+=-DZT_TRACE
endif
ifeq ($(ZT_DEBUG_TRACE),1)
DEFS+=-DZT_DEBUG_TRACE
endif
ifeq ($(ZT_RULES_ENGINE_DEBUGGING),1)
override DEFS+=-DZT_RULES_ENGINE_DEBUGGING

View File

@ -43,7 +43,10 @@ ONE_OBJS+=ext/libnatpmp/natpmp.o ext/libnatpmp/getgateway.o ext/miniupnpc/connec
# Build with address sanitization library for advanced debugging (clang)
ifeq ($(ZT_SANITIZE),1)
SANFLAGS+=-fsanitize=address -DASAN_OPTIONS=symbolize=1
DEFS+=-fsanitize=address -DASAN_OPTIONS=symbolize=1
endif
ifeq ($(ZT_DEBUG_TRACE),1)
DEFS+=-DZT_DEBUG_TRACE
endif
# Debug mode -- dump trace output, build binary with -g
ifeq ($(ZT_DEBUG),1)

View File

@ -267,11 +267,6 @@
*/
#define ZT_PING_CHECK_INVERVAL 5000
/**
* Length of interface name
*/
#define ZT_PATH_INTERFACE_NAME_SZ 16
/**
* How frequently to check for changes to the system's network interfaces. When
* the service decides to use this constant it's because we want to react more
@ -285,27 +280,6 @@
*/
#define ZT_MULTIPATH_PROPORTION_WIN_SZ 128
/**
* Threshold for flow to be considered balanced.
*/
#define ZT_MULTIPATH_FLOW_BALANCE_THESHOLD 0.80
/**
* Number of samples to consider when computing path statistics
*/
#define ZT_PATH_QUALITY_METRIC_WIN_SZ 64
/**
* How often important path metrics are sampled (in ms). These metrics are later used
* for path quality estimates
*/
#define ZT_PATH_QUALITY_SAMPLE_INTERVAL 100
/**
* How often new path quality estimates are computed
*/
#define ZT_PATH_QUALITY_ESTIMATE_INTERVAL 100
/**
* How often we will sample packet latency. Should be at least greater than ZT_PING_CHECK_INVERVAL
* since we will record a 0 bit/s measurement if no valid latency measurement was made within this
@ -317,47 +291,85 @@
* Interval used for rate-limiting the computation of path quality estimates. Set at 0
* to compute as new packets arrive with no delay.
*/
#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 0
#define ZT_PATH_QUALITY_COMPUTE_INTERVAL 1000
/**
* Path error rate history window size. This is used to keep track of packet error
* measurements over a path's medium-term history.
* Number of samples to consider when computing real-time path statistics
*/
#define ZT_PATH_ERROR_HIST_WIN_SZ 10
#define ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ 128
/**
* The number of packet error measurements in each sample
* Number of samples to consider when computing performing long-term path quality analysis.
* By default this value is set to ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ but can
* be set to any value greater than that to observe longer-term path quality behavior.
*/
#define ZT_PATH_ERROR_SAMPLE_WIN_SZ 1024
#define ZT_PATH_QUALITY_METRIC_WIN_SZ ZT_PATH_QUALITY_METRIC_REALTIME_CONSIDERATION_WIN_SZ
/**
* How often a peer will prune its own paths. Pruning is important when multipath is
* enabled because we want to prevent the allocation algorithms from sending anything
* out on known dead paths. Additionally, quickly marking paths as dead helps when
* a new path is learned and needs to replace an older path.
* Maximum acceptable Packet Delay Variance (PDV) over a path
*/
#define ZT_CLOSED_PATH_PRUNING_INTERVAL 1000
#define ZT_PATH_MAX_PDV 1000
/**
* Datagram used to test link throughput. Contents are random.
* Maximum acceptable time interval between expectation and receipt of at least one ACK over a path
*/
#define ZT_LINK_TEST_DATAGRAM_SZ 1024
#define ZT_PATH_MAX_AGE 30000
/**
* Size of datagram expected as a reply to a link speed test
* Maximum acceptable mean latency over a path
*/
#define ZT_LINK_TEST_DATAGRAM_RESPONSE_SZ 8
#define ZT_PATH_MAX_MEAN_LATENCY 1000
/**
* Time before a link test datagram is considered lost. Any corresponding
* timing records that would have been used to compute a RTT are purged.
* How much each factor contributes to the "stability" score of a path
*/
#define ZT_LINK_TEST_TIMEOUT 10000
#define ZT_PATH_CONTRIB_PDV 1.0 / 3.0
#define ZT_PATH_CONTRIB_LATENCY 1.0 / 3.0
#define ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE 1.0 / 3.0
/**
* How often the service tests the link throughput.
* How much each factor contributes to the "quality" score of a path
*/
#define ZT_LINK_SPEED_TEST_INTERVAL 1000
#define ZT_PATH_CONTRIB_STABILITY 0.75 / 3.0
#define ZT_PATH_CONTRIB_THROUGHPUT 1.50 / 3.0
#define ZT_PATH_CONTRIB_SCOPE 0.75 / 3.0
/**
* Min and max acceptable sizes for a VERB_QOS_MEASUREMENT packet
*/
#define ZT_PATH_MIN_QOS_PACKET_SZ 8 + 1
#define ZT_PATH_MAX_QOS_PACKET_SZ 1400
/**
* How many ID:sojourn time pairs in a single QoS packet
*/
#define ZT_PATH_QOS_TABLE_SIZE (ZT_PATH_MAX_QOS_PACKET_SZ * 8) / (64 + 8)
/**
* How often the service tests the path throughput
*/
#define ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL ZT_PATH_ACK_INTERVAL * 8
/**
* Minimum amount of time between each ACK packet
*/
#define ZT_PATH_ACK_INTERVAL 250
/**
* How often a QoS packet is sent
*/
#define ZT_PATH_QOS_INTERVAL 1000
/**
* How often an aggregate link statistics report is emitted into this tracing system
*/
#define ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL 60000
/**
* How much an aggregate link's component paths can vary from their target allocation
* before the link is considered to be in a state of imbalance.
*/
#define ZT_PATH_IMBALANCE_THRESHOLD 0.20
/**
* How frequently to send heartbeats over in-use paths

View File

@ -80,7 +80,7 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
if (!trusted) {
if (!dearmor(peer->key())) {
RR->t->incomingPacketMessageAuthenticationFailure(tPtr,_path,packetId(),sourceAddress,hops(),"invalid MAC");
_path->recordPacket(false);
_path->recordInvalidPacket();
return true;
}
}
@ -90,15 +90,15 @@ bool IncomingPacket::tryDecode(const RuntimeEnvironment *RR,void *tPtr)
return true;
}
_path->recordPacket(true);
const Packet::Verb v = verb();
switch(v) {
//case Packet::VERB_NOP:
default: // ignore unknown verbs, but if they pass auth check they are "received"
peer->received(tPtr,_path,hops(),packetId(),v,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),v,0,Packet::VERB_NOP,false,0);
return true;
case Packet::VERB_HELLO: return _doHELLO(RR,tPtr,true);
case Packet::VERB_ACK: return _doACK(RR,tPtr,peer);
case Packet::VERB_QOS_MEASUREMENT: return _doQOS_MEASUREMENT(RR,tPtr,peer);
case Packet::VERB_ERROR: return _doERROR(RR,tPtr,peer);
case Packet::VERB_OK: return _doOK(RR,tPtr,peer);
case Packet::VERB_WHOIS: return _doWHOIS(RR,tPtr,peer);
@ -197,11 +197,55 @@ bool IncomingPacket::_doERROR(const RuntimeEnvironment *RR,void *tPtr,const Shar
default: break;
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_ERROR,inRePacketId,inReVerb,false,networkId);
return true;
}
bool IncomingPacket::_doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
{
/* Dissect incoming ACK packet. From this we can estimate current throughput of the path, establish known
* maximums and detect packet loss. */
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
int32_t ackedBytes;
memcpy(&ackedBytes, payload(), sizeof(int32_t));
_path->receivedAck(RR->node->now(), Utils::ntoh(ackedBytes));
}
return true;
}
bool IncomingPacket::_doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer)
{
/* Dissect incoming QoS packet. From this we can compute latency values and their variance.
* The latency variance is used as a measure of "jitter". */
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
if (payloadLength() < ZT_PATH_MAX_QOS_PACKET_SZ && payloadLength() > ZT_PATH_MIN_QOS_PACKET_SZ) {
const int64_t now = RR->node->now();
uint64_t rx_id[ZT_PATH_QOS_TABLE_SIZE];
uint8_t rx_ts[ZT_PATH_QOS_TABLE_SIZE];
char *begin = (char *)payload();
char *ptr = begin;
int count = 0;
int len = payloadLength();
// Read packet IDs and latency compensation intervals for each packet tracked by thie QoS packet
while (ptr < (begin + len)) {
memcpy((void*)&rx_id[count], ptr, sizeof(uint64_t));
rx_id[count] = Utils::ntoh(rx_id[count]);
ptr+=sizeof(uint64_t);
memcpy((void*)&rx_ts[count], ptr, sizeof(uint8_t));
ptr+=sizeof(uint8_t);
count++;
}
_path->receivedQoS(now, count, rx_id, rx_ts);
}
}
return true;
}
bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated)
{
const int64_t now = RR->node->now();
@ -398,7 +442,7 @@ bool IncomingPacket::_doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool
_path->send(RR,tPtr,outp.data(),outp.size(),now);
peer->setRemoteVersion(protoVersion,vMajor,vMinor,vRevision); // important for this to go first so received() knows the version
peer->received(tPtr,_path,hops(),pid,Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_HELLO,0,Packet::VERB_NOP,false,0);
return true;
}
@ -448,8 +492,9 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
}
}
if (!hops())
if (!hops() && (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE)) {
_path->updateLatency((unsigned int)latency, RR->node->now());
}
peer->setRemoteVersion(vProto,vMajor,vMinor,vRevision);
@ -510,7 +555,7 @@ bool IncomingPacket::_doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedP
default: break;
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_OK,inRePacketId,inReVerb,false,networkId);
return true;
}
@ -545,7 +590,7 @@ bool IncomingPacket::_doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const Shar
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_WHOIS,0,Packet::VERB_NOP,false,0);
return true;
}
@ -569,7 +614,7 @@ bool IncomingPacket::_doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_RENDEZVOUS,0,Packet::VERB_NOP,false,0);
return true;
}
@ -598,7 +643,7 @@ bool IncomingPacket::_doFRAME(const RuntimeEnvironment *RR,void *tPtr,const Shar
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_FRAME,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@ -621,7 +666,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@ -633,7 +678,7 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
const uint8_t *const frameData = (const uint8_t *)field(comLen + ZT_PROTO_VERB_EXT_FRAME_IDX_PAYLOAD,frameLen);
if ((!from)||(from == network->mac())) {
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@ -644,19 +689,19 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (remote)");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (to != network->mac()) {
if (to.isMulticast()) {
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"multicast disabled");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
} else if (!network->config().permitsBridging(RR->identity.address())) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_EXT_FRAME,from,to,"bridging not allowed (local)");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@ -676,9 +721,9 @@ bool IncomingPacket::_doEXT_FRAME(const RuntimeEnvironment *RR,void *tPtr,const
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_EXT_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@ -698,7 +743,7 @@ bool IncomingPacket::_doECHO(const RuntimeEnvironment *RR,void *tPtr,const Share
outp.armor(peer->key(),true);
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
peer->received(tPtr,_path,hops(),pid,Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),pid,payloadLength(),Packet::VERB_ECHO,0,Packet::VERB_NOP,false,0);
return true;
}
@ -743,7 +788,7 @@ bool IncomingPacket::_doMULTICAST_LIKE(const RuntimeEnvironment *RR,void *tPtr,c
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_LIKE,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@ -866,7 +911,7 @@ bool IncomingPacket::_doNETWORK_CREDENTIALS(const RuntimeEnvironment *RR,void *t
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CREDENTIALS,0,Packet::VERB_NOP,trustEstablished,(network) ? network->id() : 0);
return true;
}
@ -892,7 +937,7 @@ bool IncomingPacket::_doNETWORK_CONFIG_REQUEST(const RuntimeEnvironment *RR,void
_path->send(RR,tPtr,outp.data(),outp.size(),RR->node->now());
}
peer->received(tPtr,_path,hopCount,requestPacketId,Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hopCount,requestPacketId,payloadLength(),Packet::VERB_NETWORK_CONFIG_REQUEST,0,Packet::VERB_NOP,false,nwid);
return true;
}
@ -913,7 +958,7 @@ bool IncomingPacket::_doNETWORK_CONFIG(const RuntimeEnvironment *RR,void *tPtr,c
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_NETWORK_CONFIG,0,Packet::VERB_NOP,false,(network) ? network->id() : 0);
return true;
}
@ -956,7 +1001,7 @@ bool IncomingPacket::_doMULTICAST_GATHER(const RuntimeEnvironment *RR,void *tPtr
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_GATHER,0,Packet::VERB_NOP,trustEstablished,nwid);
return true;
}
@ -982,7 +1027,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (!network->gate(tPtr,peer)) {
RR->t->incomingNetworkAccessDenied(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,true);
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
@ -1006,19 +1051,19 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
if (network->config().multicastLimit == 0) {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"multicast disabled");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
return true;
}
if ((frameLen > 0)&&(frameLen <= ZT_MAX_MTU)) {
if (!to.mac().isMulticast()) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"destination not multicast");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
if ((!from)||(from.isMulticast())||(from == network->mac())) {
RR->t->incomingPacketInvalid(tPtr,_path,packetId(),source(),hops(),Packet::VERB_MULTICAST_FRAME,"invalid source MAC");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
@ -1032,7 +1077,7 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
network->learnBridgeRoute(from,peer->address());
} else {
RR->t->incomingNetworkFrameDropped(tPtr,network,_path,packetId(),size(),peer->address(),Packet::VERB_MULTICAST_FRAME,from,to.mac(),"bridging not allowed (remote)");
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid); // trustEstablished because COM is okay
return true;
}
}
@ -1055,10 +1100,10 @@ bool IncomingPacket::_doMULTICAST_FRAME(const RuntimeEnvironment *RR,void *tPtr,
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,true,nwid);
} else {
_sendErrorNeedCredentials(RR,tPtr,peer,nwid);
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_MULTICAST_FRAME,0,Packet::VERB_NOP,false,nwid);
}
return true;
@ -1070,7 +1115,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
// First, subject this to a rate limit
if (!peer->rateGatePushDirectPaths(now)) {
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@ -1094,7 +1139,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 4: {
const InetAddress a(field(ptr,4),4,at<uint16_t>(ptr + 4));
if (
((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@ -1108,7 +1153,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
case 6: {
const InetAddress a(field(ptr,16),16,at<uint16_t>(ptr + 16));
if (
((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
((flags & ZT_PUSH_DIRECT_PATHS_FLAG_FORGET_PATH) == 0) && // not being told to forget
(!( ((flags & ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT) == 0) && (peer->hasActivePathTo(now,a)) )) && // not already known
(RR->node->shouldUsePathForZeroTierTraffic(tPtr,peer->address(),_path->localSocket(),a)) ) // should use path
{
@ -1123,7 +1168,7 @@ bool IncomingPacket::_doPUSH_DIRECT_PATHS(const RuntimeEnvironment *RR,void *tPt
ptr += addrLen;
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_PUSH_DIRECT_PATHS,0,Packet::VERB_NOP,false,0);
return true;
}
@ -1139,7 +1184,7 @@ bool IncomingPacket::_doUSER_MESSAGE(const RuntimeEnvironment *RR,void *tPtr,con
RR->node->postEvent(tPtr,ZT_EVENT_USER_MESSAGE,reinterpret_cast<const void *>(&um));
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_USER_MESSAGE,0,Packet::VERB_NOP,false,0);
return true;
}
@ -1163,7 +1208,7 @@ bool IncomingPacket::_doREMOTE_TRACE(const RuntimeEnvironment *RR,void *tPtr,con
}
}
peer->received(tPtr,_path,hops(),packetId(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
peer->received(tPtr,_path,hops(),packetId(),payloadLength(),Packet::VERB_REMOTE_TRACE,0,Packet::VERB_NOP,false,0);
return true;
}

View File

@ -125,6 +125,8 @@ private:
// been authenticated, decrypted, decompressed, and classified.
bool _doERROR(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doHELLO(const RuntimeEnvironment *RR,void *tPtr,const bool alreadyAuthenticated);
bool _doACK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doQOS_MEASUREMENT(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doOK(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doWHOIS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);
bool _doRENDEZVOUS(const RuntimeEnvironment *RR,void *tPtr,const SharedPtr<Peer> &peer);

View File

@ -419,7 +419,7 @@ public:
template<unsigned int C2>
Fragment(const Buffer<C2> &b) :
Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}
@ -930,7 +930,53 @@ public:
*/
VERB_PUSH_DIRECT_PATHS = 0x10,
// 0x11, 0x12 -- deprecated
// 0x11 -- deprecated
/**
* An acknowledgement of receipt of a series of recent packets from another
* peer. This is used to calculate relative throughput values and to detect
* packet loss. Only VERB_FRAME and VERB_EXT_FRAME packets are counted.
*
* ACK response format:
* <[4] 32-bit number of bytes received since last ACK>
*
* Upon receipt of this packet, the local peer will verify that the correct
* number of bytes were received by the remote peer. If these values do
* not agree that could be an indicator of packet loss.
*
* Additionally, the local peer knows the interval of time that has
* elapsed since the last received ACK. With this information it can compute
* a rough estimate of the current throughput.
*
* This is sent at a maximum rate of once per every ZT_PATH_ACK_INTERVAL
*/
VERB_ACK = 0x12,
/**
* A packet containing timing measurements useful for estimating path quality.
* Composed of a list of <packet ID:internal sojourn time> pairs for an
* arbitrary set of recent packets. This is used to sample for latency and
* packet delay variance (PDV, "jitter").
*
* QoS record format:
*
* <[8] 64-bit packet ID of previously-received packet>
* <[1] 8-bit packet sojourn time>
* <...repeat until end of max 1400 byte packet...>
*
* The number of possible records per QoS packet is: (1400 * 8) / 72 = 155
* This packet should be sent very rarely (every few seconds) as it can be
* somewhat large if the connection is saturated. Future versions might use
* a bloom table to probablistically determine these values in a vastly
* more space-efficient manner.
*
* Note: The 'internal packet sojourn time' is a slight misnomer as it is a
* measure of the amount of time between when a packet was received and the
* egress time of its tracking QoS packet.
*
* This is sent at a maximum rate of once per every ZT_PATH_QOS_INTERVAL
*/
VERB_QOS_MEASUREMENT = 0x13,
/**
* A message with arbitrary user-definable content:
@ -999,7 +1045,7 @@ public:
template<unsigned int C2>
Packet(const Buffer<C2> &b) :
Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(b)
{
}

View File

@ -102,24 +102,23 @@ public:
_latency(0xffff),
_addr(),
_ipScope(InetAddress::IP_SCOPE_NONE),
_currentPacketSampleCounter(0),
_meanPacketErrorRatio(0.0),
_meanLatency(0.0),
_lastLatencyUpdate(0),
_jitter(0.0),
_lastPathQualitySampleTime(0),
_lastComputedQuality(0.0),
_lastPathQualityEstimate(0),
_meanAge(0.0),
_lastAck(0),
_lastThroughputEstimation(0),
_lastQoSMeasurement(0),
_unackedBytes(0),
_expectingAckAsOf(0),
_packetsReceivedSinceLastAck(0),
_meanThroughput(0.0),
_packetLossRatio(0)
_maxLifetimeThroughput(0),
_bytesAckedSinceLastThroughputEstimation(0),
_meanLatency(0.0),
_packetDelayVariance(0.0),
_packetErrorRatio(0.0),
_packetLossRatio(0),
_lastComputedStability(0.0),
_lastComputedRelativeQuality(0)
{
memset(_ifname, 0, sizeof(_ifname));
memset(_addrString, 0, sizeof(_addrString));
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
prepareBuffers();
}
Path(const int64_t localSocket,const InetAddress &addr) :
@ -131,37 +130,35 @@ public:
_latency(0xffff),
_addr(addr),
_ipScope(addr.ipScope()),
_currentPacketSampleCounter(0),
_meanPacketErrorRatio(0.0),
_meanLatency(0.0),
_lastLatencyUpdate(0),
_jitter(0.0),
_lastPathQualitySampleTime(0),
_lastComputedQuality(0.0),
_lastPathQualityEstimate(0),
_meanAge(0.0),
_lastAck(0),
_lastThroughputEstimation(0),
_lastQoSMeasurement(0),
_unackedBytes(0),
_expectingAckAsOf(0),
_packetsReceivedSinceLastAck(0),
_meanThroughput(0.0),
_packetLossRatio(0)
_maxLifetimeThroughput(0),
_bytesAckedSinceLastThroughputEstimation(0),
_meanLatency(0.0),
_packetDelayVariance(0.0),
_packetErrorRatio(0.0),
_packetLossRatio(0),
_lastComputedStability(0.0),
_lastComputedRelativeQuality(0)
{
memset(_ifname, 0, sizeof(_ifname));
memset(_addrString, 0, sizeof(_addrString));
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_ageSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_errSamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
prepareBuffers();
}
~Path()
{
delete _throughputSamples;
delete _ageSamples;
delete _latencySamples;
delete _errSamples;
delete _qualitySamples;
delete _packetValiditySamples;
_throughputSamples = NULL;
_ageSamples = NULL;
_latencySamples = NULL;
_errSamples = NULL;
_qualitySamples = NULL;
_packetValiditySamples = NULL;
}
/**
@ -209,7 +206,6 @@ public:
else {
_latency = l;
}
_lastLatencyUpdate = now;
_latencySamples->push(l);
}
@ -299,195 +295,274 @@ public:
}
/**
* @return An estimate of path quality -- higher is better.
*/
inline float computeQuality(const int64_t now)
{
float latency_contrib = _meanLatency ? (float)1.0 / _meanLatency : 0;
float jitter_contrib = _jitter ? (float)1.0 / _jitter : 0;
float throughput_contrib = _meanThroughput ? _meanThroughput / 1000000 : 0; // in Mbps
float age_contrib = _meanAge > 0 ? (float)sqrt(_meanAge) : 1;
float error_contrib = (float)1.0 - _meanPacketErrorRatio;
float sum = (latency_contrib + jitter_contrib + throughput_contrib + error_contrib) / age_contrib;
_lastComputedQuality = sum * (long)((_ipScope) + 1);
return _lastComputedQuality;
}
/**
* Since quality estimates can become expensive we should cache the most recent result for traffic allocation
* algorithms which may need to reference this value multiple times through the course of their execution.
*/
inline float lastComputedQuality() {
return _lastComputedQuality;
}
/**
* @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
*/
inline char *getName() { return _ifname; }
/**
* @return Estimated throughput in bps of this link
*/
inline uint64_t getThroughput() { return _phy->getThroughput((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Packet delay varience
*/
inline float jitter() { return _jitter; }
/**
* @return Previously-computed mean latency
*/
inline float meanLatency() { return _meanLatency; }
/**
* @return Packet loss rate
*/
inline float packetLossRatio() { return _packetLossRatio; }
/**
* @return Mean packet error ratio
*/
inline float meanPacketErrorRatio() { return _meanPacketErrorRatio; }
/**
* @return Current packet error ratio (possibly incomplete sample set)
*/
inline float currentPacketErrorRatio() {
int errorsPerSample = 0;
for (int i=0; i<_currentPacketSampleCounter; i++) {
if (_packetValidity[i] == false) {
errorsPerSample++;
}
}
return (float)errorsPerSample / (float)ZT_PATH_ERROR_SAMPLE_WIN_SZ;
}
/**
* @return Whether the Path's local socket is in a CLOSED state
*/
inline bool isClosed() { return _phy->isClosed((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return The state of a Path's local socket
*/
inline int getState() { return _phy->getState((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Whether this socket may have been erased by the virtual physical link layer
*/
inline bool isValidState() { return _phy->isValidState((PhySocket *)((uintptr_t)_localSocket)); }
/**
* @return Whether the path quality monitors have collected enough data to provide a quality value
* TODO: expand this
*/
inline bool monitorsReady() {
return _latencySamples->count() && _ageSamples->count() && _throughputSamples->count();
}
/**
* @return A pointer to a cached copy of the address string for this Path (For debugging only)
*/
inline char *getAddressString() { return _addrString; }
/**
* Handle path sampling, computation of quality estimates, and other periodic tasks
* @param now Current time
*/
inline void measureLink(int64_t now) {
// Sample path properties and store them in a continuously-revolving buffer
if (now - _lastPathQualitySampleTime > ZT_PATH_QUALITY_SAMPLE_INTERVAL) {
_lastPathQualitySampleTime = now;
_throughputSamples->push(getThroughput()); // Thoughtput in bits/s
_ageSamples->push(now - _lastIn); // Age (time since last received packet)
if (now - _lastLatencyUpdate > ZT_PATH_LATENCY_SAMPLE_INTERVAL) {
_lastLatencyUpdate = now;
// Record 0 bp/s. Since we're using this to detect possible packet loss
updateLatency(0, now);
}
}
// Compute statistical values for use in link quality estimates
if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
_lastPathQualityComputeTime = now;
// Cache Path address string
address().toString(_addrString);
_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, ZT_PATH_INTERFACE_NAME_SZ); // Cache Interface name
// Derived values
if (_throughputSamples->count()) {
_packetLossRatio = (float)_throughputSamples->zeroCount() / (float)_throughputSamples->count();
}
_meanThroughput = _throughputSamples->mean();
_meanAge = _ageSamples->mean();
_meanLatency = _latencySamples->mean();
// Jitter
// SEE: RFC 3393, RFC 4689
_jitter = _latencySamples->stddev();
_meanPacketErrorRatio = _errSamples->mean(); // Packet Error Ratio (PER)
}
// Periodically compute a path quality estimate
if (now - _lastPathQualityEstimate > ZT_PATH_QUALITY_ESTIMATE_INTERVAL) {
computeQuality(now);
}
}
/**
* @param buf Buffer to store resultant string
* @return Description of path, in ASCII string format
*/
inline char *toString(char *buf) {
sprintf(buf,"%6s, q=%8.3f, %5.3f Mb/s, j=%8.2f, ml=%8.2f, meanAge=%8.2f, addr=%45s",
getName(),
lastComputedQuality(),
(float)meanThroughput() / (float)1000000,
jitter(),
meanLatency(),
meanAge(),
getAddressString());
return buf;
}
/**
* Record whether a packet is considered invalid by MAC/compression/cipher checks. This
* could be an indication of a bit error. This function will keep a running counter of
* up to a given window size and with each counter overflow it will compute a mean error rate
* and store that in a continuously shifting sample window.
* Take note that we're expecting a VERB_ACK on this path as of a specific time
*
* @param isValid Whether the packet in question is considered invalid
* @param now Current time
* @param packetId ID of the packet
* @param payloadLength Number of bytes we're is expecting a reply to
*/
inline void recordPacket(bool isValid) {
if (_currentPacketSampleCounter < ZT_PATH_ERROR_SAMPLE_WIN_SZ) {
_packetValidity[_currentPacketSampleCounter] = isValid;
_currentPacketSampleCounter++;
}
else {
// Sample array is full, compute an mean and stick it in the ring buffer for trend analysis
_errSamples->push(currentPacketErrorRatio());
_currentPacketSampleCounter=0;
inline void expectingAck(int64_t now, int64_t packetId, uint16_t payloadLength)
{
_expectingAckAsOf = ackAge(now) > ZT_PATH_ACK_INTERVAL ? _expectingAckAsOf : now;
_unackedBytes += payloadLength;
_outgoingPacketRecords[packetId] = now;
}
/**
* Record that we've received a VERB_ACK on this path, also compute throughput if required.
*
* @param now Current time
* @param ackedBytes Number of bytes awknowledged by other peer
*/
inline void receivedAck(int64_t now, int32_t ackedBytes)
{
_expectingAckAsOf = 0;
_unackedBytes = (ackedBytes > _unackedBytes) ? 0 : _unackedBytes - ackedBytes;
int64_t timeSinceThroughputEstimate = (now - _lastThroughputEstimation);
if (timeSinceThroughputEstimate >= ZT_PATH_THROUGHPUT_MEASUREMENT_INTERVAL) {
uint64_t throughput = (float)(_bytesAckedSinceLastThroughputEstimation) / ((float)timeSinceThroughputEstimate / (float)1000);
_throughputSamples->push(throughput);
_maxLifetimeThroughput = throughput > _maxLifetimeThroughput ? throughput : _maxLifetimeThroughput;
_lastThroughputEstimation = now;
_bytesAckedSinceLastThroughputEstimation = 0;
} else {
_bytesAckedSinceLastThroughputEstimation += ackedBytes;
}
}
/**
* @return The mean age (in ms) of this link
* @return Number of bytes this peer is responsible for ACKing since last ACK
*/
inline float meanAge() { return _meanAge; }
inline int32_t bytesToAck()
{
int32_t bytesToAck = 0;
for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
bytesToAck += _recorded_len[i];
}
return bytesToAck;
}
/**
* @return Number of bytes thusfar sent that have not been awknowledged by the remote peer
*/
inline int64_t unackedSentBytes()
{
return _unackedBytes;
}
/**
* Account for the fact that an ACK was just sent. Reset counters, timers, and clear statistics buffers
*
* @param Current time
*/
inline void sentAck(int64_t now)
{
memset(_recorded_id, 0, sizeof(_recorded_id));
memset(_recorded_ts, 0, sizeof(_recorded_ts));
memset(_recorded_len, 0, sizeof(_recorded_len));
_packetsReceivedSinceLastAck = 0;
_lastAck = now;
}
/**
* Receive QoS data, match with recorded egress times from this peer, compute latency
* estimates.
*
* @param now Current time
* @param count Number of records
* @param rx_id table of packet IDs
* @param rx_ts table of holding times
*/
inline void receivedQoS(int64_t now, int count, uint64_t *rx_id, uint8_t *rx_ts)
{
// Look up egress times and compute latency values for each record
for (int j=0; j<count; j++) {
std::map<uint64_t,uint64_t>::iterator it = _outgoingPacketRecords.find(rx_id[j]);
if (it != _outgoingPacketRecords.end()) {
uint16_t rtt = (uint16_t)(now - it->second);
uint16_t rtt_compensated = rtt - rx_ts[j];
float latency = rtt_compensated / 2.0;
updateLatency(latency, now);
_outgoingPacketRecords.erase(it);
}
}
}
/**
* Generate the contents of a VERB_QOS_MEASUREMENT packet.
*
* @param now Current time
* @param qosBuffer destination buffer
* @return Size of payload
*/
inline int32_t generateQoSPacket(int64_t now, char *qosBuffer)
{
int32_t len = 0;
for (int i=0; i<_packetsReceivedSinceLastAck; i++) {
uint64_t id = _recorded_id[i];
memcpy(qosBuffer, &id, sizeof(uint64_t));
qosBuffer+=sizeof(uint64_t);
uint8_t holdingTime = (uint8_t)(now - _recorded_ts[i]);
memcpy(qosBuffer, &holdingTime, sizeof(uint8_t));
qosBuffer+=sizeof(uint8_t);
len+=sizeof(uint64_t)+sizeof(uint8_t);
}
return len;
}
/**
* Account for the fact that a VERB_QOS_MEASUREMENT was just sent. Reset timers.
*
* @param Current time
*/
inline void sentQoS(int64_t now) { _lastQoSMeasurement = now; }
/**
* Record statistics on incoming packets. Used later to estimate QoS.
*
* @param now Current time
* @param packetId
* @param payloadLength
*/
inline void recordIncomingPacket(int64_t now, int64_t packetId, int32_t payloadLength)
{
_recorded_ts[_packetsReceivedSinceLastAck] = now;
_recorded_id[_packetsReceivedSinceLastAck] = packetId;
_recorded_len[_packetsReceivedSinceLastAck] = payloadLength;
_packetsReceivedSinceLastAck++;
_packetValiditySamples->push(true);
}
/**
* @param now Current time
* @return Whether an ACK (VERB_ACK) packet needs to be emitted at this time
*/
inline bool needsToSendAck(int64_t now) {
return ((now - _lastAck) >= ZT_PATH_ACK_INTERVAL ||
(_packetsReceivedSinceLastAck == ZT_PATH_QOS_TABLE_SIZE)) && _packetsReceivedSinceLastAck;
}
/**
* @param now Current time
* @return Whether a QoS (VERB_QOS_MEASUREMENT) packet needs to be emitted at this time
*/
inline bool needsToSendQoS(int64_t now) {
return ((_packetsReceivedSinceLastAck >= ZT_PATH_QOS_TABLE_SIZE) ||
((now - _lastQoSMeasurement) > ZT_PATH_QOS_INTERVAL)) && _packetsReceivedSinceLastAck;
}
/**
* How much time has elapsed since we've been expecting a VERB_ACK on this path. This value
* is used to determine a more relevant path "age". This lets us penalize paths which are no
* longer ACKing, but not those that simple aren't being used to carry traffic at the
* current time.
*/
inline int64_t ackAge(int64_t now) { return _expectingAckAsOf ? now - _expectingAckAsOf : 0; }
/**
* The maximum observed throughput for this path
*/
inline uint64_t maxLifetimeThroughput() { return _maxLifetimeThroughput; }
/**
* @return The mean throughput (in bits/s) of this link
*/
inline float meanThroughput() { return _meanThroughput; }
/**
* Assign a new relative quality value for this path in the aggregate link
*
* @param rq Quality of this path in comparison to other paths available to this peer
*/
inline void updateRelativeQuality(float rq) { _lastComputedRelativeQuality = rq; }
/**
* @return Quality of this path compared to others in the aggregate link
*/
inline float relativeQuality() { return _lastComputedRelativeQuality; }
/**
* @return Stability estimates can become expensive to compute, we cache the most recent result.
*/
inline float lastComputedStability() { return _lastComputedStability; }
/**
* @return A pointer to a cached copy of the human-readable name of the interface this Path's localSocket is bound to
*/
inline char *getName() { return _ifname; }
/**
* @return Packet delay varience
*/
inline float packetDelayVariance() { return _packetDelayVariance; }
/**
* @return Previously-computed mean latency
*/
inline float meanLatency() { return _meanLatency; }
/**
* @return Packet loss rate (PLR)
*/
inline float packetLossRatio() { return _packetLossRatio; }
/**
* @return Packet error ratio (PER)
*/
inline float packetErrorRatio() { return _packetErrorRatio; }
/**
* Record an invalid incoming packet. This packet failed MAC/compression/cipher checks and will now
* contribute to a Packet Error Ratio (PER).
*/
inline void recordInvalidPacket() { _packetValiditySamples->push(false); }
/**
* @return A pointer to a cached copy of the address string for this Path (For debugging only)
*/
inline char *getAddressString() { return _addrString; }
/**
* Compute and cache stability and performance metrics. The resultant stability coefficint is a measure of how "well behaved"
* this path is. This figure is substantially different from (but required for the estimation of the path's overall "quality".
*
* @param now Current time
*/
inline void processBackgroundPathMeasurements(int64_t now, const int64_t peerId) {
if (now - _lastPathQualityComputeTime > ZT_PATH_QUALITY_COMPUTE_INTERVAL) {
_lastPathQualityComputeTime = now;
_phy->getIfName((PhySocket *)((uintptr_t)_localSocket), _ifname, 16);
address().toString(_addrString);
_meanThroughput = _throughputSamples->mean();
_meanLatency = _latencySamples->mean();
_packetDelayVariance = _latencySamples->stddev(); // Similar to "jitter" (SEE: RFC 3393, RFC 4689)
// If no packet validity samples, assume PER==0
_packetErrorRatio = 1 - (_packetValiditySamples->count() ? _packetValiditySamples->mean() : 1);
// Compute path stability
// Normalize measurements with wildly different ranges into a reasonable range
float normalized_pdv = Utils::normalize(_packetDelayVariance, 0, ZT_PATH_MAX_PDV, 0, 10);
float normalized_la = Utils::normalize(_meanLatency, 0, ZT_PATH_MAX_MEAN_LATENCY, 0, 10);
float throughput_cv = _throughputSamples->mean() > 0 ? _throughputSamples->stddev() / _throughputSamples->mean() : 1;
// Form an exponential cutoff and apply contribution weights
float pdv_contrib = exp((-1)*normalized_pdv) * ZT_PATH_CONTRIB_PDV;
float latency_contrib = exp((-1)*normalized_la) * ZT_PATH_CONTRIB_LATENCY;
float throughput_disturbance_contrib = exp((-1)*throughput_cv) * ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE;
// Obey user-defined ignored contributions
pdv_contrib = ZT_PATH_CONTRIB_PDV > 0.0 ? pdv_contrib : 1;
latency_contrib = ZT_PATH_CONTRIB_LATENCY > 0.0 ? latency_contrib : 1;
throughput_disturbance_contrib = ZT_PATH_CONTRIB_THROUGHPUT_DISTURBANCE > 0.0 ? throughput_disturbance_contrib : 1;
// Compute the quality product
_lastComputedStability = pdv_contrib + latency_contrib + throughput_disturbance_contrib;
_lastComputedStability *= 1 - _packetErrorRatio;
_qualitySamples->push(_lastComputedStability);
}
}
/**
* @return True if this path is alive (receiving heartbeats)
*/
inline bool alive(const int64_t now) const { return ((now - _lastIn) < (ZT_PATH_HEARTBEAT_PERIOD + 5000)); }
/**
* @return True if this path hasn't received a packet in a "significant" amount of time
*/
inline bool stale(const int64_t now) const { return ((now - _lastIn) > ZT_LINK_SPEED_TEST_INTERVAL * 10); }
/**
* @return True if this path needs a heartbeat
*/
@ -508,6 +583,21 @@ public:
*/
inline int64_t lastTrustEstablishedPacketReceived() const { return _lastTrustEstablishedPacketReceived; }
/**
* Initialize statistical buffers
*/
inline void prepareBuffers() {
_throughputSamples = new RingBuffer<uint64_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_latencySamples = new RingBuffer<uint32_t>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_qualitySamples = new RingBuffer<float>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
_packetValiditySamples = new RingBuffer<bool>(ZT_PATH_QUALITY_METRIC_WIN_SZ);
memset(_ifname, 0, 16);
memset(_recorded_id, 0, sizeof(_recorded_id));
memset(_recorded_ts, 0, sizeof(_recorded_ts));
memset(_recorded_len, 0, sizeof(_recorded_len));
memset(_addrString, 0, sizeof(_addrString));
}
private:
volatile int64_t _lastOut;
volatile int64_t _lastIn;
@ -519,32 +609,42 @@ private:
InetAddress::IpScope _ipScope; // memoize this since it's a computed value checked often
AtomicCounter __refCount;
// Packet Error Ratio (PER)
int _packetValidity[ZT_PATH_ERROR_SAMPLE_WIN_SZ];
int _currentPacketSampleCounter;
volatile float _meanPacketErrorRatio;
uint64_t _recorded_id[ZT_PATH_QOS_TABLE_SIZE];
uint64_t _recorded_ts[ZT_PATH_QOS_TABLE_SIZE];
uint16_t _recorded_len[ZT_PATH_QOS_TABLE_SIZE];
// Latency and Jitter
volatile float _meanLatency;
int64_t _lastLatencyUpdate;
volatile float _jitter;
std::map<uint64_t, uint64_t> _outgoingPacketRecords;
int64_t _lastAck;
int64_t _lastThroughputEstimation;
int64_t _lastQoSMeasurement;
int64_t _unackedBytes;
int64_t _expectingAckAsOf;
int16_t _packetsReceivedSinceLastAck;
int64_t _lastPathQualitySampleTime;
float _lastComputedQuality;
int64_t _lastPathQualityEstimate;
float _meanAge;
float _meanThroughput;
uint64_t _maxLifetimeThroughput;
uint64_t _bytesAckedSinceLastThroughputEstimation;
// Circular buffers used to efficiently store large time series
RingBuffer<uint64_t> *_throughputSamples;
RingBuffer<uint32_t> *_latencySamples;
RingBuffer<uint64_t> *_ageSamples;
RingBuffer<float> *_errSamples;
volatile float _meanLatency;
float _packetDelayVariance;
float _packetErrorRatio;
float _packetLossRatio;
char _ifname[ZT_PATH_INTERFACE_NAME_SZ];
// cached estimates
float _lastComputedStability;
float _lastComputedRelativeQuality;
// cached human-readable strings for tracing purposes
char _ifname[16];
char _addrString[256];
RingBuffer<uint64_t> *_throughputSamples;
RingBuffer<uint32_t> *_latencySamples;
RingBuffer<float> *_qualitySamples;
RingBuffer<bool> *_packetValiditySamples;
};
} // namespace ZeroTier

View File

@ -24,8 +24,8 @@
* of your own application.
*/
#include "../version.h"
#include "../version.h"
#include "Constants.hpp"
#include "Peer.hpp"
#include "Node.hpp"
@ -36,6 +36,7 @@
#include "Trace.hpp"
#include "InetAddress.hpp"
#include "RingBuffer.hpp"
#include "Utils.hpp"
namespace ZeroTier {
@ -61,13 +62,13 @@ Peer::Peer(const RuntimeEnvironment *renv,const Identity &myIdentity,const Ident
_id(peerIdentity),
_directPathPushCutoffCount(0),
_credentialsCutoffCount(0),
_linkBalanceStatus(false),
_linkRedundancyStatus(false)
_linkIsBalanced(false),
_linkIsRedundant(false),
_lastAggregateStatsReport(0)
{
if (!myIdentity.agree(peerIdentity,_key,ZT_PEER_SECRET_KEY_LENGTH))
throw ZT_EXCEPTION_INVALID_ARGUMENT;
_pathChoiceHist = new RingBuffer<int>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
_flowBalanceHist = new RingBuffer<float>(ZT_MULTIPATH_PROPORTION_WIN_SZ);
}
void Peer::received(
@ -75,6 +76,7 @@ void Peer::received(
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@ -103,13 +105,13 @@ void Peer::received(
{
Mutex::Lock _l(_paths_m);
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
_lastPathPrune = now;
prunePaths();
recordIncomingPacket(tPtr, path, packetId, payloadLength, verb, now);
if (path->needsToSendQoS(now)) {
sendQOS_MEASUREMENT(tPtr, path, path->localSocket(), path->address(), now);
}
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
_paths[i].p->measureLink(now);
_paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
}
}
}
@ -117,7 +119,6 @@ void Peer::received(
if (hops == 0) {
// If this is a direct packet (no hops), update existing paths or learn new ones
bool havePath = false;
{
Mutex::Lock _l(_paths_m);
@ -164,6 +165,19 @@ void Peer::received(
}
}
// If we find a pre-existing path with the same address, just replace it.
// If we don't find anything we can replace, just use the replacePath that we previously decided on.
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if ( _paths[i].p->address().ss_family == path->address().ss_family && _paths[i].p->address().ipsEqual2(path->address())) {
replacePath = i;
break;
}
}
}
}
if (replacePath != ZT_MAX_PEER_NETWORK_PATHS) {
if (verb == Packet::VERB_OK) {
RR->t->peerLearnedNewPath(tPtr,networkId,*this,path,packetId);
@ -252,6 +266,117 @@ void Peer::received(
}
}
void Peer::recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
path->expectingAck(now, packetId, payloadLength);
}
}
}
void Peer::recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId,
uint16_t payloadLength, const Packet::Verb verb, int64_t now)
{
if (verb == Packet::VERB_FRAME || verb == Packet::VERB_EXT_FRAME) {
if (path->needsToSendAck(now)) {
sendACK(tPtr, path, path->localSocket(), path->address(), now);
}
path->recordIncomingPacket(now, packetId, payloadLength);
}
}
float Peer::computeAggregateLinkRelativeQuality(int64_t now)
{
float maxStability = 0;
float totalRelativeQuality = 0;
float maxThroughput = 1;
float maxScope = 0;
float relStability[ZT_MAX_PEER_NETWORK_PATHS];
float relThroughput[ZT_MAX_PEER_NETWORK_PATHS];
memset(&relStability, 0, sizeof(relStability));
memset(&relThroughput, 0, sizeof(relThroughput));
// Survey all paths
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
relStability[i] = _paths[i].p->lastComputedStability();
relThroughput[i] = _paths[i].p->maxLifetimeThroughput();
maxStability = relStability[i] > maxStability ? relStability[i] : maxStability;
maxThroughput = relThroughput[i] > maxThroughput ? relThroughput[i] : maxThroughput;
maxScope = _paths[i].p->ipScope() > maxScope ? _paths[i].p->ipScope() : maxScope;
}
}
// Convert to relative values
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
relStability[i] /= maxStability ? maxStability : 1;
relThroughput[i] /= maxThroughput ? maxThroughput : 1;
float normalized_ma = Utils::normalize(_paths[i].p->ackAge(now), 0, ZT_PATH_MAX_AGE, 0, 10);
float age_contrib = exp((-1)*normalized_ma);
float relScope = ((float)(_paths[i].p->ipScope()+1) / (maxScope + 1));
float relQuality =
(relStability[i] * ZT_PATH_CONTRIB_STABILITY)
+ (fmax(1, relThroughput[i]) * ZT_PATH_CONTRIB_THROUGHPUT)
+ relScope * ZT_PATH_CONTRIB_SCOPE;
relQuality *= age_contrib;
totalRelativeQuality += relQuality;
_paths[i].p->updateRelativeQuality(relQuality);
}
}
return (float)1.0 / totalRelativeQuality; // Used later to convert relative quantities into flow allocations
}
float Peer::computeAggregateLinkPacketDelayVariance()
{
float pdv = 0.0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
pdv += _paths[i].p->relativeQuality() * _paths[i].p->packetDelayVariance();
}
}
return pdv;
}
float Peer::computeAggregateLinkMeanLatency()
{
float ml = 0.0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
ml += _paths[i].p->relativeQuality() * _paths[i].p->meanLatency();
}
}
return ml;
}
int Peer::aggregateLinkPhysicalPathCount()
{
std::map<std::string, bool> ifnamemap;
int pathCount = 0;
int64_t now = RR->node->now();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->alive(now)) {
if (!ifnamemap[_paths[i].p->getName()]) {
ifnamemap[_paths[i].p->getName()] = true;
pathCount++;
}
}
}
return pathCount;
}
int Peer::aggregateLinkLogicalPathCount()
{
int pathCount = 0;
int64_t now = RR->node->now();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->alive(now)) {
pathCount++;
}
}
return pathCount;
}
SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
{
Mutex::Lock _l(_paths_m);
@ -264,7 +389,7 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
if (RR->node->getMultipathMode() == ZT_MULTIPATH_NONE) {
long bestPathQuality = 2147483647;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->isValidState()) {
if (_paths[i].p) {
if ((includeExpired)||((now - _paths[i].lr) < ZT_PEER_PATH_EXPIRATION)) {
const long q = _paths[i].p->quality(now) / _paths[i].priority;
if (q <= bestPathQuality) {
@ -280,23 +405,14 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
return SharedPtr<Path>();
}
if ((now - _lastPathPrune) > ZT_CLOSED_PATH_PRUNING_INTERVAL) {
_lastPathPrune = now;
prunePaths();
}
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
_paths[i].p->measureLink(now);
_paths[i].p->processBackgroundPathMeasurements(now, _id.address().toInt());
}
}
/**
* Randomly distribute traffic across all paths
*
* Behavior:
* - If path DOWN: Stop randomly choosing that path
* - If path UP: Start randomly choosing that path
* - If all paths are unresponsive: randomly choose from all paths
*/
int numAlivePaths = 0;
int numStalePaths = 0;
@ -307,15 +423,13 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
memset(&stalePaths, -1, sizeof(stalePaths));
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if (_paths[i].p->isValidState()) {
if (_paths[i].p->alive(now)) {
alivePaths[numAlivePaths] = i;
numAlivePaths++;
}
else {
stalePaths[numStalePaths] = i;
numStalePaths++;
}
if (_paths[i].p->alive(now)) {
alivePaths[numAlivePaths] = i;
numAlivePaths++;
}
else {
stalePaths[numStalePaths] = i;
numStalePaths++;
}
}
}
@ -337,162 +451,106 @@ SharedPtr<Path> Peer::getAppropriatePath(int64_t now, bool includeExpired)
* Proportionally allocate traffic according to dynamic path quality measurements
*/
if (RR->node->getMultipathMode() == ZT_MULTIPATH_PROPORTIONALLY_BALANCED) {
float relq[ZT_MAX_PEER_NETWORK_PATHS];
memset(&relq, 0, sizeof(relq));
float alloc[ZT_MAX_PEER_NETWORK_PATHS];
memset(&alloc, 0, sizeof(alloc));
// Survey
//
// Take a survey of all available link qualities. We use this to determine if we
// can skip this algorithm altogether and if not, to establish baseline for physical
// link quality used in later calculations.
//
// We find the min/max quality of our currently-active links so
// that we can form a relative scale to rank each link proportionally
// to each other link.
uint16_t alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
uint16_t stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
int numAlivePaths = 0;
int numStalePaths = 0;
int alivePaths[ZT_MAX_PEER_NETWORK_PATHS];
int stalePaths[ZT_MAX_PEER_NETWORK_PATHS];
memset(&alivePaths, -1, sizeof(alivePaths));
memset(&stalePaths, -1, sizeof(stalePaths));
uint16_t numAlivePaths = 0;
uint16_t numStalePaths = 0;
float minQuality = 10000;
float maxQuality = -1;
float currQuality;
for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->isValidState()) {
if (!_paths[i].p->monitorsReady()) {
// TODO: This should fix itself anyway but we should test whether forcing the use of a new path will
// aid in establishing flow balance more quickly.
}
// Compute quality here, going forward we will use lastComputedQuality()
currQuality = _paths[i].p->computeQuality(now);
if (!_paths[i].p->stale(now)) {
// Attempt to find an excuse not to use the rest of this algorithm
// Alive or Stale?
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if (_paths[i].p->alive(now)) {
alivePaths[numAlivePaths] = i;
numAlivePaths++;
}
else {
} else {
stalePaths[numStalePaths] = i;
numStalePaths++;
}
if (currQuality > maxQuality) {
maxQuality = currQuality;
bestPath = i;
}
if (currQuality < minQuality) {
minQuality = currQuality;
}
relq[i] = currQuality;
// Record a default path to use as a short-circuit for the rest of the algorithm
bestPath = i;
}
}
// Attempt to find an excuse not to use the rest of this algorithm
if (bestPath == ZT_MAX_PEER_NETWORK_PATHS || (numAlivePaths == 0 && numStalePaths == 0)) {
if (numAlivePaths == 0 && numStalePaths == 0) {
return SharedPtr<Path>();
} if (numAlivePaths == 1) {
//return _paths[bestPath].p;
} if (numStalePaths == 1) {
//return _paths[bestPath].p;
} if (numAlivePaths == 1 || numStalePaths == 1) {
return _paths[bestPath].p;
}
// Relative quality
//
// The strongest link will have a value of 1.0 whereas every other
// link will have a value which represents some fraction of the strongest link.
float totalRelativeQuality = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->isValidState()) {
relq[i] /= maxQuality ? maxQuality : 1;
totalRelativeQuality += relq[i];
}
}
// Convert the relative quality values into flow allocations.
// Additionally, determine whether each path in the flow is
// contributing more or less than its target allocation. If
// it is contributing more than required, don't allow it to be
// randomly selected for the next packet. If however the path
// needs to contribute more to the flow, we should record
float imbalance = 0;
float qualityScalingFactor = (float)1.0 / totalRelativeQuality;
// Compare paths to each-other
float qualityScalingFactor = computeAggregateLinkRelativeQuality(now);
// Convert set of relative performances into an allocation set
for(uint16_t i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
// Out of the last N packets to this peer, how many were sent by this path?
int numPktSentWithinWin = (int)_pathChoiceHist->countValue(i);
// Compute traffic allocation for each path in the flow
if (_paths[i].p && _paths[i].p->isValidState()) {
// Allocation
// This is the percentage of traffic we want to send over a given path
alloc[i] = relq[i] * qualityScalingFactor;
float currProportion = numPktSentWithinWin / (float)ZT_MULTIPATH_PROPORTION_WIN_SZ;
float targetProportion = alloc[i];
float diffProportion = currProportion - targetProportion;
// Imbalance
//
// This is the sum of the distances of each path's currently observed flow contributions
// from its most recent target allocation. In other words, this is a measure of how closely we
// are adhering to our desired allocations. It is worth noting that this value can be greater
// than 1.0 if a significant change to allocations is made by the algorithm, this will
// eventually correct itself.
imbalance += fabs(diffProportion);
if (diffProportion < 0) {
alloc[i] = targetProportion;
}
else {
alloc[i] = targetProportion;
}
if (_paths[i].p) {
alloc[i] = _paths[i].p->relativeQuality() * qualityScalingFactor;
}
}
// Compute and record current flow balance
float balance = (float)1.0 - imbalance;
if (balance >= ZT_MULTIPATH_FLOW_BALANCE_THESHOLD) {
if (!_linkBalanceStatus) {
_linkBalanceStatus = true;
RR->t->peerLinkBalanced(NULL,0,*this);
}
}
else {
if (_linkBalanceStatus) {
_linkBalanceStatus = false;
RR->t->peerLinkImbalanced(NULL,0,*this);
}
}
// Record the current flow balance. Later used for computing a mean flow balance value.
_flowBalanceHist->push(balance);
// Randomly choose path from allocated candidates
// Randomly choose path according to their allocations
unsigned int r;
Utils::getSecureRandom(&r, 1);
float rf = (float)(r %= 100) / 100;
for(int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->isValidState() && _paths[i].p->address().isV4()) {
if (alloc[i] > 0 && rf < alloc[i]) {
if (_paths[i].p) {
if (rf < alloc[i]) {
bestPath = i;
_pathChoiceHist->push(bestPath); // Record which path we chose
break;
}
if (alloc[i] > 0) {
rf -= alloc[i];
}
else {
rf -= alloc[i]*-1;
}
rf -= alloc[i];
}
}
if (bestPath < ZT_MAX_PEER_NETWORK_PATHS) {
return _paths[bestPath].p;
}
return SharedPtr<Path>();
}
// Adhere to a user-defined interface/allocation scheme
if (RR->node->getMultipathMode() == ZT_MULTIPATH_MANUALLY_BALANCED) {
// TODO
}
return SharedPtr<Path>();
}
char *Peer::interfaceListStr()
{
std::map<std::string, int> ifnamemap;
char tmp[32];
const int64_t now = RR->node->now();
char *ptr = _interfaceListStr;
bool imbalanced = false;
memset(_interfaceListStr, 0, sizeof(_interfaceListStr));
int alivePathCount = aggregateLinkLogicalPathCount();
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p && _paths[i].p->alive(now)) {
int ipv = _paths[i].p->address().isV4();
// If this is acting as an aggregate link, check allocations
float targetAllocation = 1.0 / alivePathCount;
float currentAllocation = 1.0;
if (alivePathCount > 1) {
currentAllocation = (float)_pathChoiceHist->countValue(i) / (float)_pathChoiceHist->count();
if (fabs(targetAllocation - currentAllocation) > ZT_PATH_IMBALANCE_THRESHOLD) {
imbalanced = true;
}
}
char *ipvStr = ipv ? (char*)"ipv4" : (char*)"ipv6";
sprintf(tmp, "(%s, %s, %5.4f)", _paths[i].p->getName(), ipvStr, currentAllocation);
// Prevent duplicates
if(ifnamemap[_paths[i].p->getName()] != ipv) {
memcpy(ptr, tmp, strlen(tmp));
ptr += strlen(tmp);
*ptr = ' ';
ptr++;
ifnamemap[_paths[i].p->getName()] = ipv;
}
}
}
ptr--; // Overwrite trailing space
if (imbalanced) {
sprintf(tmp, ", is IMBALANCED");
memcpy(ptr, tmp, sizeof(tmp));
} else {
*ptr = '\0';
}
return _interfaceListStr;
}
void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &other) const
{
unsigned int myBestV4ByScope[ZT_INETADDRESS_MAX_SCOPE+1];
@ -614,6 +672,35 @@ void Peer::introduce(void *const tPtr,const int64_t now,const SharedPtr<Peer> &o
}
}
void Peer::sendACK(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_ACK);
uint32_t bytesToAck = path->bytesToAck();
outp.append<uint32_t>(bytesToAck);
if (atAddress) {
outp.armor(_key,false);
RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
} else {
RR->sw->send(tPtr,outp,false);
}
path->sentAck(now);
}
void Peer::sendQOS_MEASUREMENT(void *tPtr,const SharedPtr<Path> &path,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_QOS_MEASUREMENT);
char qosData[ZT_PATH_MAX_QOS_PACKET_SZ];
path->generateQoSPacket(now,qosData);
outp.append(qosData,sizeof(qosData));
if (atAddress) {
outp.armor(_key,false);
RR->node->putPacket(tPtr,localSocket,atAddress,outp.data(),outp.size());
} else {
RR->sw->send(tPtr,outp,false);
}
path->sentQoS(now);
}
void Peer::sendHELLO(void *tPtr,const int64_t localSocket,const InetAddress &atAddress,int64_t now)
{
Packet outp(_id.address(),RR->identity.address(),Packet::VERB_HELLO);
@ -688,6 +775,25 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
const bool sendFullHello = ((now - _lastSentFullHello) >= ZT_PEER_PING_PERIOD);
_lastSentFullHello = now;
// Emit traces regarding the status of aggregate links
if (RR->node->getMultipathMode() != ZT_MULTIPATH_NONE) {
int alivePathCount = aggregateLinkPhysicalPathCount();
if ((now - _lastAggregateStatsReport) > ZT_PATH_AGGREGATE_STATS_REPORT_INTERVAL) {
_lastAggregateStatsReport = now;
if (alivePathCount) {
RR->t->peerLinkAggregateStatistics(NULL,*this);
}
}
// Report link redundancy
if (alivePathCount < 2 && _linkIsRedundant) {
_linkIsRedundant = !_linkIsRedundant;
RR->t->peerLinkNoLongerRedundant(NULL,*this);
} if (alivePathCount > 1 && !_linkIsRedundant) {
_linkIsRedundant = !_linkIsRedundant;
RR->t->peerLinkNowRedundant(NULL,*this);
}
}
// Right now we only keep pinging links that have the maximum priority. The
// priority is used to track cluster redirections, meaning that when a cluster
// redirects us its redirect target links override all other links and we
@ -726,22 +832,6 @@ unsigned int Peer::doPingAndKeepalive(void *tPtr,int64_t now)
return sent;
}
unsigned int Peer::prunePaths()
{
unsigned int pruned = 0;
for(unsigned int i=0;i<ZT_MAX_PEER_NETWORK_PATHS;++i) {
if (_paths[i].p) {
if(_paths[i].p->isClosed() || !_paths[i].p->isValidState()) {
_paths[i].lr = 0;
_paths[i].p.zero();
_paths[i].priority = 1;
pruned++;
}
}
}
return pruned;
}
void Peer::clusterRedirect(void *tPtr,const SharedPtr<Path> &originatingPath,const InetAddress &remoteAddress,const int64_t now)
{
SharedPtr<Path> np(RR->topology->getPath(originatingPath->localSocket(),remoteAddress));

View File

@ -68,9 +68,7 @@ public:
~Peer() {
Utils::burn(_key,sizeof(_key));
delete _pathChoiceHist;
delete _flowBalanceHist;
_pathChoiceHist = NULL;
_flowBalanceHist = NULL;
}
/**
@ -114,6 +112,7 @@ public:
const SharedPtr<Path> &path,
const unsigned int hops,
const uint64_t packetId,
const unsigned int payloadLength,
const Packet::Verb verb,
const uint64_t inRePacketId,
const Packet::Verb inReVerb,
@ -158,7 +157,74 @@ public:
}
/**
* Get the most appropriate direct path based on current multipath configuration
* Record statistics on outgoing packets
*
* @param path Path over which packet was sent
* @param id Packet ID
* @param len Length of packet payload
* @param verb Packet verb
* @param now Current time
*/
void recordOutgoingPacket(const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
/**
* Record statistics on incoming packets
*
* @param path Path over which packet was sent
* @param id Packet ID
* @param len Length of packet payload
* @param verb Packet verb
* @param now Current time
*/
void recordIncomingPacket(void *tPtr, const SharedPtr<Path> &path, const uint64_t packetId, uint16_t payloadLength, const Packet::Verb verb, int64_t now);
/**
* Send an ACK to peer for the most recent packets received
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param localSocket Raw socket the ACK packet will be sent over
* @param atAddress Destination for the ACK packet
* @param now Current time
*/
void sendACK(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
/**
* Send a QoS packet to peer so that it can evaluate the quality of this link
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
* @param localSocket Raw socket the QoS packet will be sent over
* @param atAddress Destination for the QoS packet
* @param now Current time
*/
void sendQOS_MEASUREMENT(void *tPtr, const SharedPtr<Path> &path, const int64_t localSocket,const InetAddress &atAddress,int64_t now);
/**
* @return The relative quality values for each path
*/
float computeAggregateLinkRelativeQuality(int64_t now);
/**
* @return The aggregate link Packet Delay Variance (PDV)
*/
float computeAggregateLinkPacketDelayVariance();
/**
* @return The aggregate link mean latenct
*/
float computeAggregateLinkMeanLatency();
/**
* @return The number of currently alive "physical" paths in the aggregate link
*/
int aggregateLinkPhysicalPathCount();
/**
* @return The number of currently alive "logical" paths in the aggregate link
*/
int aggregateLinkLogicalPathCount();
/**
* Get the most appropriate direct path based on current multipath and QoS configuration
*
* @param now Current time
* @param includeExpired If true, include even expired paths
@ -166,6 +232,12 @@ public:
*/
SharedPtr<Path> getAppropriatePath(int64_t now, bool includeExpired);
/**
* Generate a human-readable string of interface names making up the aggregate link, also include
* moving allocation and IP version number for each (for tracing)
*/
char *interfaceListStr();
/**
* Send VERB_RENDEZVOUS to this and another peer via the best common IP scope and path
*/
@ -549,11 +621,12 @@ private:
AtomicCounter __refCount;
RingBuffer<int> *_pathChoiceHist;
RingBuffer<float> *_flowBalanceHist;
bool _linkBalanceStatus;
bool _linkRedundancyStatus;
bool _linkIsBalanced;
bool _linkIsRedundant;
uint64_t _lastAggregateStatsReport;
char _interfaceListStr[256]; // 16 characters * 16 paths in a link
};
} // namespace ZeroTier

View File

@ -172,6 +172,11 @@ public:
write(&value, 1);
}
/**
* @return The most recently pushed element on the buffer
*/
T get_most_recent() { return *(buf + end); }
/**
* @param dest Destination buffer
* @param n Size (in terms of number of elements) of the destination buffer
@ -218,10 +223,7 @@ public:
/**
* @return The number of slots that are unused in the buffer
*/
size_t getFree()
{
return size - count();
}
size_t getFree() { return size - count(); }
/**
* @return The arithmetic mean of the contents of the buffer
@ -229,45 +231,67 @@ public:
float mean()
{
size_t iterator = begin;
float mean = 0;
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
mean += *(buf + iterator);
float subtotal = 0;
size_t curr_cnt = count();
for (size_t i=0; i<curr_cnt; i++) {
iterator = (iterator + size - 1) % curr_cnt;
subtotal += (float)*(buf + iterator);
}
return count() ? mean / (float)count() : 0;
return curr_cnt ? subtotal / (float)curr_cnt : 0;
}
/**
* @return The sample standard deviation of the contents of the ring buffer
* @return The arithmetic mean of the most recent 'n' elements of the buffer
*/
float stddev()
float mean(size_t n)
{
n = n < size ? n : size;
size_t iterator = begin;
float subtotal = 0;
size_t curr_cnt = count();
for (size_t i=0; i<n; i++) {
iterator = (iterator + size - 1) % curr_cnt;
subtotal += (float)*(buf + iterator);
}
return curr_cnt ? subtotal / (float)curr_cnt : 0;
}
/**
* @return The sample standard deviation of element values
*/
float stddev() { return sqrt(variance()); }
/**
* @return The variance of element values
*/
float variance()
{
size_t iterator = begin;
float cached_mean = mean();
size_t curr_cnt = count();
if (size) {
T sum_of_squared_deviations = 0;
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
for (size_t i=0; i<curr_cnt; i++) {
iterator = (iterator + size - 1) % curr_cnt;
float deviation = (buf[i] - cached_mean);
float sdev = deviation*deviation;
sum_of_squared_deviations += sdev;
sum_of_squared_deviations += (deviation*deviation);
}
float variance = sum_of_squared_deviations / (size - 1);
float sd = sqrt(variance);
return sd;
float variance = (float)sum_of_squared_deviations / (float)(size - 1);
return variance;
}
return 0;
}
/**
* @return The number of elements of zero value, O(n)
* @return The number of elements of zero value
*/
size_t zeroCount()
{
size_t iterator = begin;
size_t zeros = 0;
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
size_t curr_cnt = count();
for (size_t i=0; i<curr_cnt; i++) {
iterator = (iterator + size - 1) % curr_cnt;
if (*(buf + iterator) == 0) {
zeros++;
}
@ -282,14 +306,15 @@ public:
size_t countValue(T value)
{
size_t iterator = begin;
size_t count = 0;
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
size_t cnt = 0;
size_t curr_cnt = count();
for (size_t i=0; i<curr_cnt; i++) {
iterator = (iterator + size - 1) % curr_cnt;
if (*(buf + iterator) == value) {
count++;
cnt++;
}
}
return count;
return cnt;
}
/**
@ -301,10 +326,10 @@ public:
for (size_t i=0; i<size; i++) {
iterator = (iterator + size - 1) % size;
if (typeid(T) == typeid(int)) {
// DEBUG_INFO("buf[%2zu]=%2d", iterator, (int)*(buf + iterator));
//DEBUG_INFO("buf[%2zu]=%2d", iterator, (int)*(buf + iterator));
}
else {
// DEBUG_INFO("buf[%2zu]=%2f", iterator, (float)*(buf + iterator));
//DEBUG_INFO("buf[%2zu]=%2f", iterator, (float)*(buf + iterator));
}
}
}

View File

@ -666,6 +666,8 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {

View File

@ -106,24 +106,24 @@ void Trace::peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,
}
}
void Trace::peerLinkNowRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath)
void Trace::peerLinkNowRedundant(void *const tPtr,Peer &peer)
{
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is fully redundant",peer.address().toInt(),networkId);
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is fully redundant",peer.address().toInt());
}
void Trace::peerLinkNoLongerRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath)
void Trace::peerLinkNoLongerRedundant(void *const tPtr,Peer &peer)
{
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is no longer redundant",peer.address().toInt(),networkId);
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is no longer redundant",peer.address().toInt());
}
void Trace::peerLinkBalanced(void *const tPtr,const uint64_t networkId,Peer &peer)
void Trace::peerLinkAggregateStatistics(void *const tPtr,Peer &peer)
{
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is balanced",peer.address().toInt(),networkId);
}
void Trace::peerLinkImbalanced(void *const tPtr,const uint64_t networkId,Peer &peer)
{
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx on network %.16llx is unbalanced",peer.address().toInt(),networkId);
ZT_LOCAL_TRACE(tPtr,RR,"link to peer %.10llx is composed of (%d) physical paths %s, has packet delay variance (%.0f ms), mean latency (%.0f ms)",
peer.address().toInt(),
peer.aggregateLinkPhysicalPathCount(),
peer.interfaceListStr(),
peer.computeAggregateLinkPacketDelayVariance(),
peer.computeAggregateLinkMeanLatency());
}
void Trace::peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId)

View File

@ -122,10 +122,10 @@ public:
void peerConfirmingUnknownPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &path,const uint64_t packetId,const Packet::Verb verb);
void peerLinkNowRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
void peerLinkNoLongerRedundant(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);
void peerLinkBalanced(void *const tPtr,const uint64_t networkId,Peer &peer);
void peerLinkImbalanced(void *const tPtr,const uint64_t networkId,Peer &peer);
void peerLinkNowRedundant(void *const tPtr,Peer &peer);
void peerLinkNoLongerRedundant(void *const tPtr,Peer &peer);
void peerLinkAggregateStatistics(void *const tPtr,Peer &peer);
void peerLearnedNewPath(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath,const uint64_t packetId);
void peerRedirected(void *const tPtr,const uint64_t networkId,Peer &peer,const SharedPtr<Path> &newPath);

View File

@ -261,6 +261,14 @@ public:
return l;
}
static inline float normalize(float value, int64_t bigMin, int64_t bigMax, int32_t targetMin, int32_t targetMax)
{
int64_t bigSpan = bigMax - bigMin;
int64_t smallSpan = targetMax - targetMin;
float valueScaled = (value - (float)bigMin) / (float)bigSpan;
return (float)targetMin + valueScaled * (float)smallSpan;
}
/**
* Generate secure random bytes
*

View File

@ -456,20 +456,6 @@ public:
return false;
}
/**
* Get a list of socket pointers for all bindings.
*
* @return A list of socket pointers for current bindings
*/
inline std::vector<PhySocket*> getBoundSockets()
{
std::vector<PhySocket*> sockets;
for (int i=0; i<ZT_BINDER_MAX_BINDINGS; i++) {
sockets.push_back(_bindings[i].udpSock);
}
return sockets;
}
private:
_Binding _bindings[ZT_BINDER_MAX_BINDINGS];
std::atomic<unsigned int> _bindingCount;

View File

@ -27,8 +27,6 @@
#ifndef ZT_PHY_HPP
#define ZT_PHY_HPP
#include "../osdep/OSUtils.hpp"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -88,22 +86,6 @@ namespace ZeroTier {
*/
typedef void PhySocket;
struct link_test_record
{
link_test_record(PhySocket *_s, uint64_t _id, uint64_t _egress_time, uint32_t _length) :
s(_s),
id(_id),
egress_time(_egress_time),
length(_length)
{
//
}
PhySocket *s;
uint64_t id;
uint64_t egress_time;
uint32_t length;
};
/**
* Simple templated non-blocking sockets implementation
*
@ -170,19 +152,13 @@ private:
ZT_PHY_SOCKET_UNIX_LISTEN = 0x08
};
struct PhySocketImpl
{
PhySocketImpl() :
throughput(0)
{
memset(ifname, 0, sizeof(ifname));
}
struct PhySocketImpl {
PhySocketImpl() { memset(ifname, 0, sizeof(ifname)); }
PhySocketType type;
ZT_PHY_SOCKFD_TYPE sock;
void *uptr; // user-settable pointer
ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
char ifname[16];
uint64_t throughput;
};
std::list<PhySocketImpl> _socks;
@ -198,7 +174,6 @@ private:
bool _noDelay;
bool _noCheck;
std::vector<struct link_test_record*> link_test_records;
public:
/**
@ -282,7 +257,9 @@ public:
*/
static inline void getIfName(PhySocket *s, char *nameBuf, int buflen)
{
memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen);
if (s) {
memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen);
}
}
/**
@ -292,18 +269,9 @@ public:
*/
static inline void setIfName(PhySocket *s, char *ifname, int len)
{
memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len);
}
/**
* Get result of most recent throughput test
*
* @param s Socket object
*/
inline uint64_t getThroughput(PhySocket *s)
{
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
return sws ? sws->throughput : 0;
if (s) {
memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len);
}
}
/**
@ -339,105 +307,9 @@ public:
*/
inline bool isValidState(PhySocket *s)
{
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
return sws->type >= ZT_PHY_SOCKET_CLOSED && sws->type <= ZT_PHY_SOCKET_UNIX_LISTEN;
}
/**
* Send a datagram of a known size to a selected peer and record egress time. The peer
* shall eventually respond by echoing back a smaller datagram.
*
* @param s Socket object
* @param remoteAddress Address of remote peer to receive link test packet
* @param data Buffer containing random packet data
* @param len Length of packet data buffer
* @return Number of bytes successfully written to socket
*/
inline int test_link_speed(PhySocket *s, const struct sockaddr *to, void *data, uint32_t len) {
if (!reinterpret_cast<PhySocketImpl *>(s)) {
return 0;
}
uint64_t *buf = (uint64_t*)data;
uint64_t id = buf[0];
if (to->sa_family != AF_INET && to->sa_family != AF_INET6) {
return 0;
}
uint64_t egress_time = OSUtils::now();
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
#if defined(_WIN32) || defined(_WIN64)
int w = ::sendto(sws->sock,reinterpret_cast<const char *>(data),len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in))
#else
int w = ::sendto(sws->sock,data,len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
#endif
if (w > 0) {
link_test_records.push_back(new link_test_record(s, id, egress_time, len));
}
return w;
}
/**
* Remove link speed test records which have timed-out and record a 0 bits/s measurement
*/
inline void refresh_link_speed_records()
{
for(size_t i=0;i<link_test_records.size();i++) {
if(OSUtils::now() - link_test_records[i]->egress_time > ZT_LINK_TEST_TIMEOUT) {
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(link_test_records[i]->s));
if (sws) {
sws->throughput = 0;
}
link_test_records.erase(link_test_records.begin() + i);
}
}
}
/**
* Upon receipt of a link speed test datagram we echo back only the identification portion
*
* @param s Socket object
* @param from Address of remote peer that sent this datagram
* @param data Buffer containing datagram's contents
* @param len Length of datagram
* @return Number of bytes successfully written to socket in response
*/
inline int respond_to_link_test(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
uint64_t *id = (uint64_t*)data;
#if defined(_WIN32) || defined(_WIN64)
int w = ::sendto(sws->sock,reinterpret_cast<const char *>(id),sizeof(id[0]),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
#else
int w = ::sendto(sws->sock,id,sizeof(id[0]),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
#endif
return w;
}
/**
* Upon receipt of a response to our original link test datagram, correlate this new datagram with the record
* of the one we sent. Compute the transit time and update the throughput field of the relevant socket. This
* value will later be read by the path quality estimation logic located in Path.hpp.
*
* @param s Socket object
* @param from Address of remote peer that sent this datagram
* @param data Buffer containing datagram contents (ID of original link test datagram)
* @param len Length of datagram
* @return true if datagram correponded to previous record, false if otherwise
*/
inline bool handle_link_test_response(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
uint64_t *id = (uint64_t*)data;
for(size_t i=0;i<link_test_records.size();i++) {
if(link_test_records[i]->id == id[0]) {
float rtt = (OSUtils::now()-link_test_records[i]->egress_time) / (float)1000; // s
uint32_t sz = (link_test_records[i]->length) * 8; // bits
float transit_time = rtt / (float)2.0;
uint64_t raw = (uint64_t)(sz / transit_time);
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
if (sws) {
sws->throughput = raw;
}
delete link_test_records[i];
link_test_records.erase(link_test_records.begin() + i);
return true;
}
if (s) {
PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s));
return sws->type >= ZT_PHY_SOCKET_CLOSED && sws->type <= ZT_PHY_SOCKET_UNIX_LISTEN;
}
return false;
}

View File

@ -37,7 +37,6 @@
#include "../version.h"
#include "../include/ZeroTierOne.h"
#include "../include/ZeroTierDebug.h"
#include "../node/Constants.hpp"
#include "../node/Mutex.hpp"
@ -458,9 +457,6 @@ public:
// Last potential sleep/wake event
uint64_t _lastRestart;
// Last time link throughput was tested
uint64_t _lastLinkSpeedTest;
// Deadline for the next background task service function
volatile int64_t _nextBackgroundTaskDeadline;
@ -881,26 +877,6 @@ public:
lastMultipathModeUpdate = now;
_node->setMultipathMode(_multipathMode);
}
// Test link speeds
// TODO: This logic should eventually find its way into the core or as part of a passive
// measure within the protocol.
if (_multipathMode && ((now - _lastLinkSpeedTest) >= ZT_LINK_SPEED_TEST_INTERVAL)) {
_phy.refresh_link_speed_records();
_lastLinkSpeedTest = now;
// Generate random data to fill UDP packet
uint64_t pktBuf[ZT_LINK_TEST_DATAGRAM_SZ / sizeof(uint64_t)];
Utils::getSecureRandom(pktBuf, ZT_LINK_TEST_DATAGRAM_SZ);
ZT_PeerList *pl = _node->peers();
std::vector<PhySocket*> sockets = _binder.getBoundSockets();
for (int i=0; i<ZT_BINDER_MAX_BINDINGS; i++) {
for(size_t j=0;j<pl->peerCount;++j) {
for (int k=0; k<(ZT_MAX_PEER_NETWORK_PATHS/4); k++) {
Utils::getSecureRandom(pktBuf, 8); // generate one random integer for unique id
_phy.test_link_speed(sockets[i], (struct sockaddr*)&(pl->peers[j].paths[k].address), pktBuf, ZT_LINK_TEST_DATAGRAM_SZ);
}
}
}
}
// Run background task processor in core if it's time to do so
int64_t dl = _nextBackgroundTaskDeadline;
@ -1799,15 +1775,6 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
{
if (_multipathMode) {
// Handle link test packets (should eventually be moved into the protocol itself)
if (len == ZT_LINK_TEST_DATAGRAM_SZ) {
_phy.respond_to_link_test(sock, from, data, len);
}
if (len == ZT_LINK_TEST_DATAGRAM_RESPONSE_SZ) {
_phy.handle_link_test_response(sock, from, data, len);
}
}
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
_lastDirectReceiveFromGlobal = OSUtils::now();
const ZT_ResultCode rc = _node->processWirePacket(