Add RENDEZVOUS to high throughput root

This commit is contained in:
Adam Ierymenko 2019-08-29 07:17:18 -07:00
parent 86762d2b40
commit 6e3e09bed4
No known key found for this signature in database
GPG Key ID: C8877CF2D7A5D7F3

View File

@ -74,7 +74,25 @@ struct AddressHasher { ZT_ALWAYS_INLINE std::size_t operator()(const Address &a)
struct InetAddressHasher { ZT_ALWAYS_INLINE std::size_t operator()(const InetAddress &ip) const { return (std::size_t)ip.hashCode(); } };
struct MulticastGroupHasher { ZT_ALWAYS_INLINE std::size_t operator()(const MulticastGroup &mg) const { return (std::size_t)mg.hashCode(); } };
struct PeerInfo
struct RendezvousKey
{
RendezvousKey(const Address aa,const Address bb)
{
if (aa > bb) {
a = aa;
b = bb;
} else {
a = bb;
b = aa;
}
}
Address a,b;
ZT_ALWAYS_INLINE bool operator==(const RendezvousKey &k) const { return ((a == k.a)&&(b == k.b)); }
ZT_ALWAYS_INLINE bool operator!=(const RendezvousKey &k) const { return ((a != k.a)||(b != k.b)); }
struct Hasher { ZT_ALWAYS_INLINE std::size_t operator()(const RendezvousKey &k) const { return (std::size_t)(k.a.toInt() ^ k.b.toInt()); } };
};
struct RootPeer
{
Identity id;
uint8_t key[32];
@ -84,31 +102,36 @@ struct PeerInfo
AtomicCounter __refCount;
ZT_ALWAYS_INLINE ~PeerInfo() { Utils::burn(key,sizeof(key)); }
ZT_ALWAYS_INLINE ~RootPeer() { Utils::burn(key,sizeof(key)); }
};
static Identity self;
static std::atomic_bool run;
static std::unordered_map< uint64_t,std::unordered_map< MulticastGroup,std::unordered_map< Address,int64_t,AddressHasher >,MulticastGroupHasher > > multicastSubscriptions;
static std::unordered_map< Identity,SharedPtr<PeerInfo>,IdentityHasher > peersByIdentity;
static std::unordered_map< Address,std::set< SharedPtr<PeerInfo> >,AddressHasher > peersByVirtAddr;
static std::unordered_map< InetAddress,std::set< SharedPtr<PeerInfo> >,InetAddressHasher > peersByPhysAddr;
static std::unordered_map< Identity,SharedPtr<RootPeer>,IdentityHasher > peersByIdentity;
static std::unordered_map< Address,std::set< SharedPtr<RootPeer> >,AddressHasher > peersByVirtAddr;
static std::unordered_map< InetAddress,std::set< SharedPtr<RootPeer> >,InetAddressHasher > peersByPhysAddr;
static std::unordered_map< RendezvousKey,int64_t,RendezvousKey::Hasher > lastRendezvous;
static std::mutex multicastSubscriptions_l;
static std::mutex peersByIdentity_l;
static std::mutex peersByVirtAddr_l;
static std::mutex peersByPhysAddr_l;
static std::mutex lastRendezvous_l;
static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
{
char ipstr[128],ipstr2[128],astr[32],tmpstr[256];
char ipstr[128],ipstr2[128],astr[32],astr2[32],tmpstr[256];
const bool fragment = pkt[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR;
const Address source(pkt.source());
const Address dest(pkt.destination());
const int64_t now = OSUtils::now();
// See if this is destined for us and isn't a fragment / fragmented. (No packets
// understood by the root are fragments/fragmented.)
if ((!fragment)&&(!pkt.fragmented())&&(pkt.destination() == self.address())) {
SharedPtr<PeerInfo> peer;
if ((!fragment)&&(!pkt.fragmented())&&(dest == self.address())) {
SharedPtr<RootPeer> peer;
// If this is an un-encrypted HELLO, either learn a new peer or verify
// that this is a peer we already know.
@ -120,7 +143,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
auto pById = peersByIdentity.find(id);
if (pById != peersByIdentity.end()) {
peer = pById->second;
//printf("%s has %s (known (1))" ZT_EOL_S,ip->toString(ipstr),pkt.source().toString(astr));
//printf("%s has %s (known (1))" ZT_EOL_S,ip->toString(ipstr),source().toString(astr));
}
}
if (peer) {
@ -129,7 +152,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
return;
}
} else {
peer.set(new PeerInfo);
peer.set(new RootPeer);
if (self.agree(id,peer->key)) {
if (pkt.dearmor(peer->key)) {
peer->id = id;
@ -158,12 +181,12 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
// short ZT address successfully decrypt the packet.
if (!peer) {
std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
auto peers = peersByVirtAddr.find(pkt.source());
auto peers = peersByVirtAddr.find(source);
if (peers != peersByVirtAddr.end()) {
for(auto p=peers->second.begin();p!=peers->second.end();++p) {
if (pkt.dearmor((*p)->key)) {
peer = (*p);
//printf("%s has %s (known (2))" ZT_EOL_S,ip->toString(ipstr),pkt.source().toString(astr));
//printf("%s has %s (known (2))" ZT_EOL_S,ip->toString(ipstr),source().toString(astr));
break;
}
}
@ -195,7 +218,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
try {
const uint64_t origId = pkt.packetId();
const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
pkt.reset(pkt.source(),self.address(),Packet::VERB_OK);
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_HELLO);
pkt.append(origId);
pkt.append(ts);
@ -236,7 +259,7 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
gatherLimit = 255;
const uint64_t origId = pkt.packetId();
pkt.reset(pkt.source(),self.address(),Packet::VERB_OK);
pkt.reset(source,self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
pkt.append(origId);
pkt.append(nwid);
@ -275,42 +298,106 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
}
}
std::vector<InetAddress> toAddrs;
// If we made it here, we are forwarding this packet to someone else and also possibly
// sending a RENDEZVOUS message.
bool introduce = false;
{
RendezvousKey rk(source,dest);
std::lock_guard<std::mutex> l(lastRendezvous_l);
int64_t &lr = lastRendezvous[rk];
if ((now - lr) >= 45000) {
lr = now;
introduce = true;
}
}
std::vector< std::pair< InetAddress *,SharedPtr<RootPeer> > > toAddrs;
{
const int64_t now = OSUtils::now();
std::lock_guard<std::mutex> pbv_l(peersByVirtAddr_l);
auto peers = peersByVirtAddr.find(pkt.destination());
auto peers = peersByVirtAddr.find(dest);
if (peers != peersByVirtAddr.end()) {
for(auto p=peers->second.begin();p!=peers->second.end();++p) {
if ((now - (*p)->lastReceive) < ZT_PEER_ACTIVITY_TIMEOUT) {
if ((*p)->ip6)
toAddrs.push_back((*p)->ip6);
else if ((*p)->ip4)
toAddrs.push_back((*p)->ip4);
if ((*p)->ip6) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip6),*p));
} else if ((*p)->ip4) {
toAddrs.push_back(std::pair< InetAddress *,SharedPtr<RootPeer> >(&((*p)->ip4),*p));
}
}
}
}
}
if (toAddrs.empty()) {
//printf("%s not forwarding to %s: no destinations found" ZT_EOL_S,ip->toString(ipstr),pkt.destination().toString(astr));
//printf("%s not forwarding to %s: no destinations found" ZT_EOL_S,ip->toString(ipstr),dest().toString(astr));
return;
}
if (introduce) {
std::lock_guard<std::mutex> l(peersByVirtAddr_l);
auto sources = peersByVirtAddr.find(source);
for(auto a=sources->second.begin();a!=sources->second.end();++a) {
for(auto b=toAddrs.begin();b!=toAddrs.end();++b) {
if (((*a)->ip6 == *ip)&&(b->second->ip6)) {
printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip6.toString(ipstr2),dest.toString(astr2));
Packet outp(source,self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
dest.appendTo(outp);
outp.append((uint16_t)b->second->ip6.port());
outp.append((uint8_t)16);
outp.append((const uint8_t *)b->second->ip6.rawIpData(),16);
outp.armor((*a)->key,true);
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)sizeof(struct sockaddr_in6));
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
source.appendTo(outp);
outp.append((uint16_t)ip->port());
outp.append((uint8_t)16);
outp.append((const uint8_t *)ip->rawIpData(),16);
outp.armor(b->second->key,true);
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip6),(socklen_t)sizeof(struct sockaddr_in6));
} else if (((*a)->ip4 == *ip)&&(b->second->ip4)) {
printf("* introducing %s(%s) to %s(%s)" ZT_EOL_S,ip->toString(ipstr),source.toString(astr),b->second->ip4.toString(ipstr2),dest.toString(astr2));
Packet outp(source,self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
dest.appendTo(outp);
outp.append((uint16_t)b->second->ip4.port());
outp.append((uint8_t)4);
outp.append((const uint8_t *)b->second->ip4.rawIpData(),4);
outp.armor((*a)->key,true);
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)ip,(socklen_t)sizeof(struct sockaddr_in));
outp.reset(dest,self.address(),Packet::VERB_RENDEZVOUS);
outp.append((uint8_t)0);
source.appendTo(outp);
outp.append((uint16_t)ip->port());
outp.append((uint8_t)4);
outp.append((const uint8_t *)ip->rawIpData(),4);
outp.armor(b->second->key,true);
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(b->second->ip4),(socklen_t)sizeof(struct sockaddr_in));
}
}
}
}
if (fragment) {
if (reinterpret_cast<Packet::Fragment *>(&pkt)->incrementHops() >= ZT_PROTO_MAX_HOPS) {
printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),pkt.destination().toString(astr));
printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),dest.toString(astr));
return;
}
} else {
if (pkt.incrementHops() >= ZT_PROTO_MAX_HOPS) {
printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),pkt.destination().toString(astr));
printf("%s refused to forward to %s: max hop count exceeded" ZT_EOL_S,ip->toString(ipstr),dest.toString(astr));
return;
}
}
for(auto i=toAddrs.begin();i!=toAddrs.end();++i) {
//printf("%s -> %s for %s" ZT_EOL_S,ip->toString(ipstr),i->toString(ipstr2),pkt.destination().toString(astr));
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)&(*i),(socklen_t)((i->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
//printf("%s -> %s for %s" ZT_EOL_S,ip->toString(ipstr),i->toString(ipstr2),dest().toString(astr));
sendto(sock,pkt.data(),pkt.size(),0,(const struct sockaddr *)i->first,(socklen_t)((i->first->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
}
}