Fix thinko in multicast changes... works now!

This commit is contained in:
Adam Ierymenko 2014-11-21 11:27:53 -08:00
parent 7619b0ecbd
commit 959f611a65

View File

@ -178,127 +178,127 @@ void Multicaster::send(
indexes[j] = indexes[i];
indexes[i] = tmp;
}
}
if (gs.members.size() >= limit) {
// If we already have enough members, just send and we're done. We can
// skip the TX queue and skip the overhead of maintaining a send log by
// using sendOnly().
OutboundMulticast out;
if (gs.members.size() >= limit) {
// If we already have enough members, just send and we're done. We can
// skip the TX queue and skip the overhead of maintaining a send log by
// using sendOnly().
OutboundMulticast out;
out.init(
RR,
now,
nwid,
com,
limit,
2, // we'll still gather a little from peers to keep multicast list fresh
src,
mg,
etherType,
data,
len);
out.init(
RR,
now,
nwid,
com,
limit,
2, // we'll still gather a little from peers to keep multicast list fresh
src,
mg,
etherType,
data,
len);
unsigned int count = 0;
unsigned int count = 0;
for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(*ast));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
out.sendOnly(RR,*ast);
if (++count >= limit)
break;
for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(*ast));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
unsigned long idx = 0;
while (count < limit) {
const MulticastGroupMember &m = gs.members[indexes[idx++]];
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
out.sendOnly(RR,m.address);
++count;
}
}
} else {
unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
gs.lastExplicitGather = now;
SharedPtr<Peer> sn(RR->topology->getBestSupernode());
if (sn) {
TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
outp.append(nwid);
outp.append((uint8_t)0);
mg.mac().appendTo(outp);
outp.append((uint32_t)mg.adi());
outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
outp.armor(sn->key(),true);
sn->send(RR,outp.data(),outp.size(),now);
}
gatherLimit = 0; // don't need to gather from peers this time since we consulted the core
}
gs.txQueue.push_back(OutboundMulticast());
OutboundMulticast &out = gs.txQueue.back();
out.init(
RR,
now,
nwid,
com,
limit,
gatherLimit,
src,
mg,
etherType,
data,
len);
unsigned int count = 0;
for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(*ast));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
out.sendAndLog(RR,*ast);
if (++count >= limit)
break;
}
unsigned long idx = 0;
while ((count < limit)&&(idx < gs.members.size())) {
const MulticastGroupMember &m = gs.members[indexes[idx++]];
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
out.sendAndLog(RR,m.address);
++count;
}
}
out.sendOnly(RR,*ast);
if (++count >= limit)
break;
}
if (indexes != idxbuf)
delete [] indexes;
unsigned long idx = 0;
while (count < limit) {
const MulticastGroupMember &m = gs.members[indexes[idx++]];
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
out.sendOnly(RR,m.address);
++count;
}
}
} else {
unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
gs.lastExplicitGather = now;
SharedPtr<Peer> sn(RR->topology->getBestSupernode());
if (sn) {
TRACE(">>MC GATHER up to %u in %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
Packet outp(sn->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
outp.append(nwid);
outp.append((uint8_t)0);
mg.mac().appendTo(outp);
outp.append((uint32_t)mg.adi());
outp.append((uint32_t)gatherLimit); // +1 just means we'll have an extra in the queue if available
outp.armor(sn->key(),true);
sn->send(RR,outp.data(),outp.size(),now);
}
gatherLimit = 0; // don't need to gather from peers this time since we consulted the core
}
gs.txQueue.push_back(OutboundMulticast());
OutboundMulticast &out = gs.txQueue.back();
out.init(
RR,
now,
nwid,
com,
limit,
gatherLimit,
src,
mg,
etherType,
data,
len);
unsigned int count = 0;
for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(*ast));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
out.sendAndLog(RR,*ast);
if (++count >= limit)
break;
}
unsigned long idx = 0;
while ((count < limit)&&(idx < gs.members.size())) {
const MulticastGroupMember &m = gs.members[indexes[idx++]];
{ // TODO / LEGACY: don't send new multicast frame to old peers (if we know their version)
SharedPtr<Peer> p(RR->topology->getPeer(m.address));
if ((p)&&(p->remoteVersionKnown())&&(p->remoteVersionMajor() < 1))
continue;
}
if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),m.address) == alwaysSendTo.end()) {
out.sendAndLog(RR,m.address);
++count;
}
}
}
if (indexes != idxbuf)
delete [] indexes;
// DEPRECATED / LEGACY / TODO:
// Currently we also always send a legacy P5_MULTICAST_FRAME packet to our
// supernode. Our supernode then takes care of relaying it down to <1.0.0