From 99afc740215b84858d272642c62cacbc7f07e990 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Sat, 12 Dec 2015 01:28:59 -0800 Subject: [PATCH] FDs transferred over listen socket + other fixes --- netcon/Intercept.c | 63 +++++++++----------------- netcon/NetconEthernetTap.cpp | 86 ++++++++++++++++-------------------- netcon/common.inc.c | 8 ++-- 3 files changed, 64 insertions(+), 93 deletions(-) diff --git a/netcon/Intercept.c b/netcon/Intercept.c index 9ecc7d40c..6c400eddc 100644 --- a/netcon/Intercept.c +++ b/netcon/Intercept.c @@ -195,7 +195,6 @@ int get_new_fd(int oversock) int send_cmd(int rpc_fd, char *cmd) { - //dwr(MSG_DEBUG, "\n---send_cmd[start]\n"); pthread_mutex_lock(&lock); char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging #ifdef VERBOSE @@ -223,7 +222,7 @@ int send_cmd(int rpc_fd, char *cmd) #endif /* Combine command flag+payload with RPC metadata */ memcpy(&metabuf[IDX_PAYLOAD], cmd, PAYLOAD_SZ); - usleep(10000); + usleep(1000); int n_write = write(rpc_fd, &metabuf, BUF_SZ); if(n_write < 0){ dwr(MSG_DEBUG,"Error writing command to service (CMD = %d)\n", cmd[0]); @@ -235,14 +234,12 @@ int send_cmd(int rpc_fd, char *cmd) if(n_write > 0) { if(cmd[0]==RPC_SOCKET) { - dwr(MSG_DEBUG," waiting on get_new_fd()...\n"); ret = get_new_fd(fdret_sock); } if(cmd[0]==RPC_MAP) { ret = n_write; } if(cmd[0]==RPC_MAP_REQ || cmd[0]==RPC_CONNECT || cmd[0]==RPC_BIND) { - dwr(MSG_DEBUG," waiting on get_retval()...\n"); ret = get_retval(); } if(cmd[0]==RPC_LISTEN || cmd[0]==RPC_GETSOCKNAME) { @@ -253,7 +250,6 @@ int send_cmd(int rpc_fd, char *cmd) ret = -1; } pthread_mutex_unlock(&lock); - //dwr(MSG_DEBUG, "---send_cmd[end]\n\n"); return ret; } @@ -529,10 +525,8 @@ int socket(SOCKET_SIG) memset(cmd, '\0', BUF_SZ); cmd[0] = RPC_MAP; memcpy(&cmd[1], &newfd, sizeof(newfd)); - /* send fd mapping and get confirmation */ - dwr(MSG_DEBUG, "pre-send_cmd\n"); - err = send_cmd(fdret_sock, cmd); - dwr(MSG_DEBUG, "post-send_cmd\n"); + /* send fd mapping and get confirmation */ + err = send_cmd(fdret_sock, cmd); if(err > -1) { errno = ERR_OK; @@ -611,7 +605,6 @@ int connect(CONNECT_SIG) } /* Assemble and send RPC */ - int err; char cmd[BUF_SZ]; memset(cmd, '\0', BUF_SZ); struct connect_st rpc_st; @@ -621,7 +614,6 @@ int connect(CONNECT_SIG) memcpy(&rpc_st.__len, &__len, sizeof(socklen_t)); cmd[0] = RPC_CONNECT; memcpy(&cmd[1], &rpc_st, sizeof(struct connect_st)); - return send_cmd(fdret_sock, cmd); } @@ -672,7 +664,6 @@ int bind(BIND_SIG) } #endif - int err; /* make sure we don't touch any standard outputs */ if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) return(realbind(sockfd, addr, addrlen)); @@ -807,42 +798,31 @@ int accept(ACCEPT_SIG) /* The following line is required for libuv/nodejs to accept connections properly, however, this has the side effect of causing certain webservers to max out the CPU in an accept loop */ - //fcntl(sockfd, F_SETFL, O_NONBLOCK); + fcntl(sockfd, F_SETFL, O_NONBLOCK); + int new_conn_socket = get_new_fd(sockfd); - char c[1]; - int new_conn_socket; - int n = read(sockfd, c, sizeof(c)); /* Read signal byte */ - - if(n > 0) + if(new_conn_socket > 0) { - new_conn_socket = get_new_fd(fdret_sock); + //new_conn_socket = get_new_fd(fdret_sock); dwr(MSG_DEBUG, " accept(): RX: fd = (%d) over (%d)\n", new_conn_socket, fdret_sock); - if(new_conn_socket > 0) { - /* Send our local-fd number back to service so it can complete its mapping table */ - memset(cmd, '\0', BUF_SZ); - cmd[0] = RPC_MAP; - memcpy(&cmd[1], &new_conn_socket, sizeof(new_conn_socket)); + /* Send our local-fd number back to service so it can complete its mapping table */ + memset(cmd, '\0', BUF_SZ); + cmd[0] = RPC_MAP; + memcpy(&cmd[1], &new_conn_socket, sizeof(new_conn_socket)); - dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket); - int n_write = send_cmd(fdret_sock, cmd); + dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket); + int n_write = send_cmd(fdret_sock, cmd); - if(n_write < 0) { - errno = ECONNABORTED; /* TODO: Closest match, service unreachable */ - handle_error("accept", "ECONNABORTED - Error sending perceived FD to service", -1); - return -1; - } - errno = ERR_OK; - dwr(MSG_DEBUG,"*accept()=%d\n", new_conn_socket); - handle_error("accept", "", new_conn_socket); - return new_conn_socket; /* OK */ - } - else { - errno = ECONNABORTED; /* TODO: Closest match, service unreachable */ - handle_error("accept", "ECONNABORTED - Error receiving new FD from service", -1); + if(n_write < 0) { + errno = ECONNABORTED; + handle_error("accept", "ECONNABORTED - Error sending perceived FD to service", -1); return -1; } + errno = ERR_OK; + dwr(MSG_DEBUG,"accept()=%d\n", new_conn_socket); + handle_error("accept", "", new_conn_socket); + return new_conn_socket; /* OK */ } - errno = EAGAIN; /* necessary? */ handle_error("accept", "EAGAIN - Error reading signal byte from service", -1); return -EAGAIN; @@ -1024,8 +1004,7 @@ int getsockname(GETSOCKNAME_SIG) dwr(MSG_ERROR, "getsockname(): SYMBOL NOT FOUND.\n"); return -1; } - return realgetsockname(sockfd, addr, addrlen); - +return realgetsockname(sockfd, addr, addrlen); /* assemble command */ char cmd[BUF_SZ]; struct getsockname_st rpc_st; diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 9d9c4e05b..7d68f528d 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -832,46 +832,38 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err) int listening_fd = tap->_phy.getDescriptor(conn->dataSock); if(conn) { - ZT_PHY_SOCKFD_TYPE fds[2]; - if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { - if(errno < 0) { - l->tap->send_return_value(conn, -1, errno); - dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n"); - return ERR_MEM; - } + ZT_PHY_SOCKFD_TYPE fds[2]; + if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { + if(errno < 0) { + l->tap->send_return_value(conn, -1, errno); + dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n"); + return ERR_MEM; } - TcpConnection *new_tcp_conn = new TcpConnection(); - new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); - new_tcp_conn->rpcSock = conn->rpcSock; - new_tcp_conn->pcb = newpcb; - new_tcp_conn->their_fd = fds[1]; - tap->tcp_connections.push_back(new_tcp_conn); - dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]); - int n, send_fd = tap->_phy.getDescriptor(conn->rpcSock); - if((n = send(listening_fd, "z", 1, MSG_NOSIGNAL)) < 0) { - dwr(MSG_ERROR, " nc_accept(): Error: [send(listening_fd,...) = MSG_NOSIGNAL].\n"); - return -1; - } - else if(n > 0) { - if(sock_fd_write(send_fd, fds[1]) > 0) { - close(fds[1]); // close other end of socketpair - new_tcp_conn->pending = true; - } - else { - dwr(MSG_ERROR, " nc_accept(%d): unable to send fd to client\n", listening_fd); - } - } - else { - dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (send_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]); - return -1; - } + } + TcpConnection *new_tcp_conn = new TcpConnection(); + new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); + new_tcp_conn->rpcSock = conn->rpcSock; + new_tcp_conn->pcb = newpcb; + new_tcp_conn->their_fd = fds[1]; + tap->tcp_connections.push_back(new_tcp_conn); + dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]); + int send_fd = tap->_phy.getDescriptor(conn->rpcSock); + + if(sock_fd_write(listening_fd, fds[1]) < 0){ + dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (listen_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]); + return -1; + } + else { + close(fds[1]); // close other end of socketpair + new_tcp_conn->pending = true; + } tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); tap->lwipstack->_tcp_recv(newpcb, nc_recved); tap->lwipstack->_tcp_err(newpcb, nc_err); tap->lwipstack->_tcp_sent(newpcb, nc_sent); tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections - return ERR_OK; + return ERR_OK; } else { dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); @@ -904,10 +896,10 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf return ERR_OK; // ? } if(p == NULL) { - if(l->conn) { - dwr(MSG_INFO, " nc_recved(): closing connection\n"); - l->tap->closeConnection(l->conn); - return ERR_ABRT; + if(l->conn && !l->conn->listening) { + dwr(MSG_INFO, " nc_recved(): closing connection\n"); + // l->tap->closeConnection(l->conn); + return ERR_ABRT; } else { dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n"); @@ -1092,11 +1084,11 @@ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload) { unsigned char *buf = (unsigned char*)data; - memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); - memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); - memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); - memcpy(timestamp, &buf[IDX_TIME], 20); - memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); + memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); + memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); + memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); + memcpy(timestamp, &buf[IDX_TIME], 20); + memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); } /* @@ -1182,23 +1174,23 @@ void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd); dwr(MSG_DEBUG, "handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd); - /* +/* + int port = conn->addr->sin_port; int ip = conn->addr->sin_addr.s_addr; unsigned char d[4]; d[0] = ip & 0xFF; d[1] = (ip >> 8) & 0xFF; d[2] = (ip >> 16) & 0xFF; d[3] = (ip >> 24) & 0xFF; - int port = conn->addr->sin_port; dwr(MSG_ERROR, " handle_getsockname(): returning address: %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], port); - */ +*/ // Assemble address "command" to send to intercept char retmsg[sizeof(struct sockaddr)]; memset(&retmsg, '\0', sizeof(retmsg)); dwr(MSG_ERROR, " handle_getsockname(): %d\n", sizeof(retmsg)); - memcpy(&retmsg, conn->addr, sizeof(struct sockaddr)); - + if(conn == NULL) return; + memcpy(&retmsg, conn->addr, 1); // Get connection's RPC fd and send structure containing bound address int fd = _phy.getDescriptor(conn->rpcSock); write(fd, &retmsg, sizeof(struct sockaddr)); diff --git a/netcon/common.inc.c b/netcon/common.inc.c index 2a852b679..7eab8880d 100644 --- a/netcon/common.inc.c +++ b/netcon/common.inc.c @@ -171,17 +171,17 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd) size = recvmsg (sock, &msg, 0); if (size < 0) { perror ("recvmsg"); - exit(1); + return -1; } cmsg = CMSG_FIRSTHDR(&msg); if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { if (cmsg->cmsg_level != SOL_SOCKET) { fprintf (stderr, "invalid cmsg_level %d\n",cmsg->cmsg_level); - exit(1); + return -1; } if (cmsg->cmsg_type != SCM_RIGHTS) { fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type); - exit(1); + return -1; } *fd = *((int *) CMSG_DATA(cmsg)); @@ -190,7 +190,7 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd) size = read (sock, buf, bufsize); if (size < 0) { perror("read"); - exit(1); + return -1; } } return size;