Oops... turns out we need to differentiate incoming from outgoing TCP and indeed learn incoming TCP paths. Otherwise the recipient of a TCP connection does not know to reply via TCP! Heh.

This commit is contained in:
Adam Ierymenko 2014-03-31 22:23:55 -07:00
parent 595b386afc
commit f13493edb2
11 changed files with 147 additions and 110 deletions

View File

@ -579,7 +579,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
source().toString().c_str(), source().toString().c_str(),
frameLen, frameLen,
startingFifoItems); startingFifoItems);
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
// At this point the frame is basically valid, so we can call it a receive // At this point the frame is basically valid, so we can call it a receive
@ -602,7 +602,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("dropped MULTICAST_FRAME from %s(%s): duplicate",source().toString().c_str(),_remoteAddress.toString().c_str()); TRACE("dropped MULTICAST_FRAME from %s(%s): duplicate",source().toString().c_str(),_remoteAddress.toString().c_str());
return true; return true;
@ -648,7 +648,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("dropped MULTICAST_FRAME from %s(%s) into %.16llx: source mac %s doesn't belong to %s, and bridging is not supported on network",source().toString().c_str(),_remoteAddress.toString().c_str(),nwid,sourceMac.toString().c_str(),origin.toString().c_str()); TRACE("dropped MULTICAST_FRAME from %s(%s) into %.16llx: source mac %s doesn't belong to %s, and bridging is not supported on network",source().toString().c_str(),_remoteAddress.toString().c_str(),nwid,sourceMac.toString().c_str(),origin.toString().c_str());
return true; return true;
@ -664,7 +664,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("dropped MULTICAST_FRAME from %s(%s) into %.16llx: ethertype %u is not allowed",source().toString().c_str(),nwid,_remoteAddress.toString().c_str(),etherType); TRACE("dropped MULTICAST_FRAME from %s(%s) into %.16llx: ethertype %u is not allowed",source().toString().c_str(),nwid,_remoteAddress.toString().c_str(),etherType);
return true; return true;
@ -681,7 +681,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("dropped MULTICAST_FRAME from %s(%s): rate limits exceeded for sender %s",source().toString().c_str(),_remoteAddress.toString().c_str(),origin.toString().c_str()); TRACE("dropped MULTICAST_FRAME from %s(%s): rate limits exceeded for sender %s",source().toString().c_str(),_remoteAddress.toString().c_str(),origin.toString().c_str());
return true; return true;
@ -702,7 +702,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("not forwarding MULTICAST_FRAME from %s(%s): depth == 0xffff (do not forward)",source().toString().c_str(),_remoteAddress.toString().c_str()); TRACE("not forwarding MULTICAST_FRAME from %s(%s): depth == 0xffff (do not forward)",source().toString().c_str(),_remoteAddress.toString().c_str());
return true; return true;
@ -717,7 +717,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
TRACE("not forwarding MULTICAST_FRAME from %s(%s): max propagation depth reached",source().toString().c_str(),_remoteAddress.toString().c_str()); TRACE("not forwarding MULTICAST_FRAME from %s(%s): max propagation depth reached",source().toString().c_str(),_remoteAddress.toString().c_str());
return true; return true;
@ -776,7 +776,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
mctdepth, mctdepth,
(_r->topology->amSupernode() ? 'S' : '-'), (_r->topology->amSupernode() ? 'S' : '-'),
_r->identity.address().toString().c_str()); _r->identity.address().toString().c_str());
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
//TRACE("not forwarding MULTICAST_FRAME from %s(%s): no next hop",source().toString().c_str(),_remoteAddress.toString().c_str()); //TRACE("not forwarding MULTICAST_FRAME from %s(%s): no next hop",source().toString().c_str(),_remoteAddress.toString().c_str());
return true; return true;
@ -797,7 +797,7 @@ bool PacketDecoder::_doMULTICAST_FRAME(const RuntimeEnvironment *_r,const Shared
origin.toString().c_str(), origin.toString().c_str(),
nextHop.toString().c_str(), nextHop.toString().c_str(),
numAdded); numAdded);
_r->sm->send(ZT_DEFAULTS.multicastTraceWatcher,false,mct,strlen(mct)); _r->sm->sendUdp(ZT_DEFAULTS.multicastTraceWatcher,mct,strlen(mct));
#endif #endif
// Send to next hop, reusing this packet as scratch space // Send to next hop, reusing this packet as scratch space

View File

@ -39,7 +39,7 @@
#include "Utils.hpp" #include "Utils.hpp"
#include "Buffer.hpp" #include "Buffer.hpp"
#define ZT_PATH_SERIALIZATION_VERSION 1 #define ZT_PATH_SERIALIZATION_VERSION 2
namespace ZeroTier { namespace ZeroTier {
@ -49,13 +49,21 @@ namespace ZeroTier {
class Path class Path
{ {
public: public:
enum Type
{
PATH_TYPE_NULL = 0,
PATH_TYPE_UDP = 1,
PATH_TYPE_TCP_OUT = 2,
PATH_TYPE_TCP_IN = 3
};
Path() : Path() :
_lastSend(0), _lastSend(0),
_lastReceived(0), _lastReceived(0),
_lastFirewallOpener(0), _lastFirewallOpener(0),
_lastPing(0), _lastPing(0),
_addr(), _addr(),
_tcp(false), _type(PATH_TYPE_NULL),
_fixed(false) {} _fixed(false) {}
Path(const Path &p) Path(const Path &p)
@ -64,13 +72,13 @@ public:
memcpy(this,&p,sizeof(Path)); memcpy(this,&p,sizeof(Path));
} }
Path(const InetAddress &addr,bool tcp,bool fixed = false) : Path(const InetAddress &addr,Type t,bool fixed = false) :
_lastSend(0), _lastSend(0),
_lastReceived(0), _lastReceived(0),
_lastFirewallOpener(0), _lastFirewallOpener(0),
_lastPing(0), _lastPing(0),
_addr(addr), _addr(addr),
_tcp(tcp), _type(t),
_fixed(fixed) {} _fixed(fixed) {}
inline Path &operator=(const Path &p) inline Path &operator=(const Path &p)
@ -81,13 +89,16 @@ public:
} }
inline const InetAddress &address() const throw() { return _addr; } inline const InetAddress &address() const throw() { return _addr; }
inline bool tcp() const throw() { return _tcp; }
inline Type type() const throw() { return _type; }
inline bool tcp() const throw() { return ((_type == PATH_TYPE_TCP_IN)||(_type == PATH_TYPE_TCP_OUT)); }
inline uint64_t lastSend() const throw() { return _lastSend; } inline uint64_t lastSend() const throw() { return _lastSend; }
inline uint64_t lastReceived() const throw() { return _lastReceived; } inline uint64_t lastReceived() const throw() { return _lastReceived; }
inline uint64_t lastFirewallOpener() const throw() { return _lastFirewallOpener; } inline uint64_t lastFirewallOpener() const throw() { return _lastFirewallOpener; }
inline uint64_t lastPing() const throw() { return _lastPing; } inline uint64_t lastPing() const throw() { return _lastPing; }
inline bool fixed() const throw() { return _fixed; }
inline bool fixed() const throw() { return _fixed; }
inline void setFixed(bool f) throw() { _fixed = f; } inline void setFixed(bool f) throw() { _fixed = f; }
inline void sent(uint64_t t) throw() { _lastSend = t; } inline void sent(uint64_t t) throw() { _lastSend = t; }
@ -111,40 +122,34 @@ public:
inline std::string toString() const inline std::string toString() const
{ {
uint64_t now = Utils::now(); uint64_t now = Utils::now();
char lsago[32],lrago[32],lfoago[32],lpago[32]; char tmp[1024];
Utils::snprintf(lsago,sizeof(lsago),"%lld",(long long)((_lastSend != 0) ? (now - _lastSend) : -1)); const char *t = "";
Utils::snprintf(lrago,sizeof(lrago),"%lld",(long long)((_lastReceived != 0) ? (now - _lastReceived) : -1)); switch(_type) {
Utils::snprintf(lfoago,sizeof(lfoago),"%lld",(long long)((_lastFirewallOpener != 0) ? (now - _lastFirewallOpener) : -1)); case PATH_TYPE_NULL: t = "null"; break;
Utils::snprintf(lpago,sizeof(lfoago),"%lld",(long long)((_lastPing != 0) ? (now - _lastPing) : -1)); case PATH_TYPE_UDP: t = "udp"; break;
return ( _addr.toString() + case PATH_TYPE_TCP_OUT: t = "tcp_out"; break;
"[" + case PATH_TYPE_TCP_IN: t = "tcp_in"; break;
(_tcp ? "tcp" : "udp") + }
";" + Utils::snprintf(tmp,sizeof(tmp),"%s:%s:%lld;%lld;%lld;%lld;%s",
lsago + t,
";" + _addr.toString().c_str(),
lrago + (long long)((_lastSend != 0) ? (now - _lastSend) : -1),
";" + (long long)((_lastReceived != 0) ? (now - _lastReceived) : -1),
lpago + (long long)((_lastFirewallOpener != 0) ? (now - _lastFirewallOpener) : -1),
";" + (long long)((_lastPing != 0) ? (now - _lastPing) : -1),
lfoago + ((_fixed) ? "fixed" : (active(now) ? "active" : "inactive"))
";" + );
(active(now) ? "active" : "inactive") + return std::string(tmp);
";" +
(_fixed ? "fixed" : "learned") +
"]"
);
} }
inline bool operator==(const Path &p) const throw() { return ((_addr == p._addr)&&(_tcp == p._tcp)); } inline bool operator==(const Path &p) const throw() { return ((_addr == p._addr)&&(_type == p._type)); }
inline bool operator!=(const Path &p) const throw() { return ((_addr != p._addr)||(_tcp != p._tcp)); } inline bool operator!=(const Path &p) const throw() { return ((_addr != p._addr)||(_type != p._type)); }
inline bool operator<(const Path &p) const inline bool operator<(const Path &p) const
throw() throw()
{ {
if (_addr == p._addr) { if (_addr == p._addr)
if (!_tcp) // UDP < TCP return ((int)_type < (int)p._type);
return p._tcp; else return (_addr < p._addr);
return false;
} else return (_addr < p._addr);
} }
inline bool operator>(const Path &p) const throw() { return (p < *this); } inline bool operator>(const Path &p) const throw() { return (p < *this); }
inline bool operator<=(const Path &p) const throw() { return !(p < *this); } inline bool operator<=(const Path &p) const throw() { return !(p < *this); }
@ -171,7 +176,7 @@ public:
b.append((uint16_t)_addr.port()); b.append((uint16_t)_addr.port());
break; break;
} }
b.append(_tcp ? (unsigned char)1 : (unsigned char)0); b.append((unsigned char)_type);
b.append(_fixed ? (unsigned char)1 : (unsigned char)0); b.append(_fixed ? (unsigned char)1 : (unsigned char)0);
} }
template<unsigned int C> template<unsigned int C>
@ -199,7 +204,7 @@ public:
_addr.zero(); _addr.zero();
break; break;
} }
_tcp = (b[p++] != 0); _type = (Type)b[p++];
_fixed = (b[p++] != 0); _fixed = (b[p++] != 0);
return (p - startAt); return (p - startAt);
@ -211,7 +216,7 @@ private:
volatile uint64_t _lastFirewallOpener; volatile uint64_t _lastFirewallOpener;
volatile uint64_t _lastPing; volatile uint64_t _lastPing;
InetAddress _addr; InetAddress _addr;
bool _tcp; Type _type;
bool _fixed; bool _fixed;
}; };

View File

@ -69,25 +69,36 @@ void Peer::receive(
Packet::Verb inReVerb, Packet::Verb inReVerb,
uint64_t now) uint64_t now)
{ {
// Update system-wide last packet receive time
*((const_cast<uint64_t *>(&(_r->timeOfLastPacketReceived)))) = now; *((const_cast<uint64_t *>(&(_r->timeOfLastPacketReceived)))) = now;
if (!hops) { // direct packet // Learn paths from direct packets (hops == 0)
if (!hops) {
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
// Update receive time on known paths
bool havePath = false; bool havePath = false;
for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) { for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) {
if ((p->address() == remoteAddr)&&(p->tcp() == (fromSock->type() == Socket::ZT_SOCKET_TYPE_TCP))) { if ((p->address() == remoteAddr)&&(p->tcp() == fromSock->tcp())) {
p->received(now); p->received(now);
havePath = true; havePath = true;
break; break;
} }
} }
// Learn new UDP paths (learning TCP would require an explicit mechanism) if (!havePath) {
if ((!havePath)&&(fromSock->type() != Socket::ZT_SOCKET_TYPE_TCP)) { Path::Type pt = Path::PATH_TYPE_UDP;
_paths.push_back(Path(remoteAddr,false,false)); switch(fromSock->type()) {
case Socket::ZT_SOCKET_TYPE_TCP_IN:
pt = Path::PATH_TYPE_TCP_IN;
break;
case Socket::ZT_SOCKET_TYPE_TCP_OUT:
pt = Path::PATH_TYPE_TCP_OUT;
break;
default:
break;
}
_paths.push_back(Path(remoteAddr,pt,false));
_paths.back().received(now); _paths.back().received(now);
} }
} }
@ -110,26 +121,30 @@ bool Peer::send(const RuntimeEnvironment *_r,const void *data,unsigned int len,u
{ {
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
std::vector<Path>::iterator p(_paths.begin()); for(;;) {
if (p == _paths.end()) { std::vector<Path>::iterator p(_paths.begin());
//TRACE("send to %s failed: no paths available",_id.address().toString().c_str()); if (p == _paths.end())
return false; return false;
}
uint64_t bestPathLastReceived = p->lastReceived(); uint64_t bestPathLastReceived = p->lastReceived();
std::vector<Path>::iterator bestPath = p; std::vector<Path>::iterator bestPath = p;
while (++p != _paths.end()) { while (++p != _paths.end()) {
uint64_t lr = p->lastReceived(); uint64_t lr = p->lastReceived();
if (lr > bestPathLastReceived) { if (lr > bestPathLastReceived) {
bestPathLastReceived = lr; bestPathLastReceived = lr;
bestPath = p; bestPath = p;
}
} }
}
//TRACE("send to %s: using path: %s",_id.address().toString().c_str(),bestPath->toString().c_str()); if (_r->sm->send(bestPath->address(),bestPath->tcp(),bestPath->type() == Path::PATH_TYPE_TCP_OUT,data,len)) {
bestPath->sent(now);
if (_r->sm->send(bestPath->address(),bestPath->tcp(),data,len)) { return true;
bestPath->sent(now); } else {
return true; if (bestPath->fixed())
return false;
_paths.erase(bestPath);
// ... try again and pick a different path
}
} }
return false; return false;
@ -160,23 +175,24 @@ bool Peer::sendPing(const RuntimeEnvironment *_r,uint64_t now,bool firstSinceRes
bool pingTcp; bool pingTcp;
if (!firstSinceReset) { if (!firstSinceReset) {
// Do not use TCP if one of our UDP endpoints has answered recently. uint64_t lastUdp = 0;
uint64_t lastTcp = 0;
uint64_t lastPing = 0; uint64_t lastPing = 0;
uint64_t lastDirectReceive = 0;
for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) { for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) {
lastPing = std::max(lastPing,p->lastPing()); if (p->tcp())
lastDirectReceive = std::max(lastDirectReceive,p->lastReceived()); lastTcp = std::max(p->lastReceived(),lastTcp);
else lastUdp = std::max(p->lastReceived(),lastUdp);
lastPing = std::max(p->lastPing(),lastPing);
} }
uint64_t lastAny = std::max(lastUdp,lastTcp);
pingTcp = ( (lastDirectReceive < lastPing) && ((lastPing - lastDirectReceive) >= ZT_PING_UNANSWERED_AFTER) ); pingTcp = ( ( (lastAny < lastPing) && ((lastPing - lastAny) >= ZT_PING_UNANSWERED_AFTER) ) || (lastTcp > lastUdp) );
} else pingTcp = false; } else pingTcp = false;
TRACE("PING %s (pingTcp==%d)",_id.address().toString().c_str(),(int)pingTcp); TRACE("PING %s (pingTcp==%d)",_id.address().toString().c_str(),(int)pingTcp);
for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) { for(std::vector<Path>::iterator p(_paths.begin());p!=_paths.end();++p) {
if ((pingTcp)||(!p->tcp())) { if ((pingTcp)||(!p->tcp())) {
if (_r->sw->sendHELLO(self,p->address(),p->tcp())) { if (_r->sw->sendHELLO(self,*p)) {
p->sent(now); p->sent(now);
p->pinged(now); p->pinged(now);
sent = true; sent = true;

View File

@ -65,7 +65,8 @@ public:
{ {
ZT_SOCKET_TYPE_UDP_V4, ZT_SOCKET_TYPE_UDP_V4,
ZT_SOCKET_TYPE_UDP_V6, ZT_SOCKET_TYPE_UDP_V6,
ZT_SOCKET_TYPE_TCP ZT_SOCKET_TYPE_TCP_IN, // incoming connection, not listen
ZT_SOCKET_TYPE_TCP_OUT
}; };
virtual ~Socket() {} virtual ~Socket() {}
@ -79,6 +80,15 @@ public:
return _type; return _type;
} }
/**
* @return True if this is a TCP socket
*/
inline bool tcp() const
throw()
{
return ((_type == ZT_SOCKET_TYPE_TCP_IN)||(_type == ZT_SOCKET_TYPE_TCP_OUT));
}
/** /**
* Send a ZeroTier message packet * Send a ZeroTier message packet
* *

View File

@ -347,7 +347,7 @@ SocketManager::~SocketManager()
_closeSockets(); _closeSockets();
} }
bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned int msglen) bool SocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen)
{ {
if (tcp) { if (tcp) {
SharedPtr<Socket> ts; SharedPtr<Socket> ts;
@ -360,6 +360,9 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned
if (ts) if (ts)
return ts->send(to,msg,msglen); return ts->send(to,msg,msglen);
if (!autoConnectTcp)
return false;
#ifdef __WINDOWS__ #ifdef __WINDOWS__
SOCKET s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0); SOCKET s = ::socket(to.isV4() ? AF_INET : AF_INET6,SOCK_STREAM,0);
if (s == INVALID_SOCKET) if (s == INVALID_SOCKET)
@ -394,7 +397,7 @@ bool SocketManager::send(const InetAddress &to,bool tcp,const void *msg,unsigned
} else connecting = true; } else connecting = true;
} }
ts = SharedPtr<Socket>(new TcpSocket(this,s,connecting,to)); ts = SharedPtr<Socket>(new TcpSocket(this,s,Socket::ZT_SOCKET_TYPE_TCP_OUT,connecting,to));
if (!ts->send(to,msg,msglen)) { if (!ts->send(to,msg,msglen)) {
_fdSetLock.lock(); _fdSetLock.lock();
FD_CLR(s,&_readfds); FD_CLR(s,&_readfds);
@ -496,7 +499,7 @@ void SocketManager::poll(unsigned long timeout)
InetAddress fromia((const struct sockaddr *)&from); InetAddress fromia((const struct sockaddr *)&from);
Mutex::Lock _l2(_tcpSockets_m); Mutex::Lock _l2(_tcpSockets_m);
try { try {
_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia));
#ifdef __WINDOWS__ #ifdef __WINDOWS__
{ u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); }
#ifdef ZT_TCP_NODELAY #ifdef ZT_TCP_NODELAY
@ -536,7 +539,7 @@ void SocketManager::poll(unsigned long timeout)
InetAddress fromia((const struct sockaddr *)&from); InetAddress fromia((const struct sockaddr *)&from);
Mutex::Lock _l2(_tcpSockets_m); Mutex::Lock _l2(_tcpSockets_m);
try { try {
_tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,false,fromia)); _tcpSockets[fromia] = SharedPtr<Socket>(new TcpSocket(this,sockfd,Socket::ZT_SOCKET_TYPE_TCP_IN,false,fromia));
#ifdef __WINDOWS__ #ifdef __WINDOWS__
{ u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); } { u_long iMode=1; ioctlsocket(sockfd,FIONBIO,&iMode); }
#ifdef ZT_TCP_NODELAY #ifdef ZT_TCP_NODELAY

View File

@ -85,17 +85,22 @@ public:
/** /**
* Send a message to a remote peer * Send a message to a remote peer
* *
* If 'tcp' is true an existing TCP socket will be used or an attempt will
* be made to connect if one is not available. The message will be placed
* in the connecting TCP socket's outgoing queue, so if the connection
* succeeds the message will be sent. Otherwise it will be dropped.
*
* @param to Destination address * @param to Destination address
* @param tcp Use TCP? * @param tcp Use TCP?
* @param autoConnectTcp If true, automatically initiate TCP connection if there is none
* @param msg Message to send * @param msg Message to send
* @param msglen Length of message * @param msglen Length of message
*/ */
bool send(const InetAddress &to,bool tcp,const void *msg,unsigned int msglen); bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen);
/**
* Send a message to a remote peer via UDP (shortcut for setting both TCP params to false in send)
*
* @param to Destination address
* @param msg Message to send
* @param msglen Length of message
*/
inline bool sendUdp(const InetAddress &to,const void *msg,unsigned int msglen) { return send(to,false,false,msg,msglen); }
/** /**
* Send a UDP packet with a limited IP TTL * Send a UDP packet with a limited IP TTL

View File

@ -214,7 +214,7 @@ void Switch::sendHELLO(const Address &dest)
send(outp,false); send(outp,false);
} }
bool Switch::sendHELLO(const SharedPtr<Socket> &fromSock,const SharedPtr<Peer> &dest,const InetAddress &remoteAddr) bool Switch::sendHELLO(const SharedPtr<Peer> &dest,const Path &path)
{ {
uint64_t now = Utils::now(); uint64_t now = Utils::now();
Packet outp(dest->address(),_r->identity.address(),Packet::VERB_HELLO); Packet outp(dest->address(),_r->identity.address(),Packet::VERB_HELLO);
@ -225,10 +225,10 @@ bool Switch::sendHELLO(const SharedPtr<Socket> &fromSock,const SharedPtr<Peer> &
outp.append(now); outp.append(now);
_r->identity.serialize(outp,false); _r->identity.serialize(outp,false);
outp.armor(dest->key(),false); outp.armor(dest->key(),false);
return fromSock->send(remoteAddr,outp.data(),outp.size()); return _r->sm->send(path.address(),path.tcp(),path.type() == Path::PATH_TYPE_TCP_OUT,outp.data(),outp.size());
} }
bool Switch::sendHELLO(const SharedPtr<Peer> &dest,const InetAddress &remoteAddr,bool tcp) bool Switch::sendHELLO(const SharedPtr<Peer> &dest,const InetAddress &destUdp)
{ {
uint64_t now = Utils::now(); uint64_t now = Utils::now();
Packet outp(dest->address(),_r->identity.address(),Packet::VERB_HELLO); Packet outp(dest->address(),_r->identity.address(),Packet::VERB_HELLO);
@ -239,7 +239,7 @@ bool Switch::sendHELLO(const SharedPtr<Peer> &dest,const InetAddress &remoteAddr
outp.append(now); outp.append(now);
_r->identity.serialize(outp,false); _r->identity.serialize(outp,false);
outp.armor(dest->key(),false); outp.armor(dest->key(),false);
return _r->sm->send(remoteAddr,tcp,outp.data(),outp.size()); return _r->sm->send(deskUdp,false,false,outp.data(),outp.size());
} }
bool Switch::unite(const Address &p1,const Address &p2,bool force) bool Switch::unite(const Address &p1,const Address &p2,bool force)
@ -354,7 +354,7 @@ unsigned long Switch::doTimerTasks()
for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) { for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
if (now >= qi->fireAtTime) { if (now >= qi->fireAtTime) {
TRACE("sending NAT-T HELLO to %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str()); TRACE("sending NAT-T HELLO to %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str());
sendHELLO(qi->peer,qi->inaddr,false); sendHELLO(qi->peer,qi->inaddr);
_contactQueue.erase(qi++); _contactQueue.erase(qi++);
} else { } else {
nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now)); nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));

View File

@ -122,22 +122,20 @@ public:
/** /**
* Send a HELLO announcement immediately to the indicated address * Send a HELLO announcement immediately to the indicated address
* *
* @param fromSock Send from this local socket
* @param dest Destination peer * @param dest Destination peer
* @param remoteAddr Remote address * @param path Network path to peer
* @return True if send appears successful * @return True if send appears successful
*/ */
bool sendHELLO(const SharedPtr<Socket> &fromSock,const SharedPtr<Peer> &dest,const InetAddress &remoteAddr); bool sendHELLO(const SharedPtr<Peer> &dest,const Path &path);
/** /**
* Send a HELLO announcement immediately to the indicated address * Send a HELLO announcement immediately to the indicated address via UDP
* *
* @param dest Destination peer * @param dest Destination peer
* @param remoteAddr Remote address * @param destUdp UDP inet address
* @param tcp Attempt to use TCP?
* @return True if send appears successful * @return True if send appears successful
*/ */
bool sendHELLO(const SharedPtr<Peer> &dest,const InetAddress &remoteAddr,bool tcp); bool sendHELLO(const SharedPtr<Peer> &dest,const InetAddress &destUdp);
/** /**
* Send RENDEZVOUS to two peers to permit them to directly connect * Send RENDEZVOUS to two peers to permit them to directly connect

View File

@ -72,7 +72,6 @@ bool TcpSocket::send(const InetAddress &to,const void *msg,unsigned int msglen)
return true; // sanity check return true; // sanity check
Mutex::Lock _l(_writeLock); Mutex::Lock _l(_writeLock);
bool writeInProgress = ((_outptr != 0)||(_connecting)); bool writeInProgress = ((_outptr != 0)||(_connecting));
// Ensure that _outbuf is large enough // Ensure that _outbuf is large enough
@ -144,9 +143,10 @@ bool TcpSocket::notifyAvailableForRead(const SharedPtr<Socket> &self,SocketManag
if ((pl)&&(p >= pl)) { if ((pl)&&(p >= pl)) {
Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5); Buffer<ZT_SOCKET_MAX_MESSAGE_LEN> data(_inbuf + 5,pl - 5);
sm->handleReceivedPacket(self,_remote,data); memmove(_inbuf,_inbuf + pl,p -= pl);
memmove(_inbuf,_inbuf + pl,p - pl); try {
p -= pl; sm->handleReceivedPacket(self,_remote,data);
} catch ( ... ) {} // handlers should not throw
pl = 0; pl = 0;
} }
} }

View File

@ -71,11 +71,11 @@ public:
protected: protected:
#ifdef __WINDOWS__ #ifdef __WINDOWS__
TcpSocket(SocketManager *sm,SOCKET s,bool c,const InetAddress &r) : TcpSocket(SocketManager *sm,SOCKET s,Socket::Type t,bool c,const InetAddress &r) :
#else #else
TcpSocket(SocketManager *sm,int s,bool c,const InetAddress &r) : TcpSocket(SocketManager *sm,int s,Socket::Type t,bool c,const InetAddress &r) :
#endif #endif
Socket(Socket::ZT_SOCKET_TYPE_TCP,s), Socket(t,s),
_lastActivity(Utils::now()), _lastActivity(Utils::now()),
_sm(sm), _sm(sm),
_outbuf((unsigned char *)0), _outbuf((unsigned char *)0),

View File

@ -65,7 +65,7 @@ void Topology::setSupernodes(const std::map< Identity,std::vector< std::pair<Ine
if (!p) if (!p)
p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first))); p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first)));
for(std::vector< std::pair<InetAddress,bool> >::const_iterator j(i->second.begin());j!=i->second.end();++j) for(std::vector< std::pair<InetAddress,bool> >::const_iterator j(i->second.begin());j!=i->second.end();++j)
p->addPath(Path(j->first,j->second,true)); p->addPath(Path(j->first,(j->second) ? Path::PATH_TYPE_TCP_OUT : Path::PATH_TYPE_UDP,true));
p->use(now); p->use(now);
_supernodePeers.push_back(p); _supernodePeers.push_back(p);
} }