root cleanup and add nifty geoip thing

This commit is contained in:
Adam Ierymenko 2019-09-06 07:37:44 -07:00
parent c6141ffbc7
commit 3123102211
No known key found for this signature in database
GPG Key ID: C8877CF2D7A5D7F3
2 changed files with 215 additions and 157 deletions

53
root/geoip-html.h Normal file
View File

@ -0,0 +1,53 @@
#ifndef ZT_ROOT_GEOIP_HTML_H
#define ZT_ROOT_GEOIP_HTML_H
#define ZT_GEOIP_HTML_HEAD \
"<!DOCTYPE html>\n" \
"<html>\n" \
" <head>\n" \
" <meta name=\"viewport\" content=\"initial-scale=1.0, user-scalable=no\">\n" \
" <meta charset=\"utf-8\">\n" \
" <title>GeoIP Map</title>\n" \
" <style>\n" \
" #map {\n" \
" height: 100%;\n" \
" }\n" \
" html, body {\n" \
" height: 100%;\n" \
" width: 100%;\n" \
" margin: 0;\n" \
" padding: 0;\n" \
" }\n" \
" </style>\n" \
" </head>\n" \
" <body>\n" \
" <div id=\"map\"></div>\n" \
" <script>\n" \
"\n" \
" function initMap() {\n" \
" var map = new google.maps.Map(document.getElementById('map'), {\n" \
" zoom: 3\n" \
" });\n" \
" var markers = locations.map(function(location,i) {\n" \
" return new google.maps.Marker({\n" \
" position: location,\n" \
" label: location._l\n" \
" });\n" \
" });\n" \
"\n" \
" var markerCluster = new MarkerClusterer(map,markers,{imagePath: 'https://developers.google.com/maps/documentation/javascript/examples/markerclusterer/m'});\n" \
" }\n" \
" var locations = ["
#define ZT_GEOIP_HTML_TAIL \
"];\n" \
" </script>\n" \
" <script src=\"https://developers.google.com/maps/documentation/javascript/examples/markerclusterer/markerclusterer.js\">\n" \
" </script>\n" \
" <!-- <script async defer\n" \
" src=\"https://maps.googleapis.com/maps/api/js?v=3.exp&sensor=false&callback=initMap\">\n" \
" </script> -->\n" \
" </body>\n" \
"</html>"
#endif

View File

@ -99,6 +99,8 @@
#include <mutex>
#include <sstream>
#include "geoip-html.h"
using namespace ZeroTier;
using json = nlohmann::json;
@ -111,6 +113,32 @@ using json = nlohmann::json;
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
/**
* RootPeer is a normal peer known to this root
*
* This can also be a sibling root, which is itself a peer. Sibling roots
* are sent HELLO while for other peers we only listen for HELLO.
*/
struct RootPeer
{
ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1) {}
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
Identity id; // Identity
uint8_t key[32]; // Shared secret key
InetAddress ip4,ip6; // IPv4 and IPv6 addresses
int64_t lastSend; // Time of last send (any packet)
int64_t lastReceive; // Time of last receive (any packet)
int64_t lastSync; // Time of last data synchronization with LF or other root state backend (currently unused)
int64_t lastEcho; // Time of last received ECHO
int64_t lastHello; // Time of last received HELLO
int vProto; // Protocol version
int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
std::mutex lock;
AtomicCounter __refCount;
};
// Hashers for std::unordered_map
struct IdentityHasher { ZT_ALWAYS_INLINE std::size_t operator()(const Identity &id) const { return (std::size_t)id.hashCode(); } };
struct AddressHasher { ZT_ALWAYS_INLINE std::size_t operator()(const Address &a) const { return (std::size_t)a.toInt(); } };
@ -136,33 +164,6 @@ struct RendezvousKey
struct Hasher { ZT_ALWAYS_INLINE std::size_t operator()(const RendezvousKey &k) const { return (std::size_t)(k.a.toInt() ^ k.b.toInt()); } };
};
/**
* RootPeer is a normal peer known to this root
*
* This can also be a sibling root, which is itself a peer. Sibling roots
* are sent HELLO while for other peers we only listen for HELLO.
*/
struct RootPeer
{
ZT_ALWAYS_INLINE RootPeer() : lastSend(0),lastReceive(0),lastSync(0),lastEcho(0),lastHello(0),vProto(-1),vMajor(-1),vMinor(-1),vRev(-1),sibling(false) {}
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
Identity id; // Identity
uint8_t key[32]; // Shared secret key
InetAddress ip4,ip6; // IPv4 and IPv6 addresses
int64_t lastSend; // Time of last send (any packet)
int64_t lastReceive; // Time of last receive (any packet)
int64_t lastSync; // Time of last data synchronization with LF or other root state backend (currently unused)
int64_t lastEcho; // Time of last received ECHO
int64_t lastHello; // Time of last received HELLO
int vProto; // Protocol version
int vMajor,vMinor,vRev; // Peer version or -1,-1,-1 if unknown
bool sibling; // If true, this is a sibling root that will get forwards we don't know where to send
std::mutex lock;
AtomicCounter __refCount;
};
struct RendezvousStats
{
RendezvousStats() : count(0),ts(0) {}
@ -178,28 +179,29 @@ struct ForwardingStats
Meter bps;
};
static int64_t s_startTime; // Time service was started
static std::vector<int> s_ports; // Ports to bind for UDP traffic
static int s_relayMaxHops = 0; // Max relay hops
static Identity s_self; // My identity (including secret)
static std::atomic_bool s_run; // Remains true until shutdown is ordered
static json s_config; // JSON config file contents
static std::string s_statsRoot; // Root to write stats, peers, etc.
static int64_t s_startTime; // Time service was started
static std::vector<int> s_ports; // Ports to bind for UDP traffic
static int s_relayMaxHops = 0; // Max relay hops
static Identity s_self; // My identity (including secret)
static std::atomic_bool s_run; // Remains true until shutdown is ordered
static json s_config; // JSON config file contents
static std::string s_statsRoot; // Root to write stats, peers, etc.
static std::atomic_bool s_geoInit; // True if geoIP data is initialized
static Meter s_inputRate;
static Meter s_outputRate;
static Meter s_forwardRate;
static Meter s_siblingForwardRate;
static Meter s_discardedForwardRate;
static std::string s_planet;
static std::vector< SharedPtr<RootPeer> > s_siblings;
static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > s_multicastSubscriptions;
static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > s_peersByIdentity;
static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher > s_peersByVirtAddr;
static std::unordered_map< InetAddress,std::set< SharedPtr<RootPeer> >,InetAddressHasher > s_peersByPhysAddr;
static std::unordered_map< RendezvousKey,RendezvousStats,RendezvousKey::Hasher > s_lastSentRendezvous;
static std::unordered_map< RendezvousKey,RendezvousStats,RendezvousKey::Hasher > s_rendezvousTracking;
static std::unordered_map< Address,ForwardingStats,AddressHasher > s_lastForwardedTo;
static std::map< std::pair< uint32_t,uint32_t >,std::pair< float,float > > s_geoIp4;
static std::map< std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > >,std::pair< float,float > > s_geoIp6;
static std::mutex s_planet_l;
static std::mutex s_siblings_l;
@ -207,12 +209,26 @@ static std::mutex s_multicastSubscriptions_l;
static std::mutex s_peersByIdentity_l;
static std::mutex s_peersByVirtAddr_l;
static std::mutex s_peersByPhysAddr_l;
static std::mutex s_lastSentRendezvous_l;
static std::mutex s_rendezvousTracking_l;
static std::mutex s_lastForwardedTo_l;
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
static uint32_t ip4ToH32(const InetAddress &ip)
{
return Utils::ntoh((uint32_t)(((const struct sockaddr_in *)&ip)->sin_addr.s_addr));
}
static std::array< uint64_t,2 > ip6ToH128(const InetAddress &ip)
{
std::array<uint64_t,2> i128;
memcpy(i128.data(),ip.rawIpData(),16);
i128[0] = Utils::ntoh(i128[0]);
i128[1] = Utils::ntoh(i128[1]);
return i128;
}
static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip,Packet &pkt)
{
char ipstr[128],ipstr2[128],astr[32],astr2[32],tmpstr[256];
@ -498,8 +514,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (hops == 1) {
RendezvousKey rk(source,dest);
std::lock_guard<std::mutex> l(s_lastSentRendezvous_l);
RendezvousStats &lr = s_lastSentRendezvous[rk];
std::lock_guard<std::mutex> l(s_rendezvousTracking_l);
RendezvousStats &lr = s_rendezvousTracking[rk];
if ((now - lr.ts) >= 30000) {
++lr.count;
lr.ts = now;
@ -522,18 +538,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
}
}
if (toAddrs.empty()) {
std::lock_guard<std::mutex> sib_l(s_siblings_l);
for(auto s=s_siblings.begin();s!=s_siblings.end();++s) {
if (((now - (*s)->lastReceive) < (ZT_PEER_PING_PERIOD * 2))&&((*s)->sibling)) {
if ((*s)->ip4) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*s)->ip4),*s));
} else if ((*s)->ip6) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*s)->ip6),*s));
}
}
}
}
if (toAddrs.empty()) {
s_discardedForwardRate.log(now,pkt.size());
return;
@ -621,8 +625,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) > 0) {
s_outputRate.log(now,pkt.size());
s_forwardRate.log(now,pkt.size());
if (i->second->sibling)
s_siblingForwardRate.log(now,pkt.size());
i->second->lastSend = now;
}
}
@ -639,17 +641,17 @@ static int bindSocket(struct sockaddr *const bindAddr)
return -1;
}
int f = 1048576;
while (f > 16384) {
int f = 4194304;
while (f > 131072) {
if (setsockopt(s,SOL_SOCKET,SO_RCVBUF,(const char *)&f,sizeof(f)) == 0)
break;
f -= 16384;
f -= 131072;
}
f = 1048576;
while (f > 16384) {
f = 4194304;
while (f > 131072) {
if (setsockopt(s,SOL_SOCKET,SO_SNDBUF,(const char *)&f,sizeof(f)) == 0)
break;
f -= 16384;
f -= 131072;
}
if (bindAddr->sa_family == AF_INET6) {
@ -694,6 +696,10 @@ static void shutdownSigHandler(int sig) { s_run = false; }
int main(int argc,char **argv)
{
std::vector<std::thread> threads;
std::vector<int> sockets;
int v4Sock = -1,v6Sock = -1;
signal(SIGTERM,shutdownSigHandler);
signal(SIGINT,shutdownSigHandler);
signal(SIGQUIT,shutdownSigHandler);
@ -703,6 +709,7 @@ int main(int argc,char **argv)
signal(SIGCHLD,SIG_IGN);
s_startTime = OSUtils::now();
s_geoInit = false;
if (argc < 3) {
printf("Usage: zerotier-root <identity.secret> <config path>" ZT_EOL_S);
@ -799,7 +806,7 @@ int main(int argc,char **argv)
s_relayMaxHops = ZT_RELAY_MAX_HOPS;
try {
s_relayMaxHops = s_config["s_relayMaxHops"];
s_relayMaxHops = s_config["relayMaxHops"];
if (s_relayMaxHops > ZT_PROTO_MAX_HOPS)
s_relayMaxHops = ZT_PROTO_MAX_HOPS;
else if (s_relayMaxHops < 0)
@ -809,68 +816,54 @@ int main(int argc,char **argv)
}
try {
if (s_config.count("s_siblings") > 0) {
auto sibs = s_config["s_siblings"];
if (sibs.is_array()) {
for(long i=0;i<(long)sibs.size();++i) {
auto sib = sibs[i];
if (sib.is_object()) {
std::string idStr = sib["id"];
std::string ipStr = sib["ip"];
Identity id;
if (!id.fromString(idStr.c_str())) {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: invalid identity in sibling entry" ZT_EOL_S);
return 1;
std::string geoIpPath = s_config["geoIp"];
if (geoIpPath.length() > 0) {
FILE *gf = fopen(geoIpPath.c_str(),"rb");
if (gf) {
threads.emplace_back(std::thread([gf]() {
try {
char line[1024];
while (fgets(line,sizeof(line),gf)) {
InetAddress start,end;
float lat,lon;
int field = 0;
for(char *saveptr=nullptr,*f=Utils::stok(line,",\r\n",&saveptr);(f);f=Utils::stok(nullptr,",\r\n",&saveptr)) {
switch(field++) {
case 0:
start.fromString(f);
break;
case 1:
end.fromString(f);
break;
case 2:
lat = strtof(f,nullptr);
break;
case 3:
lon = strtof(f,nullptr);
break;
}
}
if ((start)&&(end)&&(start.ss_family == end.ss_family)&&(lat >= -90.0F)&&(lat <= 90.0F)&&(lon >= -180.0F)&&(lon <= 180.0F)) {
if (start.ss_family == AF_INET) {
s_geoIp4[std::pair< uint32_t,uint32_t >(ip4ToH32(start),ip4ToH32(end))] = std::pair< float,float >(lat,lon);
} else if (start.ss_family == AF_INET6) {
s_geoIp6[std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > >(ip6ToH128(start),ip6ToH128(end))] = std::pair< float,float >(lat,lon);
}
}
}
InetAddress ip;
if (!ip.fromString(ipStr.c_str())) {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: invalid IP address in sibling entry" ZT_EOL_S);
return 1;
}
ip.setPort((unsigned int)sib["port"]);
SharedPtr<RootPeer> rp(new RootPeer);
rp->id = id;
if (!s_self.agree(id,rp->key)) {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: invalid identity in sibling entry (unable to execute key agreement)" ZT_EOL_S);
return 1;
}
if (ip.isV4()) {
rp->ip4 = ip;
} else if (ip.isV6()) {
rp->ip6 = ip;
} else {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: invalid IP address in sibling entry" ZT_EOL_S);
return 1;
}
rp->sibling = true;
s_siblings.push_back(rp);
s_peersByIdentity[id] = rp;
s_peersByVirtAddr[id.address()].insert(rp);
s_peersByPhysAddr[ip].insert(rp);
} else {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: sibling entry is not a JSON object" ZT_EOL_S);
return 1;
}
}
} else {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: s_siblings is not a JSON array" ZT_EOL_S);
return 1;
s_geoInit = true;
} catch ( ... ) {}
fclose(gf);
}));
}
}
} catch ( ... ) {
printf("FATAL: invalid JSON while parsing s_siblings section in config file: parse error" ZT_EOL_S);
return 1;
}
} catch ( ... ) {}
unsigned int ncores = std::thread::hardware_concurrency();
if (ncores == 0) ncores = 1;
s_run = true;
std::vector<std::thread> threads;
std::vector<int> sockets;
int v4Sock = -1,v6Sock = -1;
for(auto port=s_ports.begin();port!=s_ports.end();++port) {
for(unsigned int tn=0;tn<ncores;++tn) {
struct sockaddr_in6 in6;
@ -959,6 +952,7 @@ int main(int argc,char **argv)
o << "Physical Addresses: " << s_peersByPhysAddr.size() << ZT_EOL_S;
res.set_content(o.str(),"text/plain");
});
apiServ.Get("/peer",[](const httplib::Request &req,httplib::Response &res) {
char tmp[256];
std::ostringstream o;
@ -1007,13 +1001,55 @@ int main(int argc,char **argv)
o << ']';
res.set_content(o.str(),"application/json");
});
apiServ.Get("/map",[](const httplib::Request &req,httplib::Response &res) {
char tmp[128];
if (!s_geoInit) {
res.set_content("Not enabled or GeoIP CSV file not finished reading.","text/plain");
return;
}
std::ostringstream o;
o << ZT_GEOIP_HTML_HEAD;
{
bool firstCoord = true;
std::pair< uint32_t,uint32_t > k4(0,0);
std::pair< std::array< uint64_t,2 >,std::array< uint64_t,2 > > k6;
k6.second[0] = 0; k6.second[1] = 0;
std::lock_guard<std::mutex> l(s_peersByPhysAddr_l);
for(auto p=s_peersByPhysAddr.begin();p!=s_peersByPhysAddr.end();++p) {
if (!p->second.empty()) {
k4.first = ip4ToH32(p->first);
auto geo = s_geoIp4.lower_bound(k4);
while ((geo != s_geoIp4.end())&&(geo->first.first == k4.first)) {
if (geo->first.second >= k4.first) {
if (!firstCoord)
o << ',';
firstCoord = false;
o << "{lat:" << geo->second.first << ",lng:" << geo->second.second << ",_l:\"";
bool firstAddr = true;
for(auto a=p->second.begin();a!=p->second.end();++a) {
if (!firstAddr)
o << ',';
o << (*a)->id.address().toString(tmp);
firstAddr = false;
}
o << "\"}";
break;
}
}
}
}
}
o << ZT_GEOIP_HTML_TAIL;
res.set_content(o.str(),"text/html");
});
apiServ.listen("127.0.0.1",httpPort,0);
}));
// In the main thread periodically clean stuff up
int64_t lastCleaned = 0;
int64_t lastWroteStats = 0;
int64_t lastPingedSiblings = 0;
while (s_run) {
//s_peersByIdentity_l.lock();
//s_peersByPhysAddr_l.lock();
@ -1024,36 +1060,6 @@ int main(int argc,char **argv)
const int64_t now = OSUtils::now();
// Send HELLO to sibling roots
if ((now - lastPingedSiblings) >= ZT_PEER_PING_PERIOD) {
lastPingedSiblings = now;
std::lock_guard<std::mutex> l(s_siblings_l);
for(auto s=s_siblings.begin();s!=s_siblings.end();++s) {
const InetAddress *ip = nullptr;
socklen_t sl = 0;
Packet outp((*s)->id.address(),s_self.address(),Packet::VERB_HELLO);
outp.append((uint8_t)ZT_PROTO_VERSION);
outp.append((uint8_t)0);
outp.append((uint8_t)0);
outp.append((uint16_t)0);
outp.append((uint64_t)now);
s_self.serialize(outp,false);
if ((*s)->ip4) {
(*s)->ip4.serialize(outp);
ip = &((*s)->ip4);
sl = sizeof(struct sockaddr_in);
} else if ((*s)->ip6) {
(*s)->ip6.serialize(outp);
ip = &((*s)->ip6);
sl = sizeof(struct sockaddr_in6);
}
if (ip) {
outp.armor((*s)->key,false);
sendto(ip->isV4() ? v4Sock : v6Sock,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,sl);
}
}
}
if ((now - lastCleaned) > 120000) {
lastCleaned = now;
@ -1081,7 +1087,7 @@ int main(int argc,char **argv)
{
std::lock_guard<std::mutex> pbi_l(s_peersByIdentity_l);
for(auto p=s_peersByIdentity.begin();p!=s_peersByIdentity.end();) {
if (((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT)&&(!p->second->sibling)) {
if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
std::lock_guard<std::mutex> pbv_l(s_peersByVirtAddr_l);
std::lock_guard<std::mutex> pbp_l(s_peersByPhysAddr_l);
@ -1116,10 +1122,10 @@ int main(int argc,char **argv)
// Remove old rendezvous and last forwarded tracking entries
{
std::lock_guard<std::mutex> l(s_lastSentRendezvous_l);
for(auto lr=s_lastSentRendezvous.begin();lr!=s_lastSentRendezvous.end();) {
std::lock_guard<std::mutex> l(s_rendezvousTracking_l);
for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();) {
if ((now - lr->second.ts) > ZT_PEER_ACTIVITY_TIMEOUT)
s_lastSentRendezvous.erase(lr++);
s_rendezvousTracking.erase(lr++);
else ++lr;
}
}
@ -1219,23 +1225,22 @@ int main(int argc,char **argv)
s_peersByPhysAddr_l.lock();
fprintf(sf,"Physical Endpoints : %llu" ZT_EOL_S,(unsigned long long)s_peersByPhysAddr.size());
s_peersByPhysAddr_l.unlock();
s_lastSentRendezvous_l.lock();
s_rendezvousTracking_l.lock();
uint64_t unsuccessfulp2p = 0;
for(auto lr=s_lastSentRendezvous.begin();lr!=s_lastSentRendezvous.end();++lr) {
for(auto lr=s_rendezvousTracking.begin();lr!=s_rendezvousTracking.end();++lr) {
if (lr->second.count > 6) // 6 == two attempts per edge, one for each direction
++unsuccessfulp2p;
}
fprintf(sf,"Recent P2P Graph Edges : %llu" ZT_EOL_S,(unsigned long long)s_lastSentRendezvous.size());
if (s_lastSentRendezvous.empty()) {
fprintf(sf,"Estimated P2P Success Rate : 100.0000%%" ZT_EOL_S);
fprintf(sf,"Recent P2P Graph Edges : %llu" ZT_EOL_S,(unsigned long long)s_rendezvousTracking.size());
if (s_rendezvousTracking.empty()) {
fprintf(sf,"Recent P2P Success Rate : 100.0000%%" ZT_EOL_S);
} else {
fprintf(sf,"Estimated P2P Success Rate : %.4f%%" ZT_EOL_S,(1.0 - ((double)unsuccessfulp2p / (double)s_lastSentRendezvous.size())) * 100.0);
fprintf(sf,"Recent P2P Success Rate : %.4f%%" ZT_EOL_S,(1.0 - ((double)unsuccessfulp2p / (double)s_rendezvousTracking.size())) * 100.0);
}
s_lastSentRendezvous_l.unlock();
s_rendezvousTracking_l.unlock();
fprintf(sf,"Input (MiB/s) : %.4f" ZT_EOL_S,s_inputRate.perSecond(now) / 1048576.0);
fprintf(sf,"Output (MiB/s) : %.4f" ZT_EOL_S,s_outputRate.perSecond(now) / 1048576.0);
fprintf(sf,"Forwarded (MiB/s) : %.4f" ZT_EOL_S,s_forwardRate.perSecond(now) / 1048576.0);
fprintf(sf,"Sibling Forwarded (MiB/s) : %.4f" ZT_EOL_S,s_siblingForwardRate.perSecond(now) / 1048576.0);
fprintf(sf,"Discarded Forward (MiB/s) : %.4f" ZT_EOL_S,s_discardedForwardRate.perSecond(now) / 1048576.0);
fclose(sf);