diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index 389be1430..55e010027 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -57,7 +57,9 @@ #define APPLICATION_POLL_FREQ 2 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 #define STATUS_TMR_INTERVAL 1000 // How often we check connection statuses (in ms) -#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 * 2 + +#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 * 2 + namespace ZeroTier { @@ -160,6 +162,7 @@ NetconEthernetTap::NetconEthernetTap( { char sockPath[4096],lwipPath[4096]; rpc_counter = -1; + rcqidx = 0; Utils::snprintf(sockPath,sizeof(sockPath),"%s%snc_%.16llx",homePath,ZT_PATH_SEPARATOR_S,_nwid,ZT_PATH_SEPARATOR_S,(unsigned long long)nwid); _dev = sockPath; // in netcon mode, set device to be just the network ID @@ -349,7 +352,7 @@ void NetconEthernetTap::threadMain() status_remaining = STATUS_TMR_INTERVAL - since_status; - dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size()); + //dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", tcp_connections.size(), jobmap.size()); for(size_t i=0; i<tcp_connections.size(); i++) { // No TCP connections are associated, this is a candidate for removal @@ -360,7 +363,7 @@ void NetconEthernetTap::threadMain() fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; int n = read(fd,&tmpbuf,BUF_SZ); - dwr(MSG_DEBUG," tap_thread(): <%x> conn->idx = %d\n", tcp_connections[i]->sock, tcp_connections[i]->idx); + //dwr(MSG_DEBUG," tap_thread(): <%x> conn->idx = %d\n", tcp_connections[i]->sock, tcp_connections[i]->idx); if(tcp_connections[i]->pcb->state == SYN_SENT) { dwr(MSG_DEBUG," tap_thread(): <%x> state = SYN_SENT, candidate for removal\n", tcp_connections[i]->sock); } @@ -502,6 +505,31 @@ void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, memcpy(CANARY, &buf[IDX_PAYLOAD+1], CANARY_SIZE); } + +void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) +{ + TcpConnection *conn = getConnection(sock); + int len = rcqidx; + int n = _phy.streamSend(conn->sock, rcq, len); + if(n > 0) { + if(n < len) { + dwr(MSG_INFO,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n"); + } + memcpy(rcq, rcq+n, rcqidx-n); + rcqidx -= n; + lwipstack->_tcp_recved(conn->pcb, n); + if(rcqidx == 0) + _phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about + dwr(MSG_DEBUG," phyOnUnixWritable(): wrote %d bytes from RX buffer to <%x> (idx = %d)\n", n, conn->sock, rcqidx); + } + else { + perror("\n"); + fprintf(stderr, "errno = %d\n", errno); + dwr(MSG_INFO," phyOnUnixWritable(): No data written to stream <%x>\n", conn->sock); + } +} + + /* * Processes incoming data on a client-specific RPC connection */ @@ -793,7 +821,8 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err) err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) { Larg *l = (Larg*)arg; - int n; + NetconEthernetTap *tap = l->tap; + int tot = 0; struct pbuf* q = p; if(!l->conn) { @@ -803,10 +832,10 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf if(p == NULL) { if(l->conn && !l->conn->listening) { dwr(MSG_INFO," nc_recved(): closing connection\n"); - if(l->tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) { - dwr(MSG_ERROR," closeConnection(): Error while calling tcp_close()\n"); + if(tap->lwipstack->_tcp_close(l->conn->pcb) != ERR_OK) { + dwr(MSG_ERROR," nc_recved(): Error while calling tcp_close()\n"); } - l->tap->closeConnection(l->conn->sock); + tap->closeConnection(l->conn->sock); return ERR_ABRT; } else { @@ -814,26 +843,25 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf } return err; } - q = p; - while(p != NULL) { // Cycle through pbufs and write them to the socket + // Cycle through pbufs and write them to the RX buffer + // The RX buffer will be emptied via phyOnUnixWritable() + while(p != NULL) { if(p->len <= 0) - break; - else - dwr(MSG_DEBUG," nc_recved(): p->len = %d\n", p->len); - if((n = l->tap->_phy.streamSend(l->conn->sock,p->payload, p->len)) > 0) { - if(n < p->len) { - dwr(MSG_INFO," nc_recved(): unable to write entire pbuf to stream\n"); - } - l->tap->lwipstack->_tcp_recved(tpcb, n); - dwr(MSG_DEBUG," nc_recved(): wrote %d bytes to <%x>\n", n, l->conn->sock); - } - else{ - perror("\n"); - dwr(MSG_INFO," nc_recved(): No data written to stream <%x>\n", l->conn->sock); + break; + int avail = DEFAULT_READ_BUFFER_SIZE - tap->rcqidx; + int len = p->len; + if(avail < len) { + dwr(MSG_DEBUG," nc_recv(): not enough room (%d bytes) on RX buffer\n", avail); + exit(1); } + memcpy(tap->rcq + (tap->rcqidx), p->payload, len); + tap->rcqidx += len; + tap->_phy.setNotifyWritable(l->conn->sock, true); // Signal that we're interested in knowing when we can write p = p->next; + tot += len; } - l->tap->lwipstack->_pbuf_free(q); // free pbufs + dwr(MSG_DEBUG," nc_recv(): wrote %d bytes to RX buffer (idx = %d)\n", tot, tap->rcqidx); + tap->lwipstack->_pbuf_free(q); return ERR_OK; } @@ -1305,7 +1333,7 @@ void NetconEthernetTap::handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpC void NetconEthernetTap::handle_write(TcpConnection *conn) { - dwr(MSG_DEBUG,"handle_write(): conn->idx = %d, conn->sock = %x\n", conn->idx, conn->sock); + dwr(MSG_DEBUG_EXTRA,"handle_write(): conn->idx = %d, conn->sock = %x\n", conn->idx, conn->sock); if(!conn) { dwr(MSG_ERROR," handle_write(): invalid connection\n"); return; diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index 02ffb40a9..26a224a39 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -46,6 +46,8 @@ #include "netif/etharp.h" +#define DEFAULT_READ_BUFFER_SIZE 1024 * 1024 * 2 + struct tcp_pcb; struct socket_st; struct listen_st; @@ -134,6 +136,7 @@ private: void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN); void phyOnUnixClose(PhySocket *sock,void **uptr); void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len); + void phyOnUnixWritable(PhySocket *sock,void **uptr); void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable); TcpConnection *getConnection(PhySocket *sock); @@ -159,6 +162,9 @@ private: std::vector<TcpConnection*> tcp_connections; std::map<PhySocket*, pid_t> pidmap; + + char rcq[DEFAULT_READ_BUFFER_SIZE]; + int rcqidx; std::map<uint64_t, std::pair<PhySocket*, void*> > jobmap; pid_t rpc_counter; diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index 8b6db36b3..7dc0b5ddd 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -974,6 +974,11 @@ public: case ZT_PHY_SOCKET_UNIX_IN: { #ifdef __UNIX_LIKE__ ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable + if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { + try { + _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); + } catch ( ... ) {} + } if (FD_ISSET(sock,&rfds)) { long n = (long)::read(sock,buf,sizeof(buf)); if (n <= 0) { @@ -984,11 +989,6 @@ public: } catch ( ... ) {} } } - if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { - try { - //_handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); - } catch ( ... ) {} - } #endif // __UNIX_LIKE__ } break;