Implement WHOIS

This commit is contained in:
Adam Ierymenko 2019-08-29 14:27:41 -07:00
parent f6d747a5a0
commit b27a38e55e
No known key found for this signature in database
GPG Key ID: C8877CF2D7A5D7F3

View File

@ -19,6 +19,7 @@
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -60,6 +61,12 @@
using namespace ZeroTier;
using json = nlohmann::json;
#ifdef MSG_DONTWAIT
#define SENDTO_FLAGS MSG_DONTWAIT
#else
#define SENDTO_FLAGS 0
#endif
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
@ -88,20 +95,25 @@ struct RendezvousKey
struct RootPeer
{
ZT_ALWAYS_INLINE RootPeer() : lastReceive(0),lastSync(0),lastEcho(0),lastHello(0) {}
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
Identity id;
uint8_t key[32];
InetAddress ip4,ip6;
int64_t lastReceive;
int64_t lastSync;
int64_t lastEcho;
int64_t lastHello;
std::mutex lock;
AtomicCounter __refCount;
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
};
static Identity self;
static std::atomic_bool run;
static json config;
static std::string statsRoot;
static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions;
static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > peersByIdentity;
@ -126,8 +138,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
const Address dest(pkt.destination());
const int64_t now = OSUtils::now();
// See if this is destined for us and isn't a fragment / fragmented. (No packets
// understood by the root are fragments/fragmented.)
if ((!fragment)&&(!pkt.fragmented())&&(dest == self.address())) {
SharedPtr<RootPeer> peer;
@ -161,7 +171,6 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
peer->id = id;
peer->lastReceive = now;
peer->lastSync = 0;
peersByIdentity.emplace(id,peer);
peersByVirtAddr[id.address()].emplace(peer);
} else {
@ -199,6 +208,8 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
// If we found the peer, update IP and/or time and handle certain key packet types that the
// root must concern itself with.
if (peer) {
std::lock_guard<std::mutex> pl(peer->lock);
InetAddress *const peerIp = ip->isV4() ? &(peer->ip4) : &(peer->ip6);
if (*peerIp != ip) {
std::lock_guard<std::mutex> pbp_l(peersByPhysAddr_l);
@ -220,25 +231,71 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
switch(pkt.verb()) {
case Packet::VERB_HELLO:
try {
const uint64_t origId = pkt.packetId();
const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_HELLO);
pkt.append(origId);
pkt.append(ts);
pkt.append((uint8_t)ZT_PROTO_VERSION);
pkt.append((uint8_t)0);
pkt.append((uint8_t)0);
pkt.append((uint16_t)0);
ip->serialize(pkt);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
//printf("%s <- OK(HELLO)" ZT_EOL_S,ip->toString(ipstr));
if ((now - peer->lastHello) > 1000) {
peer->lastHello = now;
const uint64_t origId = pkt.packetId();
const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_HELLO);
pkt.append(origId);
pkt.append(ts);
pkt.append((uint8_t)ZT_PROTO_VERSION);
pkt.append((uint8_t)0);
pkt.append((uint8_t)0);
pkt.append((uint16_t)0);
ip->serialize(pkt);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
}
} catch ( ... ) {
printf("* unexpected exception handling HELLO from %s" ZT_EOL_S,ip->toString(ipstr));
}
break;
case Packet::VERB_ECHO:
try {
if ((now - peer->lastEcho) > 1000) {
peer->lastEcho = now;
Packet outp(source,self.address(),Packet::VERB_OK);
outp.append((uint8_t)Packet::VERB_ECHO);
outp.append(pkt.packetId());
outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD);
outp.compress();
outp.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
}
} catch ( ... ) {
printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
}
case Packet::VERB_WHOIS:
try {
std::vector< SharedPtr<RootPeer> > results;
{
std::lock_guard<std::mutex> l(peersByVirtAddr_l);
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) {
auto peers = peersByVirtAddr.find(Address(pkt.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
if (peers != peersByVirtAddr.end()) {
for(auto p=peers->second.begin();p!=peers->second.end();++p)
results.push_back(*p);
}
}
}
if (!results.empty()) {
const uint64_t origId = pkt.packetId();
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_WHOIS);
pkt.append(origId);
for(auto p=results.begin();p!=results.end();++p)
(*p)->id.serialize(pkt,false);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
}
} catch ( ... ) {
printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
}
case Packet::VERB_MULTICAST_LIKE:
try {
std::lock_guard<std::mutex> l(multicastSubscriptions_l);
@ -289,7 +346,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
if (l > 0) {
pkt.setAt<uint16_t>(countAt,(uint16_t)l);
pkt.armor(peer->key,true);
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
sendto(ip->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
//printf("%s %s gathered %u subscribers to %s/%.8lx on network %.16llx" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),l,mg.mac().toString(tmpstr),(unsigned long)mg.adi(),(unsigned long long)nwid);
}
}
@ -312,7 +369,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
// sending a RENDEZVOUS message.
bool introduce = false;
{
if (!fragment) {
RendezvousKey rk(source,dest);
std::lock_guard<std::mutex> l(lastRendezvous_l);
int64_t &lr = lastRendezvous[rk];
@ -328,10 +385,10 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
auto peers = peersByVirtAddr.find(dest);
if (peers != peersByVirtAddr.end()) {
for(auto p=peers->second.begin();p!=peers->second.end();++p) {
if ((*p)->ip6) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
} else if ((*p)->ip4) {
if ((*p)->ip4) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
} else if ((*p)->ip6) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
}
}
}
@ -358,7 +415,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)16);
outp.append((const uint8_t *)b->second->ip6.rawIpData(),16);
outp.armor((*a)->key,true);
sendto(v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6));
sendto(v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip6),(socklen_t)sizeof(struct sockaddr_in6));
// Introduce destination to source (V6)
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
@ -368,8 +425,9 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)16);
outp.append((const uint8_t *)ip->rawIpData(),16);
outp.armor(b->second->key,true);
sendto(v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6));
} else if (((*a)->ip4)&&(b->second->ip4)) {
sendto(v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6));
}
if (((*a)->ip4)&&(b->second->ip4)) {
//printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip4.toString(ipstr2),dest.toString(astr2));
// Introduce source to destination (V4)
@ -380,7 +438,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)4);
outp.append((const uint8_t *)b->second->ip4.rawIpData(),4);
outp.armor((*a)->key,true);
sendto(v4s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in));
sendto(v4s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&((*a)->ip4),(socklen_t)sizeof(struct sockaddr_in));
// Introduce destination to source (V4)
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
@ -390,7 +448,7 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
outp.append((uint8_t)4);
outp.append((const uint8_t *)ip->rawIpData(),4);
outp.armor(b->second->key,true);
sendto(v4s,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in));
sendto(v4s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in));
}
}
}
@ -410,8 +468,10 @@ static void handlePacket(const int v4s,const int v6s,const InetAddress *const ip
}
for(auto i=toAddrs.begin();i!=toAddrs.end();++i) {
//printf("%s -> %s for %s -> %s" ZT_EOL_S,ip->toString(ipstr),i->first->toString(ipstr2),source.toString(astr),dest.toString(astr2));
sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),0,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
//printf("%s -> %s for %s -> %s (%u bytes)" ZT_EOL_S,ip->toString(ipstr),i->first->toString(ipstr2),source.toString(astr),dest.toString(astr2),pkt.size());
if (sendto(i->first->isV4() ? v4s : v6s,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)i->first,(socklen_t)(i->first->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))) <= 0) {
printf("* write error forwarding packet to %s: %s" ZT_EOL_S,i->first->toString(ipstr),strerror(errno));
}
}
}
@ -448,8 +508,11 @@ static int bindSocket(struct sockaddr *bindAddr)
f = 0; setsockopt(s,IPPROTO_IPV6,IPV6_DONTFRAG,&f,sizeof(f));
#endif
}
f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
#ifdef SO_REUSEPORT
f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEPORT,(void *)&f,sizeof(f));
#else
f = 1; setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(void *)&f,sizeof(f));
#endif
f = 1; setsockopt(s,SOL_SOCKET,SO_BROADCAST,(void *)&f,sizeof(f));
#ifdef IP_DONTFRAG
f = 0; setsockopt(s,IPPROTO_IP,IP_DONTFRAG,&f,sizeof(f));
@ -534,6 +597,15 @@ int main(int argc,char **argv)
} catch ( ... ) {
port = ZT_DEFAULT_PORT;
}
try {
statsRoot = config["statsRoot"];
while ((statsRoot.length() > 0)&&(statsRoot[statsRoot.length()-1] == ZT_PATH_SEPARATOR))
statsRoot = statsRoot.substr(0,statsRoot.length()-1);
if (statsRoot.length() > 0)
OSUtils::mkdir(statsRoot);
} catch ( ... ) {
statsRoot = "";
}
unsigned int ncores = std::thread::hardware_concurrency();
if (ncores == 0) ncores = 1;
@ -567,7 +639,7 @@ int main(int argc,char **argv)
sockets.push_back(s6);
sockets.push_back(s4);
threads.push_back(std::thread([s4,s6]() {
threads.push_back(std::thread([s6,s4]() {
struct sockaddr_in6 in6;
Packet pkt;
memset(&in6,0,sizeof(in6));
@ -590,7 +662,7 @@ int main(int argc,char **argv)
}
}));
threads.push_back(std::thread([s4,s6]() {
threads.push_back(std::thread([s6,s4]() {
struct sockaddr_in in4;
Packet pkt;
memset(&in4,0,sizeof(in4));
@ -616,12 +688,13 @@ int main(int argc,char **argv)
int64_t lastCleanedMulticastSubscriptions = 0;
int64_t lastCleanedPeers = 0;
int64_t lastWroteStats = 0;
while (run) {
peersByIdentity_l.lock();
peersByPhysAddr_l.lock();
printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size());
peersByPhysAddr_l.unlock();
peersByIdentity_l.unlock();
//peersByIdentity_l.lock();
//peersByPhysAddr_l.lock();
//printf("*** have %lu peers at %lu physical endpoints" ZT_EOL_S,(unsigned long)peersByIdentity.size(),(unsigned long)peersByPhysAddr.size());
//peersByPhysAddr_l.unlock();
//peersByIdentity_l.unlock();
sleep(1);
const int64_t now = OSUtils::now();
@ -650,38 +723,92 @@ int main(int argc,char **argv)
if ((now - lastCleanedPeers) > 120000) {
lastCleanedPeers = now;
std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) {
if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
std::lock_guard<std::mutex> pbp_l(peersByPhysAddr_l);
{
std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();) {
if ((now - p->second->lastReceive) > ZT_PEER_ACTIVITY_TIMEOUT) {
std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
std::lock_guard<std::mutex> pbp_l(peersByPhysAddr_l);
auto pbv = peersByVirtAddr.find(p->second->id.address());
if (pbv != peersByVirtAddr.end()) {
pbv->second.erase(p->second);
if (pbv->second.empty())
peersByVirtAddr.erase(pbv);
}
if (p->second->ip4) {
auto pbp = peersByPhysAddr.find(p->second->ip4);
if (pbp != peersByPhysAddr.end()) {
pbp->second.erase(p->second);
if (pbp->second.empty())
peersByPhysAddr.erase(pbp);
auto pbv = peersByVirtAddr.find(p->second->id.address());
if (pbv != peersByVirtAddr.end()) {
pbv->second.erase(p->second);
if (pbv->second.empty())
peersByVirtAddr.erase(pbv);
}
}
if (p->second->ip6) {
auto pbp = peersByPhysAddr.find(p->second->ip6);
if (pbp != peersByPhysAddr.end()) {
pbp->second.erase(p->second);
if (pbp->second.empty())
peersByPhysAddr.erase(pbp);
}
}
peersByIdentity.erase(p++);
} else ++p;
if (p->second->ip4) {
auto pbp = peersByPhysAddr.find(p->second->ip4);
if (pbp != peersByPhysAddr.end()) {
pbp->second.erase(p->second);
if (pbp->second.empty())
peersByPhysAddr.erase(pbp);
}
}
if (p->second->ip6) {
auto pbp = peersByPhysAddr.find(p->second->ip6);
if (pbp != peersByPhysAddr.end()) {
pbp->second.erase(p->second);
if (pbp->second.empty())
peersByPhysAddr.erase(pbp);
}
}
peersByIdentity.erase(p++);
} else ++p;
}
}
{
std::lock_guard<std::mutex> l(lastRendezvous_l);
for(auto lr=lastRendezvous.begin();lr!=lastRendezvous.end();) {
if ((now - lr->second) > ZT_PEER_ACTIVITY_TIMEOUT)
lastRendezvous.erase(lr++);
else ++lr;
}
}
}
if (((now - lastWroteStats) > 15000)&&(statsRoot.length() > 0)) {
lastWroteStats = now;
std::string peersFilePath(statsRoot);
peersFilePath.append("/peers.tmp");
FILE *pf = fopen(peersFilePath.c_str(),"wb");
if (pf) {
std::vector< SharedPtr<RootPeer> > sp;
{
std::lock_guard<std::mutex> pbi_l(peersByIdentity_l);
sp.reserve(peersByIdentity.size());
for(auto p=peersByIdentity.begin();p!=peersByIdentity.end();++p) {
sp.push_back(p->second);
}
}
std::sort(sp.begin(),sp.end(),[](const SharedPtr<RootPeer> &a,const SharedPtr<RootPeer> &b) { return (a->id < b->id); });
char ip4[128],ip6[128];
for(auto p=sp.begin();p!=sp.end();++p) {
if ((*p)->ip4) {
(*p)->ip4.toString(ip4);
} else {
ip4[0] = '-';
ip4[1] = 0;
}
if ((*p)->ip6) {
(*p)->ip6.toString(ip6);
} else {
ip6[0] = '-';
ip6[1] = 0;
}
fprintf(pf,"%.10llx %21s %45s %5.4f" ZT_EOL_S,(unsigned long long)(*p)->id.address().toInt(),ip4,ip6,fabs((double)(now - (*p)->lastReceive) / 1000.0));
}
fclose(pf);
std::string peersFilePath2(statsRoot);
peersFilePath2.append("/peers");
OSUtils::rm(peersFilePath2);
OSUtils::rename(peersFilePath.c_str(),peersFilePath2.c_str());
}
}
}