mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-06-17 22:58:22 +00:00
Added support for Redis
This commit is contained in:
@ -188,7 +188,7 @@ void NetconEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType
|
||||
return;
|
||||
}
|
||||
memcpy(q->payload,ðhdr,sizeof(ethhdr));
|
||||
memcpy(q->payload + sizeof(ethhdr),dataptr,q->len - sizeof(ethhdr));
|
||||
memcpy((char*)q->payload + sizeof(ethhdr),dataptr,q->len - sizeof(ethhdr));
|
||||
dataptr += q->len - sizeof(ethhdr);
|
||||
|
||||
// Remaining pbufs (if any) get rest of data
|
||||
@ -266,13 +266,13 @@ TcpConnection *NetconEthernetTap::getConnectionByTheirFD(PhySocket *sock, int fd
|
||||
*/
|
||||
void NetconEthernetTap::closeConnection(TcpConnection *conn)
|
||||
{
|
||||
//lwipstack->_tcp_arg(conn->pcb, NULL);
|
||||
//lwipstack->_tcp_sent(conn->pcb, NULL);
|
||||
//lwipstack->_tcp_recv(conn->pcb, NULL);
|
||||
//lwipstack->_tcp_err(conn->pcb, NULL);
|
||||
//lwipstack->_tcp_poll(conn->pcb, NULL, 0);
|
||||
//lwipstack->_tcp_close(conn->pcb);
|
||||
//close(conn->their_fd);
|
||||
fprintf(stderr, "closeConnection(%x, %d)\n", conn->pcb, _phy.getDescriptor(conn->dataSock));
|
||||
lwipstack->_tcp_arg(conn->pcb, NULL);
|
||||
lwipstack->_tcp_sent(conn->pcb, NULL);
|
||||
lwipstack->_tcp_recv(conn->pcb, NULL);
|
||||
lwipstack->_tcp_err(conn->pcb, NULL);
|
||||
lwipstack->_tcp_poll(conn->pcb, NULL, 0);
|
||||
lwipstack->_tcp_close(conn->pcb);
|
||||
if(conn->dataSock) {
|
||||
close(_phy.getDescriptor(conn->dataSock));
|
||||
_phy.close(conn->dataSock,false);
|
||||
@ -318,6 +318,7 @@ void NetconEthernetTap::closeAll()
|
||||
void NetconEthernetTap::threadMain()
|
||||
throw()
|
||||
{
|
||||
//signal(SIGPIPE, SIG_IGN);
|
||||
uint64_t prev_tcp_time = 0;
|
||||
uint64_t prev_status_time = 0;
|
||||
uint64_t prev_etharp_time = 0;
|
||||
@ -351,7 +352,7 @@ void NetconEthernetTap::threadMain()
|
||||
uint64_t etharp_remaining = ARP_TMR_INTERVAL;
|
||||
uint64_t status_remaining = STATUS_TMR_INTERVAL;
|
||||
|
||||
if (since_status >= STATUS_TMR_INTERVAL) {
|
||||
if (since_status >= STATUS_TMR_INTERVAL && true == false) {
|
||||
prev_status_time = now;
|
||||
if(rpc_sockets.size() || tcp_connections.size()) {
|
||||
/* Here we will periodically check the list of rpc_sockets for those that
|
||||
@ -383,12 +384,13 @@ void NetconEthernetTap::threadMain()
|
||||
// No TCP connections are associated, this is a candidate for removal
|
||||
unsigned char tmpbuf[BUF_SZ];
|
||||
if(read(_phy.getDescriptor(rpc_sockets[i]),&tmpbuf,BUF_SZ) < 0) {
|
||||
fprintf(stderr, "run() ---> RPC close(%d)\n", _phy.getDescriptor(rpc_sockets[i]));
|
||||
closeClient(rpc_sockets[i]);
|
||||
}
|
||||
else {
|
||||
// Handle RPC call, this is rare
|
||||
fprintf(stderr, "run(): RPC read during connection check\n");
|
||||
exit(0);
|
||||
exit(0); // FIXME: This should be addressed - Raise APPLICATION_POLL_FREQ to make it less likely
|
||||
phyOnUnixData(rpc_sockets[i],NULL,&tmpbuf,BUF_SZ);
|
||||
}
|
||||
}
|
||||
@ -412,6 +414,14 @@ void NetconEthernetTap::threadMain()
|
||||
dlclose(lwipstack->_libref);
|
||||
}
|
||||
|
||||
// Unused -- no UDP or TCP from this thread/Phy<>
|
||||
void NetconEthernetTap::phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) {}
|
||||
void NetconEthernetTap::phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) {}
|
||||
void NetconEthernetTap::phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) {}
|
||||
void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {}
|
||||
void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
|
||||
void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
|
||||
|
||||
void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr)
|
||||
{
|
||||
// FIXME: What do?
|
||||
@ -437,14 +447,6 @@ void NetconEthernetTap::phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,
|
||||
}
|
||||
}
|
||||
|
||||
// Unused -- no UDP or TCP from this thread/Phy<>
|
||||
void NetconEthernetTap::phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len) {}
|
||||
void NetconEthernetTap::phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) {}
|
||||
void NetconEthernetTap::phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) {}
|
||||
void NetconEthernetTap::phyOnTcpClose(PhySocket *sock,void **uptr) {}
|
||||
void NetconEthernetTap::phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
|
||||
void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
|
||||
|
||||
/*
|
||||
* Add a new PhySocket for the client connection
|
||||
*/
|
||||
@ -478,17 +480,14 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
|
||||
memcpy(&bind_rpc, &buf[1], sizeof(struct bind_st));
|
||||
handle_bind(sock, uptr, &bind_rpc);
|
||||
break;
|
||||
case RPC_KILL_INTERCEPT:
|
||||
fprintf(stderr, "RPC_KILL_INTERCEPT\n");
|
||||
break;
|
||||
case RPC_CONNECT:
|
||||
fprintf(stderr, "RPC_CONNECT\n");
|
||||
struct connect_st connect_rpc;
|
||||
memcpy(&connect_rpc, &buf[1], sizeof(struct connect_st));
|
||||
handle_connect(sock, uptr, &connect_rpc);
|
||||
break;
|
||||
case RPC_FD_MAP_COMPLETION:
|
||||
fprintf(stderr, "RPC_FD_MAP_COMPLETION\n");
|
||||
case RPC_MAP:
|
||||
fprintf(stderr, "RPC_MAP\n");
|
||||
handle_retval(sock, uptr, buf);
|
||||
break;
|
||||
default:
|
||||
@ -596,8 +595,9 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err)
|
||||
new_tcp_conn->pcb = newpcb;
|
||||
new_tcp_conn->their_fd = fds[1];
|
||||
tap->tcp_connections.push_back(new_tcp_conn);
|
||||
|
||||
fprintf(stderr, "socketpair = {%d, %d}\n", fds[0], fds[1]);
|
||||
int send_fd = tap->_phy.getDescriptor(conn->rpcSock);
|
||||
fprintf(stderr, "write(%d,...)\n", larg_fd);
|
||||
int n = write(larg_fd, "z", 1); // accept() in library waits for this byte
|
||||
if(n > 0) {
|
||||
if(sock_fd_write(send_fd, fds[1]) > 0) {
|
||||
@ -854,11 +854,39 @@ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err
|
||||
void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, unsigned char* buf)
|
||||
{
|
||||
TcpConnection *conn = (TcpConnection*)*uptr;
|
||||
if(conn->pending) {
|
||||
memcpy(&(conn->perceived_fd), &buf[1], sizeof(int));
|
||||
//fprintf(stderr, "handle_retval(): Mapping [our=%d -> their=%d]\n",
|
||||
//_phy.getDescriptor(conn->dataSock), conn->perceived_fd);
|
||||
conn->pending = false;
|
||||
if(!conn->pending)
|
||||
return;
|
||||
|
||||
// Copy data from buffer to TcpConnection object, update status
|
||||
memcpy(&(conn->perceived_fd), &buf[1], sizeof(int));
|
||||
conn->pending = false;
|
||||
|
||||
fprintf(stderr, "handle_retval(): CONN:%x - Mapping [our=%d -> their=%d]\n",conn,
|
||||
_phy.getDescriptor(conn->dataSock), conn->perceived_fd);
|
||||
|
||||
/* Check for pre-existing connection for this socket ---
|
||||
This block is in response to interesting behaviour from redis-server. A
|
||||
socket is created, setsockopt is called and the socket is set to IPV6 but fails (for now),
|
||||
then it is closed and re-opened and consequently remapped. With two pipes mapped
|
||||
to the same socket, makes it possible that we write to the wrong pipe and fail. So
|
||||
this block merely searches for a possible duplicate mapping and erases it
|
||||
*/
|
||||
for(size_t i=0; i<tcp_connections.size(); i++) {
|
||||
if(tcp_connections[i] == conn)
|
||||
continue;
|
||||
if(tcp_connections[i]->rpcSock == conn->rpcSock) {
|
||||
if(tcp_connections[i]->perceived_fd == conn->perceived_fd) {
|
||||
int n;
|
||||
if((n = send(_phy.getDescriptor(tcp_connections[i]->dataSock), "z", 1, MSG_NOSIGNAL)) < 0) {
|
||||
fprintf(stderr, "handle_retval(): CONN:%x - Socket (%d) already mapped (originally CONN:%x)\n", conn, tcp_connections[i]->perceived_fd, tcp_connections[i]);
|
||||
closeConnection(tcp_connections[i]);
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "handle_retval(): CONN:%x - This socket is mapped to two different pipes (?). Exiting.\n", conn);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -916,6 +944,7 @@ void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st
|
||||
d[2] = (ip >> 16) & 0xFF;
|
||||
d[3] = (ip >> 24) & 0xFF;
|
||||
fprintf(stderr, "handle_bind(): error binding to %d.%d.%d.%d : %d\n", d[0],d[1],d[2],d[3], conn_port);
|
||||
fprintf(stderr, "err = %d\n", err);
|
||||
if(err == ERR_USE)
|
||||
send_return_value(conn, -1, EADDRINUSE);
|
||||
if(err == ERR_MEM)
|
||||
@ -927,7 +956,7 @@ void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st
|
||||
send_return_value(conn, ERR_OK, ERR_OK); // Success
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "handle_bind(): PCB not in CLOSED state. Ignoring BIND request.\n");
|
||||
fprintf(stderr, "handle_bind(): PCB (%x) not in CLOSED state. Ignoring BIND request.\n", conn->pcb);
|
||||
send_return_value(conn, -1, EINVAL);
|
||||
}
|
||||
}
|
||||
@ -1037,6 +1066,9 @@ void NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socke
|
||||
{
|
||||
int rpc_fd = _phy.getDescriptor(sock);
|
||||
struct tcp_pcb *newpcb = lwipstack->tcp_new();
|
||||
|
||||
fprintf(stderr, "handle_socket(): pcb=%x, (state == CLOSED) = %d\n", newpcb, (newpcb->state==CLOSED));
|
||||
|
||||
if(newpcb != NULL) {
|
||||
ZT_PHY_SOCKFD_TYPE fds[2];
|
||||
if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) {
|
||||
@ -1045,6 +1077,7 @@ void NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socke
|
||||
return;
|
||||
}
|
||||
}
|
||||
fprintf(stderr, "socketpair = {%d, %d}\n", fds[0], fds[1]);
|
||||
TcpConnection *new_conn = new TcpConnection();
|
||||
new_conn->dataSock = _phy.wrapSocket(fds[0], new_conn);
|
||||
*uptr = new_conn;
|
||||
|
Reference in New Issue
Block a user