TCP tunneling implementation -- not tested yet and no initiation yet.

This commit is contained in:
Adam Ierymenko 2015-04-28 12:43:10 -07:00
parent 7dcde7503a
commit a9a3705877

View File

@ -113,17 +113,20 @@ static const struct http_parser_settings HTTP_PARSER_SETTINGS = {
ShttpOnMessageComplete ShttpOnMessageComplete
}; };
struct HttpConnection struct TcpConnection
{ {
bool server; enum {
bool writing; TCP_HTTP_INCOMING,
TCP_HTTP_OUTGOING, // not currently used
TCP_TUNNEL_OUTGOING // fale-SSL outgoing tunnel -- HTTP-related fields are not used
} type;
bool shouldKeepAlive; bool shouldKeepAlive;
OneServiceImpl *parent; OneServiceImpl *parent;
PhySocket *sock; PhySocket *sock;
InetAddress from; InetAddress from;
http_parser parser; http_parser parser;
unsigned long messageSize; unsigned long messageSize;
unsigned long writePtr;
uint64_t lastActivity; uint64_t lastActivity;
std::string currentHeaderField; std::string currentHeaderField;
@ -132,7 +135,9 @@ struct HttpConnection
std::string url; std::string url;
std::string status; std::string status;
std::map< std::string,std::string > headers; std::map< std::string,std::string > headers;
std::string body; // also doubles as send queue for writes out to the socket std::string body;
std::string writeBuf;
}; };
class OneServiceImpl : public OneService class OneServiceImpl : public OneService
@ -281,8 +286,8 @@ public:
} }
try { try {
while (!_httpConnections.empty()) while (!_tcpConections.empty())
_phy.close(_httpConnections.begin()->first); _phy.close(_tcpConections.begin()->first);
} catch ( ... ) {} } catch ( ... ) {}
{ {
@ -336,13 +341,13 @@ public:
ZT1_ResultCode rc = _node->processWirePacket( ZT1_ResultCode rc = _node->processWirePacket(
OSUtils::now(), OSUtils::now(),
(const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big
0, 0, // desperation == 0, direct UDP
data, data,
len, len,
&_nextBackgroundTaskDeadline); &_nextBackgroundTaskDeadline);
if (ZT1_ResultCode_isFatal(rc)) { if (ZT1_ResultCode_isFatal(rc)) {
char tmp[256]; char tmp[256];
Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc); Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m); Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR; _termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp; _fatalErrorMessage = tmp;
@ -352,65 +357,162 @@ public:
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)
{ {
// TODO: outgoing HTTP connection success/failure if (!success)
return;
// Outgoing connections are right now only tunnel connections
TcpConnection *tc = &(_tcpConections[sock]);
tc->type = TcpConnection::TCP_TUNNEL_OUTGOING;
tc->shouldKeepAlive = true; // unused
tc->parent = this;
tc->sock = sock;
// from and parser are not used
tc->messageSize = 0; // unused
tc->lastActivity = OSUtils::now();
// HTTP stuff is not used
tc->writeBuf = "";
*uptr = (void *)tc;
// Send "hello" message
tc->writeBuf.push_back((char)0x17);
tc->writeBuf.push_back((char)0x03);
tc->writeBuf.push_back((char)0x03); // fake TLS 1.2 header
tc->writeBuf.push_back((char)0x00);
tc->writeBuf.push_back((char)0x04); // mlen == 4
tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MAJOR);
tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR);
tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff));
tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff));
_phy.tcpSetNotifyWritable(sock,true);
} }
inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from)
{ {
HttpConnection *htc = &(_httpConnections[sockN]); // Incoming connections are TCP HTTP requests
htc->server = true; TcpConnection *tc = &(_tcpConections[sockN]);
htc->writing = false; tc->type = TcpConnection::TCP_HTTP_INCOMING;
htc->shouldKeepAlive = true; tc->shouldKeepAlive = true;
htc->parent = this; tc->parent = this;
htc->sock = sockN; tc->sock = sockN;
htc->from = from; tc->from = from;
http_parser_init(&(htc->parser),HTTP_REQUEST); http_parser_init(&(tc->parser),HTTP_REQUEST);
htc->parser.data = (void *)htc; tc->parser.data = (void *)tc;
htc->messageSize = 0; tc->messageSize = 0;
htc->writePtr = 0; tc->lastActivity = OSUtils::now();
htc->lastActivity = OSUtils::now(); tc->currentHeaderField = "";
htc->currentHeaderField = ""; tc->currentHeaderValue = "";
htc->currentHeaderValue = ""; tc->url = "";
htc->url = ""; tc->status = "";
htc->status = ""; tc->headers.clear();
htc->headers.clear(); tc->body = "";
htc->body = ""; tc->writeBuf = "";
*uptrN = (void *)htc; *uptrN = (void *)tc;
} }
inline void phyOnTcpClose(PhySocket *sock,void **uptr) inline void phyOnTcpClose(PhySocket *sock,void **uptr)
{ {
_httpConnections.erase(sock); _tcpConections.erase(sock);
} }
inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(*uptr); TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
http_parser_execute(&(htc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len); switch(tc->type) {
if ((htc->parser.upgrade)||(htc->parser.http_errno != HPE_OK)) case TcpConnection::TCP_HTTP_INCOMING:
_phy.close(sock); case TcpConnection::TCP_HTTP_OUTGOING:
http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len);
if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK)) {
_phy.close(sock);
return;
}
break;
case TcpConnection::TCP_TUNNEL_OUTGOING:
tc->body.append((const char *)data,len);
if (tc->body.length() > 65535) {
// sanity limit -- a message will never be this big since mlen is 16-bit
_phy.close(sock);
return;
} else if (tc->body.length() >= 5) {
const char *data = tc->body.data();
const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) );
if (tc->body.length() >= (mlen + 5)) {
InetAddress from;
unsigned long plen = mlen; // payload length, modified if there's an IP header
data += 5;
if (mlen == 4) {
// Hello message, which isn't sent by proxy and would be ignored by client
} else if (mlen) {
// Messages should contain IPv4 or IPv6 source IP address data
switch(data[0]) {
case 4: // IPv4
if (plen >= 7) {
from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff));
data += 7; // type + 4 byte IP + 2 byte port
plen -= 7;
}
break;
case 6: // IPv6
if (plen >= 19) {
from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff));
data += 19; // type + 16 byte IP + 2 byte port
plen -= 19;
}
break;
case 0: // none/omitted
break;
default: // invalid
_phy.close(sock);
return;
}
if (!from) { // missing IP header
_phy.close(sock);
return;
}
}
ZT1_ResultCode rc = _node->processWirePacket(
OSUtils::now(),
(const struct sockaddr_storage *)&from, // Phy<> uses sockaddr_storage, so it'll always be that big
1, // desperation == 1, TCP tunnel proxy
data,
plen,
&_nextBackgroundTaskDeadline);
if (ZT1_ResultCode_isFatal(rc)) {
char tmp[256];
Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
Mutex::Lock _l(_termReason_m);
_termReason = ONE_UNRECOVERABLE_ERROR;
_fatalErrorMessage = tmp;
this->terminate();
_phy.close(sock);
return;
}
if (tc->body.length() > (mlen + 5))
tc->body = tc->body.substr(mlen + 5);
else tc->body = "";
}
}
break;
}
} }
inline void phyOnTcpWritable(PhySocket *sock,void **uptr) inline void phyOnTcpWritable(PhySocket *sock,void **uptr)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(*uptr); TcpConnection *tc = reinterpret_cast<TcpConnection *>(*uptr);
long sent = _phy.tcpSend(sock,htc->body.data() + htc->writePtr,(unsigned long)htc->body.length() - htc->writePtr,true); if (tc->writeBuf.length()) {
if (sent < 0) { long sent = _phy.tcpSend(sock,tc->writeBuf.data(),tc->writeBuf.length(),true);
return; // close handler will have been called, so everything's dead if (sent > 0) {
} else { tc->lastActivity = OSUtils::now();
htc->lastActivity = OSUtils::now(); if (sent == tc->writeBuf.length()) {
htc->writePtr += sent; tc->writeBuf = "";
if (htc->writePtr >= htc->body.length()) { _phy.tcpSetNotifyWritable(sock,false);
_phy.tcpSetNotifyWritable(sock,false); if (!tc->shouldKeepAlive)
if (htc->shouldKeepAlive) { _phy.close(sock); // will call close handler to delete from _tcpConections
htc->writing = false; } else tc->writeBuf = tc->writeBuf.substr(sent);
htc->writePtr = 0;
htc->body = "";
} else {
_phy.close(sock); // will call close handler to delete from _httpConnections
}
} }
} } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen
} }
inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc)
@ -586,7 +688,7 @@ public:
_node->processVirtualNetworkFrame(OSUtils::now(),nwid,from.toInt(),to.toInt(),etherType,vlanId,data,len,&_nextBackgroundTaskDeadline); _node->processVirtualNetworkFrame(OSUtils::now(),nwid,from.toInt(),to.toInt(),etherType,vlanId,data,len,&_nextBackgroundTaskDeadline);
} }
inline void onHttpRequestToServer(HttpConnection *htc) inline void onHttpRequestToServer(TcpConnection *tc)
{ {
char tmpn[256]; char tmpn[256];
std::string data; std::string data;
@ -595,7 +697,7 @@ public:
try { try {
if (_controlPlane) if (_controlPlane)
scode = _controlPlane->handleRequest(htc->from,htc->parser.method,htc->url,htc->headers,htc->body,data,contentType); scode = _controlPlane->handleRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->body,data,contentType);
else scode = 500; else scode = 500;
} catch ( ... ) { } catch ( ... ) {
scode = 500; scode = 500;
@ -615,26 +717,24 @@ public:
} }
Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr);
htc->body.assign(tmpn); tc->writeBuf.assign(tmpn);
htc->body.append("Content-Type: "); tc->writeBuf.append("Content-Type: ");
htc->body.append(contentType); tc->writeBuf.append(contentType);
Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length());
htc->body.append(tmpn); tc->writeBuf.append(tmpn);
if (!htc->shouldKeepAlive) if (!tc->shouldKeepAlive)
htc->body.append("Connection: close\r\n"); tc->writeBuf.append("Connection: close\r\n");
htc->body.append("\r\n"); tc->writeBuf.append("\r\n");
if (htc->parser.method != HTTP_HEAD) if (tc->parser.method != HTTP_HEAD)
htc->body.append(data); tc->writeBuf.append(data);
htc->writing = true; _phy.tcpSetNotifyWritable(tc->sock,true);
htc->writePtr = 0;
_phy.tcpSetNotifyWritable(htc->sock,true);
} }
inline void onHttpResponseFromClient(HttpConnection *htc) inline void onHttpResponseFromClient(TcpConnection *tc)
{ {
if (!htc->shouldKeepAlive) if (!tc->shouldKeepAlive)
_phy.close(htc->sock); // will call close handler, which deletes from _httpConnections _phy.close(tc->sock); // will call close handler, which deletes from _tcpConections
} }
private: private:
@ -671,7 +771,7 @@ private:
std::map< uint64_t,std::vector<InetAddress> > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned std::map< uint64_t,std::vector<InetAddress> > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned
Mutex _taps_m; Mutex _taps_m;
std::map< PhySocket *,HttpConnection > _httpConnections; // no mutex for this since it's done in the main loop thread only std::map< PhySocket *,TcpConnection > _tcpConections; // no mutex for this since it's done in the main loop thread only
ReasonForTermination _termReason; ReasonForTermination _termReason;
std::string _fatalErrorMessage; std::string _fatalErrorMessage;
@ -699,83 +799,83 @@ static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC
static int ShttpOnMessageBegin(http_parser *parser) static int ShttpOnMessageBegin(http_parser *parser)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->currentHeaderField = ""; tc->currentHeaderField = "";
htc->currentHeaderValue = ""; tc->currentHeaderValue = "";
htc->messageSize = 0; tc->messageSize = 0;
htc->url = ""; tc->url = "";
htc->status = ""; tc->status = "";
htc->headers.clear(); tc->headers.clear();
htc->body = ""; tc->body = "";
return 0; return 0;
} }
static int ShttpOnUrl(http_parser *parser,const char *ptr,size_t length) static int ShttpOnUrl(http_parser *parser,const char *ptr,size_t length)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->messageSize += (unsigned long)length; tc->messageSize += (unsigned long)length;
if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1; return -1;
htc->url.append(ptr,length); tc->url.append(ptr,length);
return 0; return 0;
} }
static int ShttpOnStatus(http_parser *parser,const char *ptr,size_t length) static int ShttpOnStatus(http_parser *parser,const char *ptr,size_t length)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->messageSize += (unsigned long)length; tc->messageSize += (unsigned long)length;
if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1; return -1;
htc->status.append(ptr,length); tc->status.append(ptr,length);
return 0; return 0;
} }
static int ShttpOnHeaderField(http_parser *parser,const char *ptr,size_t length) static int ShttpOnHeaderField(http_parser *parser,const char *ptr,size_t length)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->messageSize += (unsigned long)length; tc->messageSize += (unsigned long)length;
if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1; return -1;
if ((htc->currentHeaderField.length())&&(htc->currentHeaderValue.length())) { if ((tc->currentHeaderField.length())&&(tc->currentHeaderValue.length())) {
htc->headers[htc->currentHeaderField] = htc->currentHeaderValue; tc->headers[tc->currentHeaderField] = tc->currentHeaderValue;
htc->currentHeaderField = ""; tc->currentHeaderField = "";
htc->currentHeaderValue = ""; tc->currentHeaderValue = "";
} }
for(size_t i=0;i<length;++i) for(size_t i=0;i<length;++i)
htc->currentHeaderField.push_back(OSUtils::toLower(ptr[i])); tc->currentHeaderField.push_back(OSUtils::toLower(ptr[i]));
return 0; return 0;
} }
static int ShttpOnValue(http_parser *parser,const char *ptr,size_t length) static int ShttpOnValue(http_parser *parser,const char *ptr,size_t length)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->messageSize += (unsigned long)length; tc->messageSize += (unsigned long)length;
if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1; return -1;
htc->currentHeaderValue.append(ptr,length); tc->currentHeaderValue.append(ptr,length);
return 0; return 0;
} }
static int ShttpOnHeadersComplete(http_parser *parser) static int ShttpOnHeadersComplete(http_parser *parser)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
if ((htc->currentHeaderField.length())&&(htc->currentHeaderValue.length())) if ((tc->currentHeaderField.length())&&(tc->currentHeaderValue.length()))
htc->headers[htc->currentHeaderField] = htc->currentHeaderValue; tc->headers[tc->currentHeaderField] = tc->currentHeaderValue;
return 0; return 0;
} }
static int ShttpOnBody(http_parser *parser,const char *ptr,size_t length) static int ShttpOnBody(http_parser *parser,const char *ptr,size_t length)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->messageSize += (unsigned long)length; tc->messageSize += (unsigned long)length;
if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE)
return -1; return -1;
htc->body.append(ptr,length); tc->body.append(ptr,length);
return 0; return 0;
} }
static int ShttpOnMessageComplete(http_parser *parser) static int ShttpOnMessageComplete(http_parser *parser)
{ {
HttpConnection *htc = reinterpret_cast<HttpConnection *>(parser->data); TcpConnection *tc = reinterpret_cast<TcpConnection *>(parser->data);
htc->shouldKeepAlive = (http_should_keep_alive(parser) != 0); tc->shouldKeepAlive = (http_should_keep_alive(parser) != 0);
htc->lastActivity = OSUtils::now(); tc->lastActivity = OSUtils::now();
if (htc->server) { if (tc->type == TcpConnection::TCP_HTTP_INCOMING) {
htc->parent->onHttpRequestToServer(htc); tc->parent->onHttpRequestToServer(tc);
} else { } else {
htc->parent->onHttpResponseFromClient(htc); tc->parent->onHttpResponseFromClient(tc);
} }
return 0; return 0;
} }