Improved RPC connection closure logic

This commit is contained in:
Joseph Henry 2015-10-23 13:37:41 -07:00
parent 236e474553
commit b48ed824e6
4 changed files with 47 additions and 23 deletions

View File

@ -663,7 +663,7 @@ int accept(ACCEPT_SIG)
} }
//if(opt & O_NONBLOCK) //if(opt & O_NONBLOCK)
fcntl(sockfd, F_SETFL, O_NONBLOCK); //fcntl(sockfd, F_SETFL, O_NONBLOCK);
char rbuf[16], c[1]; char rbuf[16], c[1];
int new_conn_socket; int new_conn_socket;

View File

@ -267,13 +267,13 @@ TcpConnection *NetconEthernetTap::getConnectionByTheirFD(PhySocket *sock, int fd
*/ */
void NetconEthernetTap::closeConnection(TcpConnection *conn) void NetconEthernetTap::closeConnection(TcpConnection *conn)
{ {
lwipstack->_tcp_arg(conn->pcb, NULL); //lwipstack->_tcp_arg(conn->pcb, NULL);
lwipstack->_tcp_sent(conn->pcb, NULL); //lwipstack->_tcp_sent(conn->pcb, NULL);
lwipstack->_tcp_recv(conn->pcb, NULL); //lwipstack->_tcp_recv(conn->pcb, NULL);
lwipstack->_tcp_err(conn->pcb, NULL); //lwipstack->_tcp_err(conn->pcb, NULL);
lwipstack->_tcp_poll(conn->pcb, NULL, 0); //lwipstack->_tcp_poll(conn->pcb, NULL, 0);
lwipstack->_tcp_close(conn->pcb); //lwipstack->_tcp_close(conn->pcb);
close(conn->their_fd); //close(conn->their_fd);
if(conn->dataSock) { if(conn->dataSock) {
close(_phy.getDescriptor(conn->dataSock)); close(_phy.getDescriptor(conn->dataSock));
_phy.close(conn->dataSock,false); _phy.close(conn->dataSock,false);
@ -359,20 +359,38 @@ void NetconEthernetTap::threadMain()
do not currently have any data connection associated with them. If they are do not currently have any data connection associated with them. If they are
unused, then we will try to read from them, if they fail, we can safely assume unused, then we will try to read from them, if they fail, we can safely assume
that the client has closed their end and we can close ours */ that the client has closed their end and we can close ours */
for(size_t i=0, associated = 0; i<rpc_sockets.size(); i++, associated = 0) { for(size_t i = 0; i<tcp_connections.size(); i++) {
for(size_t j=0; j<tcp_connections.size(); j++) { if(tcp_connections[i]->listening) {
if(tcp_connections[j]->rpcSock == rpc_sockets[i]) char c;
associated++; if (read(_phy.getDescriptor(tcp_connections[i]->dataSock), &c, 1) < 0) {
} // Still in listening state
if(!associated){ }
// No TCP connections are associated, this is a candidate for removal else {
char c; // Here we should handle the case there there is incoming data (?)
if(read(_phy.getDescriptor(rpc_sockets[i]),&c,1) < 0) { fprintf(stderr, "Listening socketpair closed. Removing RPC connection (%d)\n",
closeClient(rpc_sockets[i]); _phy.getDescriptor(tcp_connections[i]->dataSock));
} closeConnection(tcp_connections[i]);
}
}
}
}
fprintf(stderr, "tcp_conns = %d, rpc_socks = %d\n", tcp_connections.size(), rpc_sockets.size());
for(size_t i=0, associated = 0; i<rpc_sockets.size(); i++, associated = 0) {
for(size_t j=0; j<tcp_connections.size(); j++) {
if (tcp_connections[j]->rpcSock == rpc_sockets[i])
associated++;
}
if(!associated){
// No TCP connections are associated, this is a candidate for removal
char c;
if(read(_phy.getDescriptor(rpc_sockets[i]),&c,1) < 0) {
closeClient(rpc_sockets[i]);
}
else {
// Handle RPC call, this is rare
// phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
} }
} }
fprintf(stderr, "tcp_conns = %d, rpc_socks = %d\n", tcp_connections.size(), rpc_sockets.size());
} }
} }
if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) { if (since_tcp >= ZT_LWIP_TCP_TIMER_INTERVAL) {
@ -582,6 +600,7 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err)
int n = write(larg_fd, "z", 1); // accept() in library waits for this byte int n = write(larg_fd, "z", 1); // accept() in library waits for this byte
if(n > 0) { if(n > 0) {
if(sock_fd_write(send_fd, fds[1]) > 0) { if(sock_fd_write(send_fd, fds[1]) > 0) {
close(fds[1]); // close other end of socketpair
new_tcp_conn->pending = true; new_tcp_conn->pending = true;
} }
else { else {
@ -958,7 +977,9 @@ void NetconEthernetTap::handle_listen(PhySocket *sock, void **uptr, struct liste
lwipstack->tcp_arg(listening_pcb, new Larg(this, conn)); lwipstack->tcp_arg(listening_pcb, new Larg(this, conn));
/* we need to wait for the client to send us the fd allocated on their end /* we need to wait for the client to send us the fd allocated on their end
for this listening socket */ for this listening socket */
conn->pending = true; fcntl(_phy.getDescriptor(conn->dataSock), F_SETFL, O_NONBLOCK);
conn->listening = true;
conn->pending = true;
send_return_value(conn, ERR_OK, ERR_OK); send_return_value(conn, ERR_OK, ERR_OK);
} }
else { else {
@ -1031,6 +1052,7 @@ void NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, struct socke
new_conn->their_fd = fds[1]; new_conn->their_fd = fds[1];
tcp_connections.push_back(new_conn); tcp_connections.push_back(new_conn);
sock_fd_write(_phy.getDescriptor(sock), fds[1]); sock_fd_write(_phy.getDescriptor(sock), fds[1]);
close(fds[1]); // close other end of socketpair
// Once the client tells us what its fd is on the other end, we can then complete the mapping // Once the client tells us what its fd is on the other end, we can then complete the mapping
new_conn->pending = true; new_conn->pending = true;
} }
@ -1171,9 +1193,10 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
now space on the buffer */ now space on the buffer */
if(sndbuf == 0) { if(sndbuf == 0) {
_phy.setNotifyReadable(conn->dataSock, false); _phy.setNotifyReadable(conn->dataSock, false);
lwipstack->_tcp_output(conn->pcb);
return; return;
} }
if(!conn->listening)
lwipstack->_tcp_output(conn->pcb);
if(conn->dataSock) { if(conn->dataSock) {
int read_fd = _phy.getDescriptor(conn->dataSock); int read_fd = _phy.getDescriptor(conn->dataSock);
@ -1186,7 +1209,7 @@ void NetconEthernetTap::handle_write(TcpConnection *conn)
// NOTE: this assumes that lwipstack->_lock is locked, either // NOTE: this assumes that lwipstack->_lock is locked, either
// because we are in a callback or have locked it manually. // because we are in a callback or have locked it manually.
int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY); int err = lwipstack->_tcp_write(conn->pcb, &conn->buf, r, TCP_WRITE_FLAG_COPY);
//lwipstack->_tcp_output(conn->pcb); lwipstack->_tcp_output(conn->pcb);
if(err != ERR_OK) { if(err != ERR_OK) {
fprintf(stderr, "handle_write(): error while writing to PCB, (err = %d)\n", err); fprintf(stderr, "handle_write(): error while writing to PCB, (err = %d)\n", err);
return; return;

View File

@ -55,6 +55,7 @@ namespace ZeroTier {
int perceived_fd; int perceived_fd;
int their_fd; int their_fd;
bool pending; bool pending;
bool listening;
PhySocket *rpcSock; PhySocket *rpcSock;
PhySocket *dataSock; PhySocket *dataSock;

Binary file not shown.