Added RPC ACK

This commit is contained in:
Joseph Henry 2016-01-11 10:35:02 -08:00
parent 3e65ecb93d
commit 39d79f34ab
2 changed files with 9 additions and 17 deletions

View File

@ -377,7 +377,7 @@ void NetconEthernetTap::threadMain()
int fd = _phy.getDescriptor(tcp_connections[i]->sock); int fd = _phy.getDescriptor(tcp_connections[i]->sock);
if(tcp_connections[i]->idx > 0){ if(tcp_connections[i]->idx > 0){
dwr(MSG_DEBUG, "----------------------writing from poll\n"); dwr(MSG_DEBUG, "writing from poll\n");
lwipstack->_lock.lock(); lwipstack->_lock.lock();
handle_write(tcp_connections[i]); handle_write(tcp_connections[i]);
lwipstack->_lock.unlock(); lwipstack->_lock.unlock();
@ -431,6 +431,7 @@ void NetconEthernetTap::phyOnTcpWritable(PhySocket *sock,void **uptr) {}
*/ */
void NetconEthernetTap::closeConnection(PhySocket *sock) void NetconEthernetTap::closeConnection(PhySocket *sock)
{ {
dwr(MSG_DEBUG,"closeConnection(%x)",sock);
TcpConnection *conn = getConnection(sock); TcpConnection *conn = getConnection(sock);
if(conn) { if(conn) {
if(!conn->pcb) if(!conn->pcb)
@ -492,7 +493,6 @@ void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid,
*/ */
void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
{ {
//usleep(5000);
//dwr(MSG_DEBUG,"\n\n\n<%x> phyOnUnixData(): len = %d\n", sock, len); //dwr(MSG_DEBUG,"\n\n\n<%x> phyOnUnixData(): len = %d\n", sock, len);
uint64_t magic_num; uint64_t magic_num;
pid_t pid, tid; pid_t pid, tid;
@ -524,7 +524,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
pidmap[sock] = pid; pidmap[sock] = pid;
new_conn->pid = pid; new_conn->pid = pid;
} }
return; // Don't close the socket, we'll use this later for data //return; // Don't close the socket, we'll use this later for data
} }
else { // All RPCs other than RPC_SOCKET else { // All RPCs other than RPC_SOCKET
streamsock = sockmap[magic_num]; streamsock = sockmap[magic_num];
@ -539,6 +539,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data); jobmap[magic_num] = std::make_pair<PhySocket*, void*>(sock, data);
} }
} }
write(_phy.getDescriptor(sock), "z", 1); // RPC ACK byte to maintain RPC->Stream order
} }
// STREAM // STREAM
@ -563,28 +564,19 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
dwr(MSG_DEBUG," <%x> creating sockmap entry for %llu\n", sock, magic_num); dwr(MSG_DEBUG," <%x> creating sockmap entry for %llu\n", sock, magic_num);
sockmap[magic_num] = sock; sockmap[magic_num] = sock;
} }
else { else
dwr(MSG_DEBUG," <%x> found_job\n", sock);
found_job = true; found_job = true;
} }
} }
}
conn = getConnection(sock); conn = getConnection(sock);
if(!conn) if(!conn)
return; return;
if(padding_pos == -1) // [DATA] if(padding_pos == -1) { // [DATA]
{
dwr(MSG_DEBUG, "copy everything... wlen = %d, conn = %x, conn->buf = %x, buf = %x\n", wlen, conn, conn->buf, buf);
dwr(MSG_DEBUG, " copy everything... conn->idx = %d, sizeof(conn->buf) = %d\n", conn->idx, sizeof(conn->buf));
memcpy(&conn->buf[conn->idx], buf, wlen); memcpy(&conn->buf[conn->idx], buf, wlen);
dwr(MSG_DEBUG, "finished\n");
} }
else { // Padding found, implies a token is present else { // Padding found, implies a token is present
dwr(MSG_DEBUG, " <%x> token_pos = %d, GRABBING DATA\n", sock, token_pos);
// [TOKEN] // [TOKEN]
if(len == TOKEN_SIZE && token_pos == 0) { if(len == TOKEN_SIZE && token_pos == 0) {
wlen = 0; // Nothing to write wlen = 0; // Nothing to write

View File

@ -98,7 +98,7 @@ int rpc_join(const char * sockname)
int rpc_send_command(int cmd, int forfd, void *data, int len) int rpc_send_command(int cmd, int forfd, void *data, int len)
{ {
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
char padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89}; char c, padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89};
char cmdbuf[BUF_SZ], magic[TOKEN_SIZE], metabuf[BUF_SZ]; char cmdbuf[BUF_SZ], magic[TOKEN_SIZE], metabuf[BUF_SZ];
memcpy(magic+MAGIC_SIZE, padding, TOKEN_SIZE); memcpy(magic+MAGIC_SIZE, padding, TOKEN_SIZE);
uint64_t magic_num; uint64_t magic_num;
@ -151,8 +151,8 @@ int rpc_send_command(int cmd, int forfd, void *data, int len)
errno = 0; errno = 0;
} }
// Write token to corresponding data stream // Write token to corresponding data stream
if(n_write > 0 && forfd > -1){ read(rpc_sock, &c, 1);
usleep(5000); if(c == 'z' && n_write > 0 && forfd > -1){
int w = send(forfd, &magic, TOKEN_SIZE, 0); int w = send(forfd, &magic, TOKEN_SIZE, 0);
} }
// Process response from service // Process response from service