Updated RPC handling

This commit is contained in:
Joseph Henry
2015-11-12 08:21:05 -08:00
parent 82a60b1e28
commit e5fad005a9
22 changed files with 345 additions and 41 deletions

View File

@ -567,22 +567,39 @@ void NetconEthernetTap::phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void *
rpc_sockets.push_back(sockN);
}
void unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload);
/*
* Processes incoming data on a client-specific RPC connection
*/
void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len)
{
pid_t pid, tid;
int rpc_count;
char timestamp[20];
char cmd;
void *payload;
unload_rpc(data, pid, tid, rpc_count, timestamp, cmd, payload);
dwr("\n\nRX: (pid=%d, tid=%d, rpc_count=%d, timestamp=%s, cmd=%d\n", pid, tid, rpc_count, timestamp, cmd);
unsigned char *buf = (unsigned char*)data;
switch(buf[0])
switch(cmd)
{
case RPC_SOCKET:
dwr(2, "RPC_SOCKET\n");
struct socket_st socket_rpc;
pid_t pid;
memcpy(&pid, &buf[1], sizeof(pid_t)); // PID for client RPC tracking (only for debug)
memcpy(&socket_rpc, &buf[sizeof(pid_t)], sizeof(struct socket_st));
struct socket_st socket_rpc;
memcpy(&socket_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct socket_st));
if(rpc_count==rpc_counter) {
dwr("Detected repeat RPC.\n");
//return;
}
else {
rpc_counter = rpc_count;
}
TcpConnection * new_conn;
if(new_conn = handle_socket(sock, uptr, &socket_rpc)) {
if((new_conn = handle_socket(sock, uptr, &socket_rpc))) {
pidmap[sock] = pid;
new_conn->pid = pid;
}
@ -590,24 +607,30 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
case RPC_LISTEN:
dwr(2, "RPC_LISTEN\n");
struct listen_st listen_rpc;
memcpy(&listen_rpc, &buf[1], sizeof(struct listen_st));
memcpy(&listen_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct listen_st));
handle_listen(sock, uptr, &listen_rpc);
break;
case RPC_BIND:
dwr(2, "RPC_BIND\n");
struct bind_st bind_rpc;
memcpy(&bind_rpc, &buf[1], sizeof(struct bind_st));
memcpy(&bind_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct bind_st));
handle_bind(sock, uptr, &bind_rpc);
break;
case RPC_CONNECT:
dwr(2, "RPC_CONNECT\n");
struct connect_st connect_rpc;
memcpy(&connect_rpc, &buf[1], sizeof(struct connect_st));
memcpy(&connect_rpc, &buf[IDX_PAYLOAD+1], sizeof(struct connect_st));
handle_connect(sock, uptr, &connect_rpc);
break;
case RPC_MAP:
dwr(2, "RPC_MAP\n");
handle_retval(sock, uptr, buf);
dwr(2, "RPC_MAP (len = %d)\n", len);
int newfd;
//memcpy(&pid, &buf[IDX_PAYLOAD+1], sizeof(pid_t)); // PID for client RPC tracking (only for debug)
memcpy(&newfd, &buf[IDX_PAYLOAD+1], sizeof(int));
//dwr("newfd = %d\n", newfd);
handle_retval(sock, uptr, rpc_count, newfd);
break;
case RPC_MAP_REQ:
dwr(2, "RPC_MAP_REQ\n");
@ -616,6 +639,7 @@ void NetconEthernetTap::phyOnUnixData(PhySocket *sock,void **uptr,void *data,uns
default:
break;
}
//memset(data, '\0', 256);
}
/*
@ -973,7 +997,7 @@ void NetconEthernetTap::handle_map_request(PhySocket *sock, void **uptr, unsigne
dwr(4, " handle_map_request()\n");
TcpConnection *conn = (TcpConnection*)*uptr;
int req_fd;
memcpy(&req_fd, &buf[1], sizeof(req_fd));
memcpy(&req_fd, &buf[IDX_PAYLOAD+1], sizeof(req_fd));
for(size_t i=0; i<tcp_connections.size(); i++) {
if(tcp_connections[i]->rpcSock == conn->rpcSock && tcp_connections[i]->perceived_fd == req_fd){
send_return_value(conn, 1, ERR_OK); // True
@ -994,16 +1018,26 @@ void NetconEthernetTap::handle_map_request(PhySocket *sock, void **uptr, unsigne
* @param structure containing the data and parameters for this client's RPC
*
*/
void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, unsigned char* buf)
void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd)
{
dwr(4, " handle_retval()\n");
TcpConnection *conn = (TcpConnection*)*uptr;
if(!conn->pending)
return;
// Copy data from buffer to TcpConnection object, update status
memcpy(&(conn->perceived_fd), &buf[1], sizeof(int));
conn->pending = false;
conn->perceived_fd = newfd;
if(rpc_count==rpc_counter) {
dwr("Detected repeat RPC.\n");
//return;
}
else {
rpc_counter = rpc_count;
//dwr("pid = %d\n", rpc_count);
//dwr("rpc_counter = %d\n", rpc_counter);
}
dwr(4, " handle_retval(): CONN:%x - Mapping [our=%d -> their=%d]\n",conn,
_phy.getDescriptor(conn->dataSock), conn->perceived_fd);
@ -1027,7 +1061,7 @@ void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, unsigned cha
}
else {
dwr(" handle_retval(): CONN:%x - This socket is mapped to two different pipes (?). Exiting.\n", conn);
die(0); // FIXME: Print service mapping state and exit
//die(0); // FIXME: Print service mapping state and exit
}
}
}
@ -1070,7 +1104,7 @@ void NetconEthernetTap::handle_retval(PhySocket *sock, void **uptr, unsigned cha
*/
void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc)
{
dwr(3, " handle_bind()\n");
struct sockaddr_in *connaddr;
connaddr = (struct sockaddr_in *) &bind_rpc->addr;
int conn_port = lwipstack->ntohs(connaddr->sin_port);
@ -1078,6 +1112,8 @@ void NetconEthernetTap::handle_bind(PhySocket *sock, void **uptr, struct bind_st
conn_addr.addr = *((u32_t *)_ips[0].rawIpData());
TcpConnection *conn = getConnectionByTheirFD(sock, bind_rpc->sockfd);
dwr(3, " handle_bind(%d)\n", bind_rpc->sockfd);
if(conn) {
if(conn->pcb->state == CLOSED){
int err = lwipstack->tcp_bind(conn->pcb, &conn_addr, conn_port);
@ -1229,7 +1265,6 @@ TcpConnection * NetconEthernetTap::handle_socket(PhySocket *sock, void **uptr, s
new_conn->their_fd = fds[1];
tcp_connections.push_back(new_conn);
int n = sock_fd_write(_phy.getDescriptor(sock), fds[1]);
dwr("wrote %d bytes\n", n);
close(fds[1]); // close other end of socketpair
// Once the client tells us what its fd is on the other end, we can then complete the mapping
new_conn->pending = true;