mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-02-22 10:20:52 +00:00
Revert a change to LinuxEthernetTap threading to eliminate out of order packet issues on some systems.
This commit is contained in:
parent
7b0d11b187
commit
0461b24db3
@ -185,16 +185,13 @@ LinuxEthernetTap::LinuxEthernetTap(
|
|||||||
|
|
||||||
(void)::pipe(_shutdownSignalPipe);
|
(void)::pipe(_shutdownSignalPipe);
|
||||||
|
|
||||||
_thread_init_l.lock();
|
_tapReaderThread = std::thread([this]{
|
||||||
for(unsigned int t=0;t<2;++t) {
|
uint8_t b[ZT_TAP_BUF_SIZE];
|
||||||
_tapReaderThread[t] = std::thread([this, t]{
|
|
||||||
fd_set readfds,nullfds;
|
fd_set readfds,nullfds;
|
||||||
int n,nfds,r;
|
int n,nfds,r;
|
||||||
void *buf = nullptr;
|
|
||||||
std::vector<void *> buffers;
|
std::vector<void *> buffers;
|
||||||
|
|
||||||
if (t == 0) {
|
|
||||||
struct ifreq ifr;
|
struct ifreq ifr;
|
||||||
|
|
||||||
memset(&ifr,0,sizeof(ifr));
|
memset(&ifr,0,sizeof(ifr));
|
||||||
strcpy(ifr.ifr_name,_dev.c_str());
|
strcpy(ifr.ifr_name,_dev.c_str());
|
||||||
|
|
||||||
@ -240,12 +237,6 @@ LinuxEthernetTap::LinuxEthernetTap(
|
|||||||
|
|
||||||
::close(sock);
|
::close(sock);
|
||||||
|
|
||||||
_thread_init_l.unlock();
|
|
||||||
} else {
|
|
||||||
_thread_init_l.lock();
|
|
||||||
_thread_init_l.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_run)
|
if (!_run)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -259,31 +250,12 @@ LinuxEthernetTap::LinuxEthernetTap(
|
|||||||
FD_SET(_fd,&readfds);
|
FD_SET(_fd,&readfds);
|
||||||
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
|
select(nfds,&readfds,&nullfds,&nullfds,(struct timeval *)0);
|
||||||
|
|
||||||
if (FD_ISSET(_shutdownSignalPipe[0],&readfds)) // writes to shutdown pipe terminate thread
|
if (FD_ISSET(_shutdownSignalPipe[0],&readfds))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
if (FD_ISSET(_fd,&readfds)) {
|
if (FD_ISSET(_fd,&readfds)) {
|
||||||
for(;;) { // read until there are no more packets, then return to outer select() loop
|
for(;;) { // read until there are no more packets, then return to outer select() loop
|
||||||
if (!buf) {
|
n = (int)::read(_fd,b + r,ZT_TAP_BUF_SIZE - r);
|
||||||
// To reduce use of the mutex, we keep a local buffer vector and
|
|
||||||
// swap (which is a pointer swap) with the global one when it's
|
|
||||||
// empty. This retrieves a batch of buffers to use.
|
|
||||||
if (buffers.empty()) {
|
|
||||||
std::lock_guard<std::mutex> l(_buffers_l);
|
|
||||||
buffers.swap(_buffers);
|
|
||||||
}
|
|
||||||
if (buffers.empty()) {
|
|
||||||
buf = malloc(ZT_TAP_BUF_SIZE);
|
|
||||||
if (!buf)
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
buf = buffers.back();
|
|
||||||
buffers.pop_back();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n = (int)::read(_fd,reinterpret_cast<uint8_t *>(buf) + r,ZT_TAP_BUF_SIZE - r);
|
|
||||||
|
|
||||||
if (n > 0) {
|
if (n > 0) {
|
||||||
// Some tap drivers like to send the ethernet frame and the
|
// Some tap drivers like to send the ethernet frame and the
|
||||||
// payload in two chunks, so handle that by accumulating
|
// payload in two chunks, so handle that by accumulating
|
||||||
@ -294,8 +266,11 @@ LinuxEthernetTap::LinuxEthernetTap(
|
|||||||
r = _mtu + 14;
|
r = _mtu + 14;
|
||||||
|
|
||||||
if (_enabled) {
|
if (_enabled) {
|
||||||
_tapq.post(std::pair<void *,int>(buf,r));
|
//_tapq.post(std::pair<void *,int>(buf,r));
|
||||||
buf = nullptr;
|
//buf = nullptr;
|
||||||
|
MAC to(b, 6),from(b + 6, 6);
|
||||||
|
unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
|
||||||
|
_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(r - 14));
|
||||||
}
|
}
|
||||||
|
|
||||||
r = 0;
|
r = 0;
|
||||||
@ -308,51 +283,16 @@ LinuxEthernetTap::LinuxEthernetTap(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
_tapProcessorThread = std::thread([this] {
|
|
||||||
MAC to,from;
|
|
||||||
std::pair<void *,int> qi;
|
|
||||||
while (_tapq.get(qi)) {
|
|
||||||
uint8_t *const b = reinterpret_cast<uint8_t *>(qi.first);
|
|
||||||
if (b) {
|
|
||||||
to.setTo(b, 6);
|
|
||||||
from.setTo(b + 6, 6);
|
|
||||||
unsigned int etherType = Utils::ntoh(((const uint16_t *)b)[6]);
|
|
||||||
_handler(_arg, nullptr, _nwid, from, to, etherType, 0, (const void *)(b + 14),(unsigned int)(qi.second - 14));
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> l(_buffers_l);
|
|
||||||
if (_buffers.size() < 128)
|
|
||||||
_buffers.push_back(qi.first);
|
|
||||||
else free(qi.first);
|
|
||||||
}
|
|
||||||
} else break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LinuxEthernetTap::~LinuxEthernetTap()
|
LinuxEthernetTap::~LinuxEthernetTap()
|
||||||
{
|
{
|
||||||
_run = false;
|
_run = false;
|
||||||
|
(void)::write(_shutdownSignalPipe[1],"\0",1);
|
||||||
(void)::write(_shutdownSignalPipe[1],"\0",1); // causes reader thread(s) to exit
|
_tapReaderThread.join();
|
||||||
_tapq.post(std::pair<void *,int>(nullptr,0)); // causes processor thread to exit
|
|
||||||
|
|
||||||
_tapReaderThread[0].join();
|
|
||||||
_tapReaderThread[1].join();
|
|
||||||
_tapProcessorThread.join();
|
|
||||||
|
|
||||||
::close(_fd);
|
::close(_fd);
|
||||||
::close(_shutdownSignalPipe[0]);
|
::close(_shutdownSignalPipe[0]);
|
||||||
::close(_shutdownSignalPipe[1]);
|
::close(_shutdownSignalPipe[1]);
|
||||||
|
|
||||||
for(std::vector<void *>::iterator i(_buffers.begin());i!=_buffers.end();++i)
|
|
||||||
free(*i);
|
|
||||||
std::vector< std::pair<void *,int> > dv(_tapq.drain());
|
|
||||||
for(std::vector< std::pair<void *,int> >::iterator i(dv.begin());i!=dv.end();++i) {
|
|
||||||
if (i->first)
|
|
||||||
free(i->first);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void LinuxEthernetTap::setEnabled(bool en)
|
void LinuxEthernetTap::setEnabled(bool en)
|
||||||
|
@ -26,7 +26,6 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include "../node/MulticastGroup.hpp"
|
#include "../node/MulticastGroup.hpp"
|
||||||
#include "EthernetTap.hpp"
|
#include "EthernetTap.hpp"
|
||||||
#include "BlockingQueue.hpp"
|
|
||||||
|
|
||||||
namespace ZeroTier {
|
namespace ZeroTier {
|
||||||
|
|
||||||
@ -71,12 +70,7 @@ private:
|
|||||||
int _shutdownSignalPipe[2];
|
int _shutdownSignalPipe[2];
|
||||||
std::atomic_bool _enabled;
|
std::atomic_bool _enabled;
|
||||||
std::atomic_bool _run;
|
std::atomic_bool _run;
|
||||||
std::thread _tapReaderThread[2];
|
std::thread _tapReaderThread;
|
||||||
std::thread _tapProcessorThread;
|
|
||||||
std::mutex _buffers_l;
|
|
||||||
std::mutex _thread_init_l;
|
|
||||||
std::vector<void *> _buffers;
|
|
||||||
BlockingQueue< std::pair<void *,int> > _tapq;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace ZeroTier
|
} // namespace ZeroTier
|
||||||
|
Loading…
x
Reference in New Issue
Block a user