This commit is contained in:
Adam Ierymenko 2021-09-20 15:04:25 -04:00
parent bcf8c30ce0
commit 104fade420

View File

@ -273,11 +273,6 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
printf("%s HELLO rejected: identity address collision!" ZT_EOL_S,ip->toString(ipstr)); printf("%s HELLO rejected: identity address collision!" ZT_EOL_S,ip->toString(ipstr));
// TODO: send error // TODO: send error
return; return;
} else {
printf("* invalid identity found and discarded: %s" ZT_EOL_S,id.toString(false, tmpstr));
std::lock_guard<std::mutex> p_l(s_peersByVirtAddr_l);
s_peersByVirtAddr.erase(source);
peer.zero();
} }
} }
} }
@ -327,195 +322,197 @@ static void handlePacket(const int sock,const InetAddress *const ip,Packet &pkt)
peer = p->second; peer = p->second;
} }
} }
if (!pkt.dearmor(peer->key)) { if (peer) {
printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr)); if (!pkt.dearmor(peer->key)) {
return; printf("%s HELLO rejected: packet authentication failed" ZT_EOL_S,ip->toString(ipstr));
} return;
if (!pkt.uncompress()) { }
printf("%s packet rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr)); if (!pkt.uncompress()) {
printf("%s packet rejected: decompression failed" ZT_EOL_S,ip->toString(ipstr));
return;
}
} else {
return; return;
} }
} }
if (likely(peer)) { const int64_t now = OSUtils::now();
const int64_t now = OSUtils::now();
if (ip->isV4()) { if (ip->isV4()) {
peer->ip4 = ip; peer->ip4 = ip;
peer->v4s = sock; peer->v4s = sock;
peer->lastReceiveV4 = now; peer->lastReceiveV4 = now;
if ((now - peer->lastReceiveV6) > ZT_PEER_ACTIVITY_TIMEOUT) if ((now - peer->lastReceiveV6) > ZT_PEER_ACTIVITY_TIMEOUT)
peer->v6s = -1; peer->v6s = -1;
} else if (ip->isV6()) { } else if (ip->isV6()) {
peer->ip6 = ip; peer->ip6 = ip;
peer->v6s = sock; peer->v6s = sock;
peer->lastReceiveV6 = now; peer->lastReceiveV6 = now;
if ((now - peer->lastReceiveV4) > ZT_PEER_ACTIVITY_TIMEOUT) if ((now - peer->lastReceiveV4) > ZT_PEER_ACTIVITY_TIMEOUT)
peer->v4s = -1; peer->v4s = -1;
} }
peer->lastReceive = now; peer->lastReceive = now;
switch(pkt.verb()) { switch(pkt.verb()) {
case Packet::VERB_HELLO: case Packet::VERB_HELLO:
try { try {
if ((now - peer->lastHello) > 250) { if ((now - peer->lastHello) > 250) {
peer->lastHello = now; peer->lastHello = now;
peer->vProto = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION]; peer->vProto = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_PROTOCOL_VERSION];
peer->vMajor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MAJOR_VERSION]; peer->vMajor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MAJOR_VERSION];
peer->vMinor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MINOR_VERSION]; peer->vMinor = (int)pkt[ZT_PROTO_VERB_HELLO_IDX_MINOR_VERSION];
peer->vRev = (int)pkt.template at<uint16_t>(ZT_PROTO_VERB_HELLO_IDX_REVISION); peer->vRev = (int)pkt.template at<uint16_t>(ZT_PROTO_VERB_HELLO_IDX_REVISION);
const uint64_t origId = pkt.packetId(); const uint64_t origId = pkt.packetId();
const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP); const uint64_t ts = pkt.template at<uint64_t>(ZT_PROTO_VERB_HELLO_IDX_TIMESTAMP);
pkt.reset(source,s_self.address(),Packet::VERB_OK); pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_HELLO); pkt.append((uint8_t)Packet::VERB_HELLO);
pkt.append(origId); pkt.append(origId);
pkt.append(ts); pkt.append(ts);
pkt.append((uint8_t)ZT_PROTO_VERSION); pkt.append((uint8_t)ZT_PROTO_VERSION);
pkt.append((uint8_t)0); pkt.append((uint8_t)0);
pkt.append((uint8_t)0); pkt.append((uint8_t)0);
pkt.append((uint16_t)0); pkt.append((uint16_t)0);
ip->serialize(pkt); ip->serialize(pkt);
if (peer->vProto < 20) { // send planet file for pre-2.x peers if (peer->vProto < 20) { // send planet file for pre-2.x peers
std::lock_guard<std::mutex> pl(s_planet_l); std::lock_guard<std::mutex> pl(s_planet_l);
if (s_planet.length() > 0) { if (s_planet.length() > 0) {
pkt.append((uint16_t)s_planet.size()); pkt.append((uint16_t)s_planet.size());
pkt.append((const uint8_t *)s_planet.data(),s_planet.size()); pkt.append((const uint8_t *)s_planet.data(),s_planet.size());
}
}
pkt.armor(peer->key,true);
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size());
peer->lastSend = now;
}
} catch ( ... ) {
printf("* unexpected exception handling HELLO from %s" ZT_EOL_S,ip->toString(ipstr));
}
break;
case Packet::VERB_ECHO:
try {
if ((now - peer->lastEcho) > 500) {
peer->lastEcho = now;
Packet outp(source,s_self.address(),Packet::VERB_OK);
outp.append((uint8_t)Packet::VERB_ECHO);
outp.append(pkt.packetId());
outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD);
outp.compress();
outp.armor(peer->key,true);
sendto(sock,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,outp.size());
peer->lastSend = now;
}
} catch ( ... ) {
printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
}
case Packet::VERB_WHOIS:
try {
std::vector< SharedPtr<RootPeer> > results;
results.reserve(4);
{
std::lock_guard<std::mutex> l(s_peersByVirtAddr_l);
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) {
auto p = s_peersByVirtAddr.find(Address(pkt.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
if (p != s_peersByVirtAddr.end()) {
results.push_back(p->second);
}
} }
} }
pkt.armor(peer->key,true);
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
if (!results.empty()) { s_outputRate.log(now,pkt.size());
const uint64_t origId = pkt.packetId(); peer->lastSend = now;
pkt.reset(source,s_self.address(),Packet::VERB_OK); }
pkt.append((uint8_t)Packet::VERB_WHOIS); } catch ( ... ) {
pkt.append(origId); printf("* unexpected exception handling HELLO from %s" ZT_EOL_S,ip->toString(ipstr));
for(auto p=results.begin();p!=results.end();++p) }
(*p)->id.serialize(pkt,false); break;
pkt.armor(peer->key,true);
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size()); case Packet::VERB_ECHO:
peer->lastSend = now; try {
if ((now - peer->lastEcho) > 500) {
peer->lastEcho = now;
Packet outp(source,s_self.address(),Packet::VERB_OK);
outp.append((uint8_t)Packet::VERB_ECHO);
outp.append(pkt.packetId());
outp.append(((const uint8_t *)pkt.data()) + ZT_PACKET_IDX_PAYLOAD,pkt.size() - ZT_PACKET_IDX_PAYLOAD);
outp.compress();
outp.armor(peer->key,true);
sendto(sock,outp.data(),outp.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,outp.size());
peer->lastSend = now;
}
} catch ( ... ) {
printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
}
case Packet::VERB_WHOIS:
try {
std::vector< SharedPtr<RootPeer> > results;
results.reserve(4);
{
std::lock_guard<std::mutex> l(s_peersByVirtAddr_l);
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+ZT_ADDRESS_LENGTH)<=pkt.size();ptr+=ZT_ADDRESS_LENGTH) {
auto p = s_peersByVirtAddr.find(Address(pkt.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH));
if (p != s_peersByVirtAddr.end()) {
results.push_back(p->second);
}
} }
} catch ( ... ) {
printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
} }
case Packet::VERB_MULTICAST_LIKE: if (!results.empty()) {
try {
std::lock_guard<std::mutex> l(s_multicastSubscriptions_l);
for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+18)<=pkt.size();ptr+=18) {
const uint64_t nwid = pkt.template at<uint64_t>(ptr);
const MulticastGroup mg(MAC(pkt.field(ptr + 8,6),6),pkt.template at<uint32_t>(ptr + 14));
s_multicastSubscriptions[nwid][mg][source] = now;
}
} catch ( ... ) {
printf("* unexpected exception handling MULTICAST_LIKE from %s" ZT_EOL_S,ip->toString(ipstr));
}
break;
case Packet::VERB_MULTICAST_GATHER:
try {
const uint64_t nwid = pkt.template at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID);
//const unsigned int flags = pkt[ZT_PROTO_VERB_MULTICAST_GATHER_IDX_FLAGS];
const MulticastGroup mg(MAC(pkt.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),pkt.template at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI));
unsigned int gatherLimit = pkt.template at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT);
if (gatherLimit > 255)
gatherLimit = 255;
const uint64_t origId = pkt.packetId(); const uint64_t origId = pkt.packetId();
pkt.reset(source,s_self.address(),Packet::VERB_OK); pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_MULTICAST_GATHER); pkt.append((uint8_t)Packet::VERB_WHOIS);
pkt.append(origId); pkt.append(origId);
pkt.append(nwid); for(auto p=results.begin();p!=results.end();++p)
mg.mac().appendTo(pkt); (*p)->id.serialize(pkt,false);
pkt.append((uint32_t)mg.adi()); pkt.armor(peer->key,true);
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)((ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
{ s_outputRate.log(now,pkt.size());
std::lock_guard<std::mutex> l(s_multicastSubscriptions_l); peer->lastSend = now;
auto forNet = s_multicastSubscriptions.find(nwid); }
if (forNet != s_multicastSubscriptions.end()) { } catch ( ... ) {
auto forGroup = forNet->second.find(mg); printf("* unexpected exception handling ECHO from %s" ZT_EOL_S,ip->toString(ipstr));
if (forGroup != forNet->second.end()) { }
pkt.append((uint32_t)forGroup->second.size());
const unsigned int countAt = pkt.size();
pkt.addSize(2);
unsigned int l = 0; case Packet::VERB_MULTICAST_LIKE:
for(auto g=forGroup->second.begin();((l<gatherLimit)&&(g!=forGroup->second.end()));++g) { try {
if (g->first != source) { std::lock_guard<std::mutex> l(s_multicastSubscriptions_l);
++l; for(unsigned int ptr=ZT_PACKET_IDX_PAYLOAD;(ptr+18)<=pkt.size();ptr+=18) {
g->first.appendTo(pkt); const uint64_t nwid = pkt.template at<uint64_t>(ptr);
} const MulticastGroup mg(MAC(pkt.field(ptr + 8,6),6),pkt.template at<uint32_t>(ptr + 14));
s_multicastSubscriptions[nwid][mg][source] = now;
}
} catch ( ... ) {
printf("* unexpected exception handling MULTICAST_LIKE from %s" ZT_EOL_S,ip->toString(ipstr));
}
break;
case Packet::VERB_MULTICAST_GATHER:
try {
const uint64_t nwid = pkt.template at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID);
//const unsigned int flags = pkt[ZT_PROTO_VERB_MULTICAST_GATHER_IDX_FLAGS];
const MulticastGroup mg(MAC(pkt.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),pkt.template at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI));
unsigned int gatherLimit = pkt.template at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT);
if (gatherLimit > 255)
gatherLimit = 255;
const uint64_t origId = pkt.packetId();
pkt.reset(source,s_self.address(),Packet::VERB_OK);
pkt.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
pkt.append(origId);
pkt.append(nwid);
mg.mac().appendTo(pkt);
pkt.append((uint32_t)mg.adi());
{
std::lock_guard<std::mutex> l(s_multicastSubscriptions_l);
auto forNet = s_multicastSubscriptions.find(nwid);
if (forNet != s_multicastSubscriptions.end()) {
auto forGroup = forNet->second.find(mg);
if (forGroup != forNet->second.end()) {
pkt.append((uint32_t)forGroup->second.size());
const unsigned int countAt = pkt.size();
pkt.addSize(2);
unsigned int l = 0;
for(auto g=forGroup->second.begin();((l<gatherLimit)&&(g!=forGroup->second.end()));++g) {
if (g->first != source) {
++l;
g->first.appendTo(pkt);
} }
}
if (l > 0) { if (l > 0) {
pkt.setAt<uint16_t>(countAt,(uint16_t)l); pkt.setAt<uint16_t>(countAt,(uint16_t)l);
pkt.armor(peer->key,true); pkt.armor(peer->key,true);
sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))); sendto(sock,pkt.data(),pkt.size(),SENDTO_FLAGS,(const struct sockaddr *)ip,(socklen_t)(ip->isV4() ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)));
s_outputRate.log(now,pkt.size()); s_outputRate.log(now,pkt.size());
peer->lastSend = now; peer->lastSend = now;
}
} }
} }
} }
} catch ( ... ) {
printf("* unexpected exception handling MULTICAST_GATHER from %s" ZT_EOL_S,ip->toString(ipstr));
} }
break; } catch ( ... ) {
printf("* unexpected exception handling MULTICAST_GATHER from %s" ZT_EOL_S,ip->toString(ipstr));
}
break;
default: default:
break; break;
}
return;
} }
return;
} }
// If we made it here, we are forwarding this packet to someone else and also possibly // If we made it here, we are forwarding this packet to someone else and also possibly