Transfer speed increate + stability fixes

This commit is contained in:
Joseph Henry 2016-01-20 13:38:14 -08:00
parent 357cb92f2e
commit bcaf42e075
4 changed files with 27 additions and 23 deletions

View File

@ -33,6 +33,7 @@
#include <utility> #include <utility>
#include <string> #include <string>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/syscall.h>
#include "NetconEthernetTap.hpp" #include "NetconEthernetTap.hpp"
@ -304,13 +305,14 @@ void NetconEthernetTap::threadMain()
// Connection prunning // Connection prunning
if (since_status >= STATUS_TMR_INTERVAL) { if (since_status >= STATUS_TMR_INTERVAL) {
prev_status_time = now; prev_status_time = now;
for(size_t i=0;i<_TcpConnections.size();++i) { for(size_t i=0;i<_TcpConnections.size();++i) {
if(!_TcpConnections[i]->sock) if(!_TcpConnections[i]->sock)
continue; continue;
int fd = _phy.getDescriptor(_TcpConnections[i]->sock); int fd = _phy.getDescriptor(_TcpConnections[i]->sock);
dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size()); dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size());
// If there's anything on the RX buf, set to notify in case we stalled
if(_TcpConnections[i]->rxsz > 0)
_phy.setNotifyWritable(_TcpConnections[i]->sock, true);
fcntl(fd, F_SETFL, O_NONBLOCK); fcntl(fd, F_SETFL, O_NONBLOCK);
unsigned char tmpbuf[BUF_SZ]; unsigned char tmpbuf[BUF_SZ];
@ -417,30 +419,31 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) {
closeConnection(sock); closeConnection(sock);
} }
void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked)
{ {
Mutex::Lock _l(_tcpconns_m); if(!lwip_invoked) {
Mutex::Lock _l2(_rx_buf_m); _tcpconns_m.lock();
_rx_buf_m.lock();
}
TcpConnection *conn = getConnection(sock); TcpConnection *conn = getConnection(sock);
int len = conn->rxsz; if(!conn->rxsz)
int n = _phy.streamSend(conn->sock, conn->rxbuf, len); return;
int n = _phy.streamSend(conn->sock, conn->rxbuf, conn->rxsz);
if(n > 0) { if(n > 0) {
if(n < len) { if(conn->rxsz-n > 0)
dwr(MSG_ERROR,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n"); memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n);
}
if(len-n)
memcpy(conn->rxbuf, conn->rxbuf+n, len-n);
conn->rxsz -= n; conn->rxsz -= n;
float max = (float)DEFAULT_BUF_SZ; float max = (float)DEFAULT_BUF_SZ;
dwr(MSG_TRANSFER," <--- RX :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", dwr(MSG_TRANSFER," <--- RX :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n",
(float)conn->txsz / max, (float)conn->rxsz / max, sock, n); (float)conn->txsz / max, (float)conn->rxsz / max, sock, n);
lwipstack->_tcp_recved(conn->pcb, n); lwipstack->_tcp_recved(conn->pcb, n);
if(conn->rxsz == 0){
_phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about
}
} else { } else {
perror("\n"); dwr(MSG_ERROR," phyOnUnixWritable(): errno = %d, rxsz = %d\n", errno, conn->rxsz);
dwr(MSG_ERROR," phyOnUnixWritable(): errno = %d\n", errno); _phy.setNotifyWritable(conn->sock, false);
}
if(!lwip_invoked) {
_tcpconns_m.unlock();
_rx_buf_m.unlock();
} }
} }
@ -673,7 +676,6 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *
Larg *l = (Larg*)arg; Larg *l = (Larg*)arg;
int tot = 0; int tot = 0;
struct pbuf* q = p; struct pbuf* q = p;
Mutex::Lock _l(l->tap->_tcpconns_m); Mutex::Lock _l(l->tap->_tcpconns_m);
if(!l->conn) { if(!l->conn) {
@ -702,8 +704,10 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf *
p = p->next; p = p->next;
tot += len; tot += len;
} }
if(tot) if(tot) {
l->tap->phyOnUnixWritable(l->conn->sock, NULL, true);
l->tap->_phy.setNotifyWritable(l->conn->sock, true); l->tap->_phy.setNotifyWritable(l->conn->sock, true);
}
l->tap->lwipstack->_pbuf_free(q); l->tap->lwipstack->_pbuf_free(q);
return ERR_OK; return ERR_OK;
} }

View File

@ -58,7 +58,7 @@ struct accept_st;
#define APPLICATION_POLL_FREQ 2 #define APPLICATION_POLL_FREQ 2
#define ZT_LWIP_TCP_TIMER_INTERVAL 5 #define ZT_LWIP_TCP_TIMER_INTERVAL 5
#define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms) #define STATUS_TMR_INTERVAL 250 // How often we check connection statuses (in ms)
#define DEFAULT_BUF_SZ 1024 * 1024 * 2 #define DEFAULT_BUF_SZ 1024 * 1024 * 2
#define DEFAULT_BUF_SOFTMAX DEFAULT_BUF_SZ / 2 #define DEFAULT_BUF_SOFTMAX DEFAULT_BUF_SZ / 2
@ -405,7 +405,7 @@ private:
/* /*
* Notifies us that we can write to an application's socket * Notifies us that we can write to an application's socket
*/ */
void phyOnUnixWritable(PhySocket *sock,void **uptr); void phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked);
/* /*
* Returns a pointer to a TcpConnection associated with a given PhySocket * Returns a pointer to a TcpConnection associated with a given PhySocket

View File

@ -976,7 +976,7 @@ public:
ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) {
try { try {
_handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr),false);
} catch ( ... ) {} } catch ( ... ) {}
} }
if (FD_ISSET(sock,&rfds)) { if (FD_ISSET(sock,&rfds)) {

View File

@ -1120,7 +1120,7 @@ public:
inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {} inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {}
inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} inline void phyOnUnixClose(PhySocket *sock,void **uptr) {}
inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {}
inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} inline void phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked) {}
inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,void **nuptr,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc) inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,void **nuptr,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc)
{ {