diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp index fa0062b05..41fa7ff51 100644 --- a/node/Multicaster.hpp +++ b/node/Multicaster.hpp @@ -206,7 +206,7 @@ public: } /** - * Choose peers to send a propagating multicast to + * Choose peers for multicast propagation via random selection * * @param prng Random source * @param topology Topology object or mock thereof @@ -223,7 +223,67 @@ public: * @tparam P Type of peers, which is SharedPtr in running code or a mock in simulation (mock must behave like a pointer type) */ template - inline unsigned int pickNextPropagationPeers( + inline unsigned int pickRandomPropagationPeers( + CMWC4096 &prng, + T &topology, + uint64_t nwid, + const MulticastGroup &mg, + const Address &originalSubmitter, + const Address &upstream, + MulticastBloomFilter &bf, + unsigned int max, + P *peers, + uint64_t now) + { + unsigned int chosen = 0; + Mutex::Lock _l(_multicastMemberships_m); + std::map< MulticastChannel,std::vector >::iterator mm(_multicastMemberships.find(MulticastChannel(nwid,mg))); + if ((mm != _multicastMemberships.end())&&(!mm->second.empty())) { + for(unsigned int stries=0;((striessecond[prng.next32() % mm->second.size()]; + unsigned int sum = m.first.sum(); + if ( + ((now - m.second) < ZT_MULTICAST_LIKE_EXPIRE)&& /* LIKE is not expired */ + (!bf.contains(sum))&& /* Not in propagation bloom */ + (m.first != originalSubmitter)&& /* Not the original submitter */ + (m.first != upstream) ) { /* Not where the frame came from */ + P peer(topology.getPeer(m.first)); + if (peer) { + unsigned int chk = 0; + while (chk < chosen) { + if (peers[chk++] == peer) + break; + } + if (chk == chosen) { /* not already picked */ + peers[chosen++] = peer; + bf.set(sum); + } + } + } + } + } + return 0; + } + + /** + * Choose peers for multicast propagation via implicit social switching + * + * @param prng Random source + * @param topology Topology object or mock thereof + * @param nwid Network ID + * @param mg Multicast group + * @param originalSubmitter Original submitter of multicast message to network + * @param upstream Address from which message originated, or null (0) address if none + * @param bf Bloom filter, updated in place with sums of addresses in chosen peers and/or decay + * @param max Maximum number of peers to pick + * @param peers Array of objects of type P to fill with up to [max] peers + * @param now Current timestamp + * @return Number of peers actually stored in peers array + * @tparam T Type of topology, which is Topology in running code or a mock in simulation + * @tparam P Type of peers, which is SharedPtr in running code or a mock in simulation (mock must behave like a pointer type) + */ + template + inline unsigned int pickSocialPropagationPeers( CMWC4096 &prng, T &topology, uint64_t nwid, diff --git a/node/PacketDecoder.cpp b/node/PacketDecoder.cpp index 14f0d5021..b4bc21e00 100644 --- a/node/PacketDecoder.cpp +++ b/node/PacketDecoder.cpp @@ -496,6 +496,24 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared SharedPtr network(_r->nc->network(at(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_NETWORK_ID))); if ((network)&&(network->isAllowed(source()))) { Address originalSubmitterAddress(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SUBMITTER_ADDRESS,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + + if (originalSubmitterAddress.isReserved()) { + TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): invalid original submitter address",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + if (originalSubmitterAddress == _r->identity.address()) { + TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): boomerang!",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + + SharedPtr originalSubmitter(_r->topology->getPeer(originalSubmitterAddress)); + if (!originalSubmitter) { + TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str()); + _r->sw->requestWhois(originalSubmitterAddress); + _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP; + return false; // try again if/when we get OK(WHOIS) + } + MAC fromMac(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SOURCE_MAC,6)); MulticastGroup mg(MAC(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_DESTINATION_MAC,6)),at(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_ADI)); unsigned int hops = (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT]; @@ -504,94 +522,100 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared unsigned int signaturelen = at(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_SIGNATURE_LENGTH); unsigned char *dataAndSignature = field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_PAYLOAD,datalen + signaturelen); + if (!Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) { + LOG("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): FAILED SIGNATURE CHECK (spoofed original submitter?)",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + + if (++hops >= ZT_MULTICAST_PROPAGATION_DEPTH) { + TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): max depth reached",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } + uint64_t mccrc = Multicaster::computeMulticastDedupCrc(network->id(),fromMac,mg,etherType,dataAndSignature,datalen); uint64_t now = Utils::now(); bool isDuplicate = _r->multicaster->checkDuplicate(mccrc,now); - if (originalSubmitterAddress == _r->identity.address()) { - // Technically should not happen, since the original submitter is - // excluded from consideration as a propagation recipient. - TRACE("dropped boomerang MULTICAST_FRAME received from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); - } else if ((!isDuplicate)||(_r->topology->amSupernode())) { - // - // If I am a supernode, I will repeatedly propagate duplicates. That's - // because supernodes are used to bridge sparse multicast groups. Non- - // supernodes will ignore duplicates completely. - // - // TODO: supernodes should keep a local bloom filter too and OR it with - // the bloom from the packet in order to pick different recipients each - // time a multicast returns to them for repropagation. - // - - SharedPtr originalSubmitter(_r->topology->getPeer(originalSubmitterAddress)); - if (!originalSubmitter) { - TRACE("requesting WHOIS on original multicast frame submitter %s",originalSubmitterAddress.toString().c_str()); - _r->sw->requestWhois(originalSubmitterAddress); - _step = DECODE_WAITING_FOR_MULTICAST_FRAME_ORIGINAL_SENDER_LOOKUP; - return false; // try again if/when we get OK(WHOIS) - } else if (Multicaster::verifyMulticastPacket(originalSubmitter->identity(),network->id(),fromMac,mg,etherType,dataAndSignature,datalen,dataAndSignature + datalen,signaturelen)) { - // In checking the multicast rate, we don't re-check if this is - // a duplicate. That's because if isDuplicate is true it means - // we're a supernode and it's a second pass relay. - if ((isDuplicate)||(network->multicastRateGate(originalSubmitter->address(),datalen))) { - _r->multicaster->addToDedupHistory(mccrc,now); - - // Even if we are a supernode, we still don't repeatedly inject - // duplicates into our own tap. - if (!isDuplicate) - network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen); - - if (++hops < ZT_MULTICAST_PROPAGATION_DEPTH) { - Address upstream(source()); // save this since we mangle it - - Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES)); - SharedPtr propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; - unsigned int np = _r->multicaster->pickNextPropagationPeers( - *(_r->prng), - *(_r->topology), - network->id(), - mg, - originalSubmitterAddress, - upstream, - bloom, - ZT_MULTICAST_PROPAGATION_BREADTH, - propPeers, - now); - - // In a bit of a hack, we re-use this packet to repeat it - // to our multicast propagation recipients. Afterwords we - // return true just to be sure this is the end of this - // packet's life cycle, since it is now mangled. - - setSource(_r->identity.address()); - (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops; - memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES); - compress(); - - for(unsigned int i=0;i %s",originalSubmitterAddress.toString().c_str(),upstream.toString().c_str(),propPeers[i]->address().toString().c_str()); - // Re-use this packet to re-send multicast frame to everyone - // downstream from us. - newInitializationVector(); - setDestination(propPeers[i]->address()); - _r->sw->send(*this,true); - } - - // Return here just to be safe, since this packet's state is no - // longer valid. - return true; - } else { - //TRACE("terminating MULTICAST_FRAME propagation from %s(%s): max depth reached",source().toString().c_str(),_remoteAddress.toString().c_str()); - } - } else { - LOG("dropped MULTICAST_FRAME from original sender %s: rate limit overrun",originalSubmitter->address().toString().c_str()); - } + if (!isDuplicate) { + if (network->multicastRateGate(originalSubmitterAddress,datalen)) { + network->tap().put(fromMac,mg.mac(),etherType,dataAndSignature,datalen); } else { - TRACE("rejected MULTICAST_FRAME forwarded by %s(%s): failed signature check (falsely claims origin %s)",source().toString().c_str(),_remoteAddress.toString().c_str(),originalSubmitterAddress.toString().c_str()); + TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): sender rate limit exceeded",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; } - } else { - TRACE("dropped duplicate MULTICAST_FRAME from %s(%s)",source().toString().c_str(),_remoteAddress.toString().c_str()); + + /* It's important that we do this *after* rate limit checking, + * otherwise supernodes could be used to execute a flood by + * first bouncing a multicast off a supernode and then flooding + * it with retransmits. */ + _r->multicaster->addToDedupHistory(mccrc,now); } + + Address upstream(source()); // save this since we might mangle it below + Multicaster::MulticastBloomFilter bloom(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES)); + SharedPtr propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; + unsigned int np = 0; + + if (_r->topology->amSupernode()) { + /* Supernodes behave differently here from ordinary nodes, as their + * role in the network is to bridge gaps between unconnected islands + * in a multicast propagation graph. Instead of using the ordinary + * multicast peer picker, supernodes propagate to random unvisited + * peers. They will also repeatedly propagate duplicate multicasts to + * new peers, while regular nodes simply discard them. This allows + * such gaps to be bridged more than once by ping-ponging off the + * same supernode -- a simple way to implement this without requiring + * that supernodes maintain a lot of state at the cost of a small + * amount of bandwidth. */ + np = _r->multicaster->pickRandomPropagationPeers( + *(_r->prng), + *(_r->topology), + network->id(), + mg, + originalSubmitterAddress, + upstream, + bloom, + ZT_MULTICAST_PROPAGATION_BREADTH, + propPeers, + now); + } else if (isDuplicate) { + TRACE("dropped MULTICAST_FRAME from original submitter %s, received from %s(%s): duplicate",originalSubmitterAddress.toString().c_str(),source().toString().c_str(),_remoteAddress.toString().c_str()); + return true; + } else { + /* Regular peers only propagate non-duplicate packets, and do so + * according to ordinary propagation priority rules. */ + np = _r->multicaster->pickSocialPropagationPeers( + *(_r->prng), + *(_r->topology), + network->id(), + mg, + originalSubmitterAddress, + upstream, + bloom, + ZT_MULTICAST_PROPAGATION_BREADTH, + propPeers, + now); + } + + /* Re-use *this* packet to repeat it to our propagation + * recipients, which invalidates its current contents and + * state. */ + + if (np) { + setSource(_r->identity.address()); + (*this)[ZT_PROTO_VERB_MULTICAST_FRAME_IDX_HOP_COUNT] = hops; + memcpy(field(ZT_PROTO_VERB_MULTICAST_FRAME_IDX_BLOOM_FILTER,ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES),bloom.data(),ZT_PROTO_VERB_MULTICAST_FRAME_BLOOM_FILTER_SIZE_BYTES); + compress(); + for(unsigned int i=0;iaddress()); + _r->sw->send(*this,true); + } + } + + /* Just to be safe, return true here to terminate processing as we + * have thoroughly destroyed our state by doing the above. */ + return true; } else { TRACE("dropped MULTICAST_FRAME from %s(%s): network %.16llx unknown or sender not allowed",source().toString().c_str(),_remoteAddress.toString().c_str(),(unsigned long long)network->id()); } diff --git a/node/Switch.cpp b/node/Switch.cpp index 04984f48d..aed37307e 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -108,7 +108,7 @@ void Switch::onLocalEthernet(const SharedPtr &network,const MAC &from,c Multicaster::MulticastBloomFilter bloom; SharedPtr propPeers[ZT_MULTICAST_PROPAGATION_BREADTH]; - unsigned int np = _r->multicaster->pickNextPropagationPeers( + unsigned int np = _r->multicaster->pickSocialPropagationPeers( *(_r->prng), *(_r->topology), network->id(),