Replace TrickComm with new connection_handler library

This commit is contained in:
Jacqueline Deans 2023-02-22 11:35:30 -06:00
parent c2e42f4ef4
commit 0788dcfa9b
42 changed files with 2562 additions and 944 deletions

View File

@ -14,28 +14,31 @@ LIBRARY DEPENDENCIES:
namespace Trick {
class ClientConnection {
public:
// Should this be here? ¯\_(ツ)_/¯
static const unsigned int MAX_CMD_LEN = 200000 ;
enum ConnectionType { TCP, UDP, MCAST, WS } ;
virtual int initialize() = 0;
// Pure virtual methods
virtual int start() = 0;
virtual int write (const std::string& message) = 0;
virtual int write (char * message, int size) = 0;
virtual std::string read (int max_len = MAX_CMD_LEN) = 0;
virtual int setBlockMode (bool blocking) = 0;
virtual int disconnect () = 0;
virtual std::string get_client_tag () = 0;
virtual int set_client_tag(std::string tag) = 0;
virtual int setBlockMode (int mode) = 0;
static const unsigned int MAX_CMD_LEN = 200000 ;
virtual bool isInitialized() = 0;
virtual std::string getClientTag ();
virtual int setClientTag (std::string tag);
virtual int restart() {};
protected:
ConnectionType _connection_type;
// RHEL appears to have an issue with std::atomic
// std::atomic_bool _is_initialized;
std::string _client_tag;
};
}

View File

@ -1,28 +1,39 @@
#ifndef CLIENT_LISTENER_HH
#define CLIENT_LISTENER_HH
#include "trick/tc.h"
#include "trick/TCConnection.hh"
/*
PURPOSE: ( Encapsulate a TCP server. )
*/
#include <string>
#include "trick/SystemInterface.hh"
#include "trick/TCPConnection.hh"
#define LISTENER_ERROR -1
namespace Trick {
class TCConnection;
class TCPConnection;
class ClientListener {
public:
ClientListener ();
ClientListener (SystemInterface * system_interface);
~ClientListener ();
// We'll see if we need separate methods for these
int initialize(std::string hostname, int port);
int initialize();
int setBlockMode(TCCommBlocking mode);
int setBlockMode(bool blocking);
bool checkForNewConnections();
const char * getHostname ();
TCPConnection * setUpNewConnection ();
std::string getHostname ();
int getPort();
@ -34,13 +45,18 @@ namespace Trick {
bool isInitialized();
friend int accept(ClientListener* listener, TCConnection* connection);
int restart();
private:
TCDevice _listen_dev;
std::string saved_source;
int port;
bool initialized;
int _listen_socket;
std::string _hostname;
int _port;
std::string _client_tag;
bool _initialized;
SystemInterface * _system_interface; /* ** */
};
}

View File

@ -33,7 +33,7 @@ namespace Trick {
**/
class SysThread : public Trick::ThreadBase {
public:
SysThread(std::string in_name, bool self_deleting = false);
SysThread(std::string in_name);
~SysThread();
static int ensureAllShutdown();
@ -47,7 +47,6 @@ namespace Trick {
static bool shutdown_finished;
bool self_deleting;
} ;
}

View File

@ -0,0 +1,50 @@
#ifndef __SYSTEM_INTERFACE__
#define __SYSTEM_INTERFACE__
#include <iostream>
#include <functional>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netdb.h>
#include <fcntl.h>
class SystemInterface {
public:
virtual int socket (int domain, int type, int protocol) { return ::socket ( domain, type, protocol); }
virtual int setsockopt (int socket, int level, int option_name, const void * option_value, socklen_t option_len) { return ::setsockopt ( socket, level, option_name, option_value, option_len); }
virtual int bind (int socket, struct sockaddr * address, socklen_t address_len) { return ::bind ( socket, address, address_len); }
virtual int getsockname (int socket, struct sockaddr * address, socklen_t * address_len) { return ::getsockname ( socket, address, address_len); }
virtual int listen (int socket, int backlog) { return ::listen ( socket, backlog); }
virtual int select (int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) { return ::select ( nfds, readfds, writefds, errorfds, timeout); }
virtual int close (int fildes) { return ::close ( fildes); }
virtual int getaddrinfo (const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) { return ::getaddrinfo ( hostname, servname, hints, res); }
virtual int fcntl (int fildes, int cmd, int arg) { return ::fcntl ( fildes, cmd, arg); }
virtual int shutdown (int socket, int how) { return ::shutdown ( socket, how); }
virtual int accept (int socket, struct sockaddr * address, socklen_t * address_len) { return ::accept ( socket, address, address_len); }
virtual ssize_t send (int socket, const void * buffer, size_t length, int flags) { return ::send ( socket, buffer, length, flags); }
virtual ssize_t sendto (int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) { return ::sendto ( socket, buffer, length, flags, dest_addr, dest_len); }
virtual ssize_t recv (int socket, void * buffer, size_t length, int flags) { return ::recv ( socket, buffer, length, flags); }
virtual ssize_t recvfrom (int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) { return ::recvfrom ( socket, buffer, length, flags, address, address_len); }
};
#endif

View File

@ -1,48 +0,0 @@
#ifndef TC_CONNECTION_HH
#define TC_CONNECTION_HH
/*
PURPOSE: ( Encapsulate a connection with TrickComm. )
*/
#include "trick/ClientConnection.hh"
#include "trick/ClientListener.hh"
#include "tc.h"
namespace Trick {
class ClientListener;
class TCConnection : public ClientConnection {
public:
TCConnection ();
int initialize() override;
int write (const std::string& message) override;
int write (char * message, int size) override;
std::string read (int max_len) override;
int disconnect () override;
std::string get_client_tag () override;
int set_client_tag(std::string tag) override;
int get_socket();
int setBlockMode(int block_mode) override;
int setErrorReporting (bool on);
static const unsigned int MAX_CMD_LEN = 200000 ;
friend int accept(ClientListener* listener, TCConnection* connection);
private:
TCDevice _device; /**< trick_io(**) */
};
int accept(ClientListener* listener, TCConnection* connection);
}
#endif

View File

@ -0,0 +1,46 @@
#ifndef TCP_CONNECTION_HH
#define TCP_CONNECTION_HH
/*
PURPOSE: ( Encapsulate a TCP connection. )
*/
#include "trick/ClientConnection.hh"
#include "trick/SystemInterface.hh"
namespace Trick {
class TCPConnection : public ClientConnection {
public:
TCPConnection (int listen_socket);
TCPConnection (SystemInterface * system_interface);
TCPConnection (int listen_socket, SystemInterface * system_interface);
int start() override;
int write (const std::string& message) override;
int write (char * message, int size) override;
std::string read (int max_len = MAX_CMD_LEN) override;
int disconnect () override;
bool isInitialized() override;
int setBlockMode(bool blocking) override;
int restart() override;
private:
int _socket;
bool _connected;
// This is needed to be able to accept an incoming connection, after being set up by the listener
int _listen_socket;
SystemInterface * _system_interface; /* ** */
};
}
#endif

View File

@ -131,7 +131,7 @@ namespace Trick {
virtual int cancel_thread() ;
/**
* Cancels thread.
* Joins thread.
* @return always 0
*/
virtual int join_thread() ;
@ -153,6 +153,12 @@ namespace Trick {
*/
virtual void dump( std::ostream & oss = std::cout ) ;
virtual void test_shutdown();
virtual void test_shutdown(void (*exit_handler) (void *), void * exit_arg);
virtual void thread_shutdown();
virtual void thread_shutdown(void (*exit_handler) (void *), void * exit_arg);
protected:
/** optional name of thread */
@ -170,6 +176,14 @@ namespace Trick {
/** Set of cpus to use with thread */
unsigned int max_cpu ; /**< trick_io(**) */
/** Track whether the thread has been created */
bool created ; /**< trick_io(**) */
/** Manage thread shutdown */
bool should_shutdown; /**< trick_io(**) */
bool cancellable; /**< trick_io(**) */
pthread_mutex_t shutdown_mutex; /**< trick_io(**) */
#if __linux
#ifndef SWIG
/** Set of cpus to use with thread */

View File

@ -0,0 +1,56 @@
#ifndef UDP_CONNECTION_HH
#define UDP_CONNECTION_HH
/*
PURPOSE: ( Encapsulate a UDP socket. )
*/
#include "trick/ClientConnection.hh"
#include "trick/SystemInterface.hh"
namespace Trick {
class UDPConnection : public ClientConnection {
public:
UDPConnection ();
UDPConnection (SystemInterface * system_interface);
int start() override;
int write (const std::string& message) override;
int write (char * message, int size) override;
std::string read (int max_len = MAX_CMD_LEN) override;
int disconnect () override;
bool isInitialized() override;
int setBlockMode(bool block_mode) override;
int restart() override;
// Non-override functions
int initialize(const std::string& hostname, int port);
int getPort();
std::string getHostname();
private:
bool _initialized;
bool _started;
int _socket;
int _port;
std::string _hostname;
struct sockaddr_in _remote_serv_addr;
SystemInterface * _system_interface; /* ** */
};
}
#endif

View File

@ -75,10 +75,10 @@ namespace Trick {
bool broadcast ; /**< trick_units(--) */
/** The listen device */
ClientListener listener;
ClientListener listener; /**< trick_io(**) trick_units(--) */
/* Multicast broadcaster */
MulticastManager multicast;
MulticastManager multicast; /**< trick_io(**) trick_units(--) */
/** The mutex to stop accepting new connections during restart\n */
pthread_mutex_t restart_pause ; /**< trick_io(**) */

View File

@ -11,11 +11,12 @@
#include <iostream>
#include <pthread.h>
#include "trick/tc.h"
#include "trick/TCConnection.hh"
#include "trick/SysThread.hh"
#include "trick/VariableServerSession.hh"
#include "trick/variable_server_sync_types.h"
#include "trick/variable_server_message_types.h"
#include "trick/ClientConnection.hh"
#include "trick/ClientListener.hh"
namespace Trick {
@ -39,7 +40,7 @@ namespace Trick {
@brief Constructor.
@param listen_dev - the TCDevice set up in listen()
*/
VariableServerThread(ClientListener * in_listen_dev ) ;
VariableServerThread() ;
virtual ~VariableServerThread() ;
/**
@ -50,6 +51,16 @@ namespace Trick {
void set_client_tag(std::string tag);
/**
@brief Open a UDP socket for this thread
*/
int open_udp_socket(const std::string& hostname, int port);
/**
@brief Open a TCP connection for this thread
*/
int open_tcp_connection(ClientListener * listener);
/**
@brief Block until thread has accepted connection
*/
@ -67,6 +78,8 @@ namespace Trick {
void restart() ;
void cleanup();
protected:
/**
@ -77,12 +90,11 @@ namespace Trick {
/** The Master variable server object. */
static VariableServer * vs ;
/** this is where a lot of this should happen now */
VariableServerSession * session;
/** Manages the variable list */
VariableServerSession * session; /**< trick_io(**) */
/** The listen device from the variable server\n */
ClientListener * listener; /**< trick_io(**) */
TCConnection connection; /**< trick_io(**) */
/** Connection to the client */
ClientConnection * connection; /**< trick_io(**) */
/** Value (1,2,or 3) that causes the variable server to output increasing amounts of debug information.\n */
int debug ; /**< trick_io(**) */
@ -91,8 +103,8 @@ namespace Trick {
bool enabled ; /**< trick_io(**) */
ConnectionStatus connection_status ; /**< trick_io(**) */
pthread_mutex_t connection_status_mutex;
pthread_cond_t connection_status_cv;
pthread_mutex_t connection_status_mutex; /**< trick_io(**) */
pthread_cond_t connection_status_cv; /**< trick_io(**) */
/** The mutex pauses all processing during checkpoint restart */
pthread_mutex_t restart_pause ; /**< trick_io(**) */

View File

@ -205,7 +205,7 @@ class Job(object):
for getting status information. More specific types of Jobs should inherit from
this base clasee.
"""
enums = ['NOT_STARTED', 'RUNNING', 'SUCCESS', 'FAILED']
enums = ['NOT_STARTED', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT']
Status = collections.namedtuple('Status', enums)(*(range(len(enums))))
_success_progress_bar = create_progress_bar(1, 'Success')
@ -220,7 +220,9 @@ class Job(object):
Job.Status.NOT_STARTED: ('NOT RUN', 'DARK_YELLOW'),
Job.Status.SUCCESS: ('OK', 'DARK_GREEN'),
Job.Status.FAILED: ('FAIL:' + str(self._expected_exit_status) + '/' +
str(self._exit_status), 'DARK_RED') }[self.get_status()]
str(self._exit_status), 'DARK_RED'),
Job.Status.TIMEOUT: ('TIMEOUT after ' + str(self._timeout) + ' seconds', 'YELLOW')
}[self.get_status()]
return printer.colorstr(text, color)
def __init__(self, name, command, log_file, expected_exit_status=0):
@ -245,6 +247,7 @@ class Job(object):
self._stop_time = None
self._exit_status = None
self._expected_exit_status = expected_exit_status
self._timeout = None
def start(self):
"""
@ -275,6 +278,8 @@ class Job(object):
This job completed with an exit status of zero.
Status.FAILED
This job completed with a non-zero exit status.
Status.TIMEOUT
This job ran longer than the allotted time given by execute_job
"""
if self._process is None:
return self.Status.NOT_STARTED
@ -283,6 +288,9 @@ class Job(object):
if self._exit_status is None:
return self.Status.RUNNING
if self._timeout is not None:
return self.Status.TIMEOUT
if self._stop_time is None:
self._stop_time = time.time()
@ -368,6 +376,17 @@ class Job(object):
"""
return self._done_string()
def _timeout_string(self):
"""
Get a string to display when this Job has failed.
Returns
-------
str
A string to be displayed when in the FAILED state.
"""
return self._done_string()
def _done_string(self):
"""
This class uses the same string for SUCCESS and FAILED, but
@ -573,7 +592,7 @@ class WorkflowCommon:
else:
return 'unknown'
def execute_jobs(self, jobs, max_concurrent=None, header=None):
def execute_jobs(self, jobs, max_concurrent=None, header=None, job_timeout=None):
"""
Run jobs, blocking until all have returned.
@ -694,6 +713,15 @@ class WorkflowCommon:
for i in range(min(len(waitingJobs), available_cpus)):
waitingJobs[i].start()
if job_timeout is not None:
# Check if any jobs have timed out
runningJobs = [job for job in jobs if job.get_status() is job.Status.RUNNING]
for runningJob in runningJobs:
if time.time() - runningJob._start_time > job_timeout:
runningJob._timeout = job_timeout
runningJob._stop_time = time.time()
runningJob.die()
# display the status if enabled
if stdscr:
status_pad.erase()
@ -802,8 +830,8 @@ class WorkflowCommon:
text, color = {
job.Status.NOT_STARTED: ('was not run', 'GREY40'),
job.Status.SUCCESS: ('succeeded', 'DARK_GREEN'),
job.Status.FAILED: ('failed', 'DARK_RED')
}[job.get_status()]
job.Status.FAILED: ('failed', 'DARK_RED'),
job.Status.TIMEOUT: ('timed out', 'YELLOW') }[job.get_status()]
text = job.name + ' ' + text

View File

@ -5,7 +5,7 @@ from trick.unit_test import *
def main():
trick.var_server_set_port(4000)
trick.var_server_set_port(40000)
trick.var_ascii()
trick.real_time_enable()
trick.exec_set_software_frame(0.01)
@ -15,7 +15,7 @@ def main():
trick.var_server_create_udp_socket('', 48000)
trick.var_server_create_multicast_socket('224.10.10.10','', 47000)
trick.exec_set_terminate_time(1000.0)
trick.exec_set_terminate_time(100.0)
varServerPort = trick.var_server_get_port()
test_output = ( os.getenv("TRICK_HOME") + "/trick_test/SIM_test_varserv.xml" )

View File

@ -12,7 +12,7 @@
#include <cmath>
#include <ctype.h>
#include <pwd.h>
#include <netdb.h>
#include <gtest/gtest.h>
#include "trick/var_binary_parser.hh"
@ -44,20 +44,14 @@ class Socket {
return -1;
}
int value = 1;
if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &value, (socklen_t) sizeof(value)) < 0) {
std::cout << "init_multicast: Socket option failed" << std::endl;
return -1;
}
if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEPORT, (char *) &value, sizeof(value)) < 0) {
perror("setsockopt: reuseport");
}
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port); // convert to weird network byte format
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0) {
std::cout << "Invalid address/ Address not supported" << std::endl;
return -1;
}
tries = 0;
int connection_status;
@ -81,7 +75,8 @@ class Socket {
_port = port;
int tries = 0;
_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
while ((_socket_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 && tries < max_retries) tries++;
if (_socket_fd < 0) {
std::cout << "init_multicast: Socket open failed" << std::endl;
return -1;
@ -93,10 +88,6 @@ class Socket {
return -1;
}
if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEPORT, (char *) &value, sizeof(value)) < 0) {
perror("setsockopt: reuseport");
}
struct ip_mreq mreq;
// Use setsockopt() to request that the kernel join a multicast group
mreq.imr_multiaddr.s_addr = inet_addr(_hostname.c_str());
@ -109,10 +100,9 @@ class Socket {
struct sockaddr_in sockin ;
// Set up local interface
// We must bind to the multicast address
// Set up destination address
sockin.sin_family = AF_INET;
sockin.sin_addr.s_addr = inet_addr(_hostname.c_str());
sockin.sin_addr.s_addr = htonl(INADDR_ANY);
sockin.sin_port = htons(_port);
if ( bind(_socket_fd, (struct sockaddr *) &sockin, (socklen_t) sizeof(sockin)) < 0 ) {
@ -120,14 +110,6 @@ class Socket {
return -1;
}
char loopch = 1;
if(setsockopt(_socket_fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0)
{
perror("Setting IP_MULTICAST_LOOP error");
return -1;
}
_initialized = true;
return 0;
}
@ -163,25 +145,6 @@ class Socket {
ret = receive();
}
std::vector<unsigned char> receive_bytes() {
unsigned char buffer[SOCKET_BUF_SIZE];
int numBytes = recv(_socket_fd, buffer, SOCKET_BUF_SIZE, 0);
if (numBytes < 0) {
std::cout << "Failed to read from socket" << std::endl;
}
std::vector<unsigned char> bytes;
for (int i = 0; i < numBytes; i++) {
bytes.push_back(buffer[i]);
}
return bytes;
}
void operator>> (std::string& ret) {
ret = receive();
}
std::vector<unsigned char> receive_bytes() {
unsigned char buffer[SOCKET_BUF_SIZE];
int numBytes = recv(_socket_fd, buffer, SOCKET_BUF_SIZE, 0);
@ -228,6 +191,7 @@ class Socket {
int _socket_fd;
bool _initialized;
bool _multicast_socket;
};
class VariableServerTest : public ::testing::Test {
@ -303,42 +267,12 @@ class VariableServerTestAltListener : public ::testing::Test {
static int numSession;
};
#ifndef __APPLE__
class VariableServerTestMulticast : public ::testing::Test {
protected:
VariableServerTestMulticast() {
socket_status = socket.init("", 47000, SOCK_DGRAM);
multicast_listener.init_multicast("224.10.10.10", 47000);
if (socket_status == 0) {
std::stringstream request;
request << "trick.var_set_client_tag(\"multicast_VSTest";
request << numSession++;
request << "\") \n";
socket << request.str();
}
}
~VariableServerTestMulticast() {
socket.close();
multicast_listener.close();
}
Socket socket;
Socket multicast_listener;
int socket_status;
static int numSession;
};
int VariableServerTestMulticast::numSession = 0;
#endif
int VariableServerTest::numSession = 0;
int VariableServerUDPTest::numSession = 0;
int VariableServerTestAltListener::numSession = 0;
/**********************************************************/
/* Helpful constants and functions */
/**********************************************************/
@ -411,108 +345,6 @@ void load_checkpoint (Socket& socket, const std::string& checkpoint_name) {
socket << "trick.var_unpause()\n";
}
/*****************************************/
/* Multicast Test */
/*****************************************/
#ifndef __APPLE__
TEST_F (VariableServerTestMulticast, Strings) {
if (socket_status != 0) {
FAIL();
}
std::string reply;
socket << "trick.var_send_once(\"vsx.vst.o\")\n";
std::string expected("5\tYou will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence in the success of my undertaking.");
multicast_listener >> reply;
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0);
expected = std::string("5\tI am already far north of London, and as I walk in the streets of Petersburgh, I feel a cold northern breeze play upon my cheeks, which braces my nerves and fills me with delight. Do you understand this feeling?");
socket << "trick.var_send_once(\"vsx.vst.p\")\n";
multicast_listener >> reply;
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0);
// TODO: Does wchar actually work?
// expected = std::string("5\tThis breeze, which has travelled from the regions towards which I am advancing, gives me a foretaste of those icy climes. Inspirited by this wind of promise, my daydreams become more fervent and vivid.");
// socket << "trick.var_send_once(\"vsx.vst.q\")\n";
// socket >> reply;
// std::cout << "\tExpected: " << expected << "\n\tActual: " << reply << std::endl;
// EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0);
}
TEST_F (VariableServerTestMulticast, AddRemove) {
if (socket_status != 0) {
FAIL();
}
std::string reply;
std::string expected;
int max_tries = 3;
int tries = 0;
socket << "trick.var_add(\"vsx.vst.c\")\n";
multicast_listener >> reply;
expected = std::string("0 -1234");
tries = 0;
while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) {
multicast_listener >> reply;
}
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply;
multicast_listener >> reply;
tries = 0;
while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) {
multicast_listener >> reply;
}
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply;
socket << "trick.var_add(\"vsx.vst.m\")\n";
multicast_listener >> reply;
expected = std::string("0 -1234 1");
tries = 0;
while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) {
multicast_listener >> reply;
}
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply;
socket << "trick.var_remove(\"vsx.vst.m\")\n";
multicast_listener >> reply;
expected = std::string("0 -1234");
tries = 0;
while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) {
multicast_listener >> reply;
}
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply;
socket << "trick.var_add(\"vsx.vst.n\")\n";
multicast_listener >> reply;
expected = std::string("0 -1234 0,1,2,3,4");
tries = 0;
while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) {
multicast_listener >> reply;
}
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply;
socket << "trick.var_exit()\n";
}
#endif
/************************************/
/* UDP Tests */
/************************************/
@ -601,7 +433,6 @@ TEST_F (VariableServerTestAltListener, Strings) {
socket >> reply;
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0);
}
TEST_F (VariableServerTestAltListener, AddRemove) {
@ -679,10 +510,6 @@ TEST_F (VariableServerTestAltListener, RestartAndSet) {
/* Normal case tests */
/*********************************************/
void spin (Socket& socket, int wait_cycles = 5) {
socket.receive();
}
TEST_F (VariableServerTest, Strings) {
if (socket_status != 0) {
FAIL();
@ -749,42 +576,7 @@ TEST_F (VariableServerTest, NoExtraTab) {
EXPECT_STREQ(reply.c_str(), expected.c_str());
}
TEST_F (VariableServerTest, NoExtraTab) {
if (socket_status != 0) {
FAIL();
}
std::string reply;
std::string expected;
socket << "trick.var_add(\"vsx.vst.c\")\n";
socket >> reply;
expected = std::string("0\t-1234\n");
EXPECT_STREQ(reply.c_str(), expected.c_str());
socket >> reply;
EXPECT_STREQ(reply.c_str(), expected.c_str());
socket << "trick.var_add(\"vsx.vst.m\")\n";
socket >> reply;
expected = std::string("0\t-1234\t1\n");
EXPECT_STREQ(reply.c_str(), expected.c_str());
socket << "trick.var_remove(\"vsx.vst.m\")\n";
socket >> reply;
expected = std::string("0\t-1234\n");
socket << "trick.var_add(\"vsx.vst.n\")\n";
socket >> reply;
expected = std::string("0\t-1234\t0,1,2,3,4\n");
EXPECT_STREQ(reply.c_str(), expected.c_str());
}
TEST_F (VariableServerTest, DISABLED_AddRemove) {
TEST_F (VariableServerTest, AddRemove) {
if (socket_status != 0) {
FAIL();
}
@ -811,10 +603,6 @@ TEST_F (VariableServerTest, DISABLED_AddRemove) {
socket << "trick.var_remove(\"vsx.vst.m\")\n";
socket >> reply;
expected = std::string("0 -1234");
socket << "trick.var_add(\"vsx.vst.n\")\n";
socket >> reply;
expected = std::string("0 -1234 0,1,2,3,4");
EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0);
@ -1115,7 +903,7 @@ TEST_F (VariableServerTest, Multicast) {
FAIL() << "Multicast Socket failed to initialize.";
}
int max_multicast_tries = 100;
int max_multicast_tries = 10000;
int tries = 0;
bool found = false;
@ -1263,18 +1051,7 @@ TEST_F (VariableServerTest, Freeze) {
ASSERT_EQ(mode, MODE_RUN);
}
<<<<<<< HEAD
<<<<<<< HEAD
=======
<<<<<<< HEAD
>>>>>>> 6fe76071 (Add multicast test)
TEST_F (VariableServerTest, DISABLED_CopyAndWriteModes) {
=======
=======
>>>>>>> d158199e (Added var_send_list_size test, various other fixes)
TEST_F (VariableServerTest, CopyAndWriteModes) {
>>>>>>> f9665aba (Fix and test var_send_stdio)
if (socket_status != 0) {
FAIL();
}
@ -1357,6 +1134,8 @@ TEST_F (VariableServerTest, CopyAndWriteModes) {
command = "trick.var_clear()\n" + test_vars_command + "trick.var_set_write_mode(1)\ntrick.var_add(\"vsx.vst.e\")\ntrick.var_add(\"vsx.vst.f\")\ntrick.var_unpause()\n";
socket << command;
spin(socket);
parse_message(socket.receive());
expected = "-123456 123456";
EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected;
@ -1384,7 +1163,7 @@ TEST_F (VariableServerTest, CopyAndWriteModes) {
socket << command;
// Same issue as copy mode 1 write mode 0
// spin();
spin(socket);
parse_message(socket.receive());
expected = "-1234567 123456789";
EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected;
@ -1410,6 +1189,8 @@ TEST_F (VariableServerTest, CopyAndWriteModes) {
command = "trick.var_clear()\n" + test_vars_command + "trick.var_set_copy_mode(2)\ntrick.var_set_write_mode(1)\ntrick.var_set_frame_multiple(" + std::to_string(frame_multiple) + ")\ntrick.var_set_frame_offset(" + std::to_string(frame_offset) + ")\ntrick.var_add(\"vsx.vst.i\")\ntrick.var_add(\"vsx.vst.j\")\ntrick.var_unpause()\n";
socket << command;
spin(socket);
expected = "1234.5677 -1234.56789";
parse_message(socket.receive());
EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected;
@ -1472,38 +1253,6 @@ TEST_F (VariableServerTest, send_stdio) {
std::stringstream message_stream(message);
std::string token;
<<<<<<< HEAD
=======
std::getline(message_stream, token, ' ');
int message_type = stoi(token);
std::getline(message_stream, token, ' ');
int stream_num = stoi(token);
std::getline(message_stream, token, '\n');
int text_size = stoi(token);
std::string text;
std::getline(message_stream, text);
EXPECT_EQ (message_type, 4);
EXPECT_EQ (stream_num, 1);
EXPECT_EQ (text_size, 41);
EXPECT_EQ(text, std::string("This message should redirect to varserver"));
}
TEST_F (VariableServerTest, RestartAndSet) {
if (socket_status != 0) {
FAIL();
}
std::string reply;
std::string expected;
socket << "trick.var_add(\"vsx.vst.c\")\n";
socket >> reply;
expected = std::string("0\t-1234\n");
EXPECT_EQ(reply, expected);
>>>>>>> f9665aba (Fix and test var_send_stdio)
int message_type;
int stream_num;
@ -1528,234 +1277,6 @@ TEST_F (VariableServerTest, RestartAndSet) {
}
#ifndef __APPLE__
<<<<<<< HEAD
<<<<<<< HEAD
TEST_F (VariableServerTest, MulticastAfterRestart) {
=======
=======
TEST_F (VariableServerTest, MulticastAfterRestart) {
if (socket_status != 0) {
FAIL();
}
socket << "trick.var_server_set_user_tag(\"VSTestServer\")\n";
Socket multicast_socket;
multicast_socket.init_multicast("224.3.14.15", 9265);
int max_multicast_tries = 100;
int tries = 0;
bool found = false;
char expected_hostname[80];
gethostname(expected_hostname, 80);
int expected_port = 4000;
// get expected username
struct passwd *passp = getpwuid(getuid()) ;
char * expected_username;
if ( passp == NULL ) {
expected_username = strdup("unknown") ;
} else {
expected_username = strdup(passp->pw_name) ;
}
// Don't care about PID, just check that it's > 0
char * expected_sim_dir = "trick/test/SIM_test_varserv"; // Compare against the end of the string for this one
// Don't care about cmdline name
char * expected_input_file = "RUN_test/unit_test.py";
// Don't care about trick_version
char * expected_tag = "VSTestServer";
// Variables to be populated by the multicast message
char actual_hostname[80];
unsigned short actual_port = 0;
char actual_username[80];
int actual_pid = 0;
char actual_sim_dir[80];
char actual_cmdline_name[80];
char actual_input_file[80];
char actual_trick_version[80];
char actual_tag[80];
unsigned short actual_duplicate_port = 0;
while (!found && tries++ < max_multicast_tries) {
std::string broadcast_data = multicast_socket.receive();
sscanf(broadcast_data.c_str(), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , actual_hostname, &actual_port ,
actual_username , &actual_pid , actual_sim_dir , actual_cmdline_name ,
actual_input_file , actual_trick_version , actual_tag, &actual_duplicate_port) ;
if (strcmp(actual_hostname, expected_hostname) == 0 && strcmp(expected_tag, actual_tag) == 0) {
found = true;
EXPECT_STREQ(actual_hostname, expected_hostname);
EXPECT_EQ(actual_port, expected_port);
EXPECT_STREQ(actual_username, expected_username);
EXPECT_GT(actual_pid, 0);
std::string expected_sim_dir_str(expected_sim_dir);
std::string actual_sim_dir_str(actual_sim_dir);
std::string end_of_actual = actual_sim_dir_str.substr(actual_sim_dir_str.length() - expected_sim_dir_str.length(), actual_sim_dir_str.length());
EXPECT_EQ(expected_sim_dir_str, end_of_actual);
EXPECT_STREQ(actual_input_file, expected_input_file);
EXPECT_STREQ(actual_tag, expected_tag);
EXPECT_EQ(actual_duplicate_port, expected_port);
}
}
if (!found)
FAIL() << "Multicast message never received";
}
>>>>>>> 6490e49d (Add more tests, error handling)
TEST_F (VariableServerTest, LargeMessages) {
>>>>>>> f9665aba (Fix and test var_send_stdio)
if (socket_status != 0) {
FAIL();
}
socket << "trick.var_server_set_user_tag(\"VSTestServer\")\n";
<<<<<<< HEAD
Socket multicast_socket;
if (multicast_socket.init_multicast("224.3.14.15", 9265) != 0) {
FAIL() << "Multicast Socket failed to initialize.";
}
int max_multicast_tries = 100;
int tries = 0;
bool found = false;
char expected_hostname[80];
gethostname(expected_hostname, 80);
int expected_port = 40000;
// get expected username
struct passwd *passp = getpwuid(getuid()) ;
char * expected_username;
if ( passp == NULL ) {
expected_username = strdup("unknown") ;
} else {
expected_username = strdup(passp->pw_name) ;
}
// Don't care about PID, just check that it's > 0
char * expected_sim_dir = "trick/test/SIM_test_varserv"; // Compare against the end of the string for this one
// Don't care about cmdline name
char * expected_input_file = "RUN_test/unit_test.py";
// Don't care about trick_version
char * expected_tag = "VSTestServer";
// Variables to be populated by the multicast message
char actual_hostname[80];
unsigned short actual_port = 0;
char actual_username[80];
int actual_pid = 0;
char actual_sim_dir[80];
char actual_cmdline_name[80];
char actual_input_file[80];
char actual_trick_version[80];
char actual_tag[80];
unsigned short actual_duplicate_port = 0;
=======
socket << "trick.var_pause()\n";
for (int i = 0; i < 4000; i++) {
std::string var_add = "trick.var_add(\"vsx.vst.large_arr[" + std::to_string(i) + "]\")\n";
socket << var_add;
}
// Message size limit is 8192
// Message size should be somewhere between the limit and limit-5 due to size of variable
const static int msg_size_limit = 8192;
auto get_last_number_in_string = [](const std::string& s) -> int {
int index = s.size() - 1;
while (!isdigit(s.at(index))) { index--; }
int num = 0;
int exp = 0;
while (isdigit(s.at(index))) {
num += (s.at(index) - '0') * pow(10, exp);
index--;
exp++;
}
return num;
};
auto get_first_number_in_string = [](const std::string& s) -> int {
std::stringstream message_stream(s);
std::string token;
std::getline(message_stream, token, '\t');
if (token.size() == 0) { std::getline(message_stream, token, '\t'); }
int ret = stoi(token);
return ret;
};
socket.clear_buffered_data();
bool ready = false;
while (!ready) {
socket << "trick.var_send_list_size()\n";
socket >> reply;
if (reply == std::string("3\t4000\n"))
ready = true;
}
int new_reply_first = 0;
int prev_reply_last = 0;
socket << "trick.var_send()\n";
socket >> reply;
new_reply_first = get_first_number_in_string(reply);
prev_reply_last = get_last_number_in_string(reply);
EXPECT_EQ(new_reply_first, 0);
EXPECT_TRUE(reply.size() <= msg_size_limit && reply.size() >= msg_size_limit-5);
while (prev_reply_last != 3999) {
socket >> reply;
new_reply_first = get_first_number_in_string(reply);
EXPECT_TRUE(reply.size() <= msg_size_limit);
EXPECT_EQ(prev_reply_last + 1, new_reply_first);
>>>>>>> d158199e (Added var_send_list_size test, various other fixes)
<<<<<<< HEAD
while (!found && tries++ < max_multicast_tries) {
std::string broadcast_data = multicast_socket.receive();
sscanf(broadcast_data.c_str(), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , actual_hostname, &actual_port ,
actual_username , &actual_pid , actual_sim_dir , actual_cmdline_name ,
actual_input_file , actual_trick_version , actual_tag, &actual_duplicate_port) ;
if (strcmp(actual_hostname, expected_hostname) == 0 && strcmp(expected_tag, actual_tag) == 0) {
found = true;
EXPECT_STREQ(actual_hostname, expected_hostname);
EXPECT_EQ(actual_port, expected_port);
EXPECT_STREQ(actual_username, expected_username);
EXPECT_GT(actual_pid, 0);
std::string expected_sim_dir_str(expected_sim_dir);
std::string actual_sim_dir_str(actual_sim_dir);
std::string end_of_actual = actual_sim_dir_str.substr(actual_sim_dir_str.length() - expected_sim_dir_str.length(), actual_sim_dir_str.length());
EXPECT_EQ(expected_sim_dir_str, end_of_actual);
EXPECT_STREQ(actual_input_file, expected_input_file);
EXPECT_STREQ(actual_tag, expected_tag);
EXPECT_EQ(actual_duplicate_port, expected_port);
}
=======
prev_reply_last = get_last_number_in_string(reply);
>>>>>>> 6490e49d (Add more tests, error handling)
}
if (!found)
FAIL() << "Multicast message never received";
}
<<<<<<< HEAD
#endif
=======
>>>>>>> f9665aba (Fix and test var_send_stdio)
TEST_F (VariableServerTest, Binary) {
if (socket_status != 0) {
FAIL();
@ -1918,7 +1439,7 @@ int main(int argc, char **argv) {
int result = RUN_ALL_TESTS();
Socket socket;
socket.init("localhost", 4000);
socket.init("localhost", 40000);
if (result == 0) {
// Success
@ -1932,4 +1453,4 @@ int main(int argc, char **argv) {
socket << "trick.stop() \n";
return result;
}
}

View File

@ -6,6 +6,7 @@
#include "trick/message_type.h"
#include "trick/tc_proto.h"
#include "trick/exec_proto.h"
#include "trick/SysThread.hh"
/** @par Detailed Design: */
void Trick::MonteCarlo::master_shutdown() {
@ -41,6 +42,8 @@ void Trick::MonteCarlo::master_shutdown() {
except_return = -2 ;
}
SysThread::ensureAllShutdown();
exit(except_return);
}

View File

@ -1,5 +1,7 @@
#include "trick/MonteCarlo.hh"
#include "trick/SysThread.hh"
/** @par Detailed Design: */
void Trick::MonteCarlo::slave_shutdown() {
@ -8,6 +10,8 @@ void Trick::MonteCarlo::slave_shutdown() {
/** <li> Run the shutdown jobs and exit. */
run_queue(&slave_shutdown_queue, "in slave_shutdown queue") ;
SysThread::ensureAllShutdown();
exit(0);
}
@ -15,6 +19,9 @@ void Trick::MonteCarlo::slave_shutdown() {
void Trick::MonteCarlo::slave_die() {
/** <ul><li> Kill any active child process executing a run and exit immediately. */
slave_kill_run();
SysThread::ensureAllShutdown();
exit(1);
}

View File

@ -9,11 +9,13 @@
#endif
#include <signal.h>
#include <algorithm>
#include <time.h>
#include "trick/SysThread.hh"
bool Trick::SysThread::shutdown_finished = false;
// Construct On First Use to avoid the Static Initialization Fiasco
pthread_mutex_t& Trick::SysThread::list_mutex() {
static pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -30,7 +32,7 @@ std::vector<Trick::SysThread *>& Trick::SysThread::all_sys_threads() {
return all_sys_threads;
}
Trick::SysThread::SysThread(std::string in_name, bool sd) : self_deleting(sd), ThreadBase(in_name) {
Trick::SysThread::SysThread(std::string in_name) : ThreadBase(in_name) {
pthread_mutex_lock(&(list_mutex()));
all_sys_threads().push_back(this);
pthread_mutex_unlock(&(list_mutex()));
@ -41,9 +43,6 @@ Trick::SysThread::~SysThread() {
pthread_mutex_lock(&(list_mutex()));
if (!shutdown_finished) {
all_sys_threads().erase(std::remove(all_sys_threads().begin(), all_sys_threads().end(), this), all_sys_threads().end());
if (all_sys_threads().size() == 0) {
pthread_cond_signal(&(list_empty_cv()));
}
}
pthread_mutex_unlock(&(list_mutex()));
}
@ -51,25 +50,17 @@ Trick::SysThread::~SysThread() {
int Trick::SysThread::ensureAllShutdown() {
pthread_mutex_lock(&(list_mutex()));
// Cancel all threads
for (SysThread * thread : all_sys_threads()) {
thread->cancel_thread();
}
auto it = all_sys_threads().begin();
while (it != all_sys_threads().end()){
SysThread * thread = *it;
if (!(thread->self_deleting)) {
thread->join_thread();
it = all_sys_threads().erase(it);
} else {
++it;
}
}
while (all_sys_threads().size() != 0) {
pthread_cond_wait(&(list_empty_cv()), &(list_mutex()));
// Join all threads
for (SysThread * thread : all_sys_threads()) {
thread->join_thread();
}
// Success!
shutdown_finished = true;
pthread_mutex_unlock(&(list_mutex()));

View File

@ -17,8 +17,12 @@ Trick::ThreadBase::ThreadBase(std::string in_name) :
name(in_name) ,
pthread_id(0) ,
pid(0) ,
rt_priority(0)
rt_priority(0),
created(false),
should_shutdown(false),
cancellable(true)
{
pthread_mutex_init(&shutdown_mutex, NULL);
#if __linux
max_cpu = sysconf( _SC_NPROCESSORS_ONLN ) ;
#ifdef CPU_ALLOC
@ -274,12 +278,19 @@ int Trick::ThreadBase::execute_priority() {
int Trick::ThreadBase::create_thread() {
if (created) {
message_publish(MSG_ERROR, "create_thread called on thread %s (%p) which has already been started.\n", name.c_str(), this);
return 0;
}
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
pthread_create(&pthread_id, &attr, Trick::ThreadBase::thread_helper , (void *)this);
created = true;
#if __linux
#ifdef __GNUC__
#if __GNUC__ >= 4 && __GNUC_MINOR__ >= 2
@ -294,19 +305,56 @@ int Trick::ThreadBase::create_thread() {
}
int Trick::ThreadBase::cancel_thread() {
pthread_mutex_lock(&shutdown_mutex);
should_shutdown = true;
pthread_mutex_unlock(&shutdown_mutex);
if ( pthread_id != 0 ) {
pthread_cancel(pthread_id) ;
if (cancellable)
pthread_cancel(pthread_id) ;
}
return(0) ;
}
int Trick::ThreadBase::join_thread() {
if ( pthread_id != 0 ) {
pthread_join(pthread_id, NULL) ;
if ((errno = pthread_join(pthread_id, NULL)) != 0) {
std::string msg = "Thread " + name + " had an error in join";
perror(msg.c_str());
} else {
pthread_id = 0;
}
}
return(0) ;
}
void Trick::ThreadBase::test_shutdown() {
test_shutdown (NULL, NULL);
}
void Trick::ThreadBase::test_shutdown(void (*exit_handler) (void *), void * exit_arg) {
pthread_mutex_lock(&shutdown_mutex);
if (should_shutdown) {
pthread_mutex_unlock(&shutdown_mutex);
thread_shutdown(exit_handler, exit_arg);
}
pthread_mutex_unlock(&shutdown_mutex);
}
void Trick::ThreadBase::thread_shutdown() {
thread_shutdown (NULL, NULL);
}
void Trick::ThreadBase::thread_shutdown(void (*exit_handler) (void *), void * exit_arg) {
if (exit_handler != NULL) {
exit_handler(exit_arg);
}
pthread_exit(0);
}
void * Trick::ThreadBase::thread_helper( void * context ) {
sigset_t sigs;

View File

@ -1116,7 +1116,7 @@ object_${TRICK_HOST_CPU}/VariableServerSession.o: VariableServerSession.cpp \
${TRICK_HOME}/include/trick/TrickConstant.hh \
${TRICK_HOME}/include/trick/tc_proto.h
object_${TRICK_HOST_CPU}/ClientListener.o: ClientListener.cpp \
${TRICK_HOME}/include/trick/TCConnection.hh \
${TRICK_HOME}/include/trick/TCPConnection.hh \
${TRICK_HOME}/include/trick/ClientConnection.hh \
${TRICK_HOME}/include/trick/ClientListener.hh \
${TRICK_HOME}/include/trick/tc.h \

View File

@ -15,7 +15,7 @@
#include "trick/UdUnits.hh"
#include "trick/bitfield_proto.h"
#include "trick/trick_byteswap.h"
#include "trick/tc_proto.h"
// #include "trick/tc_proto.h"
// Static variables to be addresses that are known to be the error ref address

View File

@ -10,16 +10,22 @@
#include "trick/message_proto.h"
#include "trick/message_type.h"
#define MAX_MACHINE_NAME 80
Trick::VariableServerListenThread::VariableServerListenThread() :
Trick::SysThread("VarServListen"),
requested_port(0),
requested_source_address(""),
user_requested_address(false),
broadcast(true),
listener()
{
pthread_mutex_init(&restart_pause, NULL);
char hname[MAX_MACHINE_NAME];
gethostname(hname, MAX_MACHINE_NAME);
requested_source_address = std::string(hname);
cancellable = false;
}
Trick::VariableServerListenThread::~VariableServerListenThread() {
@ -31,7 +37,10 @@ Trick::VariableServerListenThread::~VariableServerListenThread() {
const char * Trick::VariableServerListenThread::get_hostname() {
const char * ret = listener.getHostname();
std::string hostname = listener.getHostname();
char * ret = (char *) malloc(hostname.length() + 1);
strncpy(ret, hostname.c_str(), hostname.length());
ret[hostname.length()] = '\0';
return ret;
}
@ -81,44 +90,45 @@ void Trick::VariableServerListenThread::set_broadcast(bool in_broadcast) {
int Trick::VariableServerListenThread::init_listen_device() {
int ret = listener.initialize();
requested_port = listener.getPort();
requested_source_address = std::string(listener.getHostname());
user_requested_address = true;
return ret;
}
int Trick::VariableServerListenThread::check_and_move_listen_device() {
int ret ;
if ( user_requested_address ) {
/* The user has requested a different source address or port in the input file */
listener.disconnect();
ret = listener.initialize(requested_source_address, requested_port);
requested_port = listener.getPort();
requested_source_address = std::string(listener.getHostname());
if (ret != TC_SUCCESS) {
std::cout << "Unsuccessful initialization " << std::endl;
message_publish(MSG_ERROR, "ERROR: Could not establish variable server source_address %s: port %d. Aborting.\n",
requested_source_address.c_str(), requested_port);
return -1 ;
}
/* The user has requested a different source address or port in the input file */
listener.disconnect();
ret = listener.initialize(requested_source_address, requested_port);
requested_port = listener.getPort();
requested_source_address = listener.getHostname();
if (ret != 0) {
message_publish(MSG_ERROR, "ERROR: Could not establish variable server source_address %s: port %d. Aborting.\n",
requested_source_address.c_str(), requested_port);
return -1 ;
}
return 0 ;
}
void Trick::VariableServerListenThread::create_tcp_socket(const char * address, unsigned short in_port ) {
listener.initialize(address, in_port);
requested_source_address = listener.getHostname();
requested_port = listener.getPort();
user_requested_address = true;
message_publish(MSG_INFO, "Created TCP variable server %s: %d\n", requested_source_address.c_str(), in_port);
}
void * Trick::VariableServerListenThread::thread_body() {
// This thread listens for incoming client connections, and when one is received, creates a new thread to handle the session
// Also broadcasts on multicast channel
test_shutdown();
std::string version = std::string(exec_get_current_version()) ;
version.erase(version.find_last_not_of(" \t\f\v\n\r")+1);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
// get username to broadcast on multicast channel
struct passwd *passp = getpwuid(getuid()) ;
std::string user_name;
@ -128,38 +138,49 @@ void * Trick::VariableServerListenThread::thread_body() {
user_name = strdup(passp->pw_name) ;
}
listener.setBlockMode(TC_COMM_BLOCKIO);
listener.setBlockMode(true);
if ( broadcast ) {
initializeMulticast();
}
while (1) {
// Quit here if it's time
test_shutdown();
// Look for a new client requesting a connection
if (listener.checkForNewConnections()) {
// pause here during restart
pthread_mutex_lock(&restart_pause) ;
// Recheck - sometimes we get false positive if something happens during restart
if (!listener.checkForNewConnections()) {
pthread_mutex_unlock(&restart_pause) ;
continue;
}
// Create a new thread to service this connection
VariableServerThread * vst = new Trick::VariableServerThread(&listener) ;
VariableServerThread * vst = new Trick::VariableServerThread() ;
vst->open_tcp_connection(&listener) ;
vst->copy_cpus(get_cpus()) ;
vst->create_thread() ;
vst->wait_for_accept() ;
ConnectionStatus status = vst->wait_for_accept() ;
if (status == CONNECTION_FAIL) {
// If the connection failed, the thread will exit.
// Make sure it joins fully before deleting the vst object
vst->join_thread();
delete vst;
}
pthread_mutex_unlock(&restart_pause) ;
} else if ( broadcast ) {
<<<<<<< HEAD
snprintf(buf1 , sizeof(buf1), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listen_dev.hostname , (unsigned short)listen_dev.port ,
user_name , (int)getpid() , command_line_args_get_default_dir() , command_line_args_get_cmdline_name() ,
command_line_args_get_input_file() , version.c_str() , user_tag.c_str(), (unsigned short)listen_dev.port ) ;
=======
// Otherwise, broadcast on the multicast channel if enabled
char buf1[1024];
sprintf(buf1 , "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listener.getHostname(), (unsigned short)listener.getPort() ,
user_name.c_str() , (int)getpid() , command_line_args_get_default_dir() , command_line_args_get_cmdline_name() ,
command_line_args_get_input_file() , version.c_str() , user_tag.c_str(), (unsigned short)listener.getPort() ) ;
>>>>>>> e8ba4328 (Encapsulated listen device into ClientListener)
std::string message = buf1;
@ -180,12 +201,16 @@ int Trick::VariableServerListenThread::restart() {
int ret ;
listener.restart();
if ( user_requested_address ) {
if (!listener.validateSourceAddress(requested_source_address)) {
requested_source_address.clear() ;
}
printf("variable server restart user_port requested set %s:%d\n",requested_source_address.c_str(), requested_port);
listener.disconnect();
ret = listener.initialize(requested_source_address, requested_port);
@ -195,6 +220,7 @@ int Trick::VariableServerListenThread::restart() {
}
} else {
listener.checkSocket();
printf("restart variable server message port = %d\n", listener.getPort());
}
initializeMulticast();
@ -213,6 +239,7 @@ void Trick::VariableServerListenThread::pause_listening() {
}
void Trick::VariableServerListenThread::restart_listening() {
listener.restart();
pthread_mutex_unlock(&restart_pause) ;
}

View File

@ -303,7 +303,7 @@ int Trick::VariableServerSession::send_list_size() {
memcpy(&(buf1[8]), &var_count, sizeof(var_count));
if (debug >= 2) {
message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending %d event variables\n", connection, connection->get_client_tag().c_str(), var_count);
message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending %d event variables\n", connection, connection->getClientTag().c_str(), var_count);
}
connection->write(buf1, sizeof (buf1));
@ -312,7 +312,7 @@ int Trick::VariableServerSession::send_list_size() {
write_string << VS_LIST_SIZE << "\t" << var_count << "\n";
// ascii
if (debug >= 2) {
message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending number of event variables:\n%s\n", connection, connection->get_client_tag().c_str(), write_string.str().c_str()) ;
message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending number of event variables:\n%s\n", connection, connection->getClientTag().c_str(), write_string.str().c_str()) ;
}
connection->write(write_string.str());
@ -331,7 +331,7 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) {
int ret ;
if (debug >= 2) {
message_publish(MSG_DEBUG,"%p tag=<%s> var_server opening %s.\n", connection, connection->get_client_tag().c_str(), sie_file.c_str()) ;
message_publish(MSG_DEBUG,"%p tag=<%s> var_server opening %s.\n", connection, connection->getClientTag().c_str(), sie_file.c_str()) ;
}
if ((fp = fopen(sie_file.c_str() , "r")) == NULL ) {
@ -351,8 +351,8 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) {
rewind(fp) ;
// Switch to blocking writes since this could be a large transfer.
if (connection->setBlockMode(TC_COMM_BLOCKIO)) {
message_publish(MSG_DEBUG,"Variable Server Error: Failed to set TCDevice to TC_COMM_BLOCKIO.\n");
if (connection->setBlockMode(true)) {
message_publish(MSG_DEBUG,"Variable Server Error: Failed to set socket to blocking mode.\n");
}
while ( current_size < file_size ) {
@ -368,8 +368,8 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) {
}
// Switch back to non-blocking writes.
if (connection->setBlockMode(TC_COMM_NOBLOCKIO)) {
message_publish(MSG_ERROR,"Variable Server Error: Failed to set TCDevice to TC_COMM_NOBLOCKIO.\n");
if (connection->setBlockMode(false)) {
message_publish(MSG_DEBUG,"Variable Server Error: Failed to set socket to non-blocking mode.\n");
return(-1);
}

View File

@ -3,23 +3,29 @@
#include <stdlib.h>
#include "trick/VariableServerThread.hh"
#include "trick/exec_proto.h"
#include "trick/message_proto.h"
#include "trick/message_type.h"
#include "trick/TrickConstant.hh"
#include "trick/UDPConnection.hh"
#include "trick/TCPConnection.hh"
Trick::VariableServer * Trick::VariableServerThread::vs = NULL ;
Trick::VariableServerThread::VariableServerThread(ClientListener * in_listen_dev) :
Trick::SysThread("VarServer") , debug(0),
listener(in_listen_dev), session(NULL) {
static int instance_num = 0;
Trick::VariableServerThread::VariableServerThread() :
Trick::SysThread(std::string("VarServer" + std::to_string(instance_num++))) , debug(0), session(NULL), connection(NULL) {
connection_status = CONNECTION_PENDING ;
connection.initialize();
pthread_mutex_init(&connection_status_mutex, NULL);
pthread_cond_init(&connection_status_cv, NULL);
pthread_mutex_init(&restart_pause, NULL);
cancellable = false;
}
Trick::VariableServerThread::~VariableServerThread() {}
@ -30,19 +36,23 @@ std::ostream& Trick::operator<< (std::ostream& s, Trick::VariableServerThread& v
socklen_t len = (socklen_t)sizeof(otherside);
s << " \"connection\":{\n";
s << " \"client_tag\":\"" << vst.connection.get_client_tag() << "\",\n";
s << " \"client_tag\":\"" << vst.connection->getClientTag() << "\",\n";
int err = getpeername(vst.connection.get_socket(), (struct sockaddr*)&otherside, &len);
// int err = getpeername(vst.connection->get_socket(), (struct sockaddr*)&otherside, &len);
if (err == 0) {
s << " \"client_IP_address\":\"" << inet_ntoa(otherside.sin_addr) << "\",\n";
s << " \"client_port\":\"" << ntohs(otherside.sin_port) << "\",\n";
} else {
s << " \"client_IP_address\":\"unknown\",\n";
s << " \"client_port\":\"unknown\",";
// if (err == 0) {
// s << " \"client_IP_address\":\"" << inet_ntoa(otherside.sin_addr) << "\",\n";
// s << " \"client_port\":\"" << ntohs(otherside.sin_port) << "\",\n";
// } else {
// s << " \"client_IP_address\":\"unknown\",\n";
// s << " \"client_port\":\"unknown\",";
// }
pthread_mutex_lock(&vst.connection_status_mutex);
if (vst.connection_status == CONNECTION_SUCCESS) {
s << *(vst.session);
}
s << *(vst.session);
pthread_mutex_unlock(&vst.connection_status_mutex);
s << " }" << std::endl;
return s;
@ -57,7 +67,26 @@ Trick::VariableServer * Trick::VariableServerThread::get_vs() {
}
void Trick::VariableServerThread::set_client_tag(std::string tag) {
connection.set_client_tag(tag);
connection->setClientTag(tag);
}
int Trick::VariableServerThread::open_udp_socket(const std::string& hostname, int port) {
UDPConnection * udp_conn = new UDPConnection();
int status = udp_conn->initialize(hostname, port);
connection = udp_conn;
if (status == 0) {
message_publish(MSG_INFO, "Created UDP variable server %s: %d\n", udp_conn->getHostname().c_str(), udp_conn->getPort());
}
return status;
}
int Trick::VariableServerThread::open_tcp_connection(ClientListener * listener) {
connection = listener->setUpNewConnection();
return 0;
}
Trick::ConnectionStatus Trick::VariableServerThread::wait_for_accept() {
@ -79,33 +108,52 @@ void Trick::VariableServerThread::preload_checkpoint() {
// Stop variable server processing at the top of the processing loop.
pthread_mutex_lock(&restart_pause);
// Let the thread complete any data copying it has to do
// and then suspend data copying until the checkpoint is reloaded.
pthread_mutex_lock(&(session->copy_mutex));
// Make sure that the session has been initialized
pthread_mutex_lock(&connection_status_mutex);
if (connection_status == CONNECTION_SUCCESS) {
// Save the pause state of this thread.
saved_pause_cmd = session->get_pause();
// Let the thread complete any data copying it has to do
// and then suspend data copying until the checkpoint is reloaded.
pthread_mutex_lock(&(session->copy_mutex));
// Save the pause state of this thread.
saved_pause_cmd = session->get_pause();
// Disallow data writing.
session->set_pause(true);
// Disallow data writing.
session->set_pause(true);
// Temporarily "disconnect" the variable references from Trick Managed Memory
// by tagging each as a "bad reference".
session->disconnect_references();
// Temporarily "disconnect" the variable references from Trick Managed Memory
// by tagging each as a "bad reference".
session->disconnect_references();
// Allow data copying to continue.
pthread_mutex_unlock(&(session->copy_mutex));
// Allow data copying to continue.
pthread_mutex_unlock(&(session->copy_mutex));
}
pthread_mutex_unlock(&connection_status_mutex);
}
// Gets called from the main thread as a job
void Trick::VariableServerThread::restart() {
// Set the pause state of this thread back to its "pre-checkpoint reload" state.
session->set_pause(saved_pause_cmd) ;
connection->restart();
pthread_mutex_lock(&connection_status_mutex);
if (connection_status == CONNECTION_SUCCESS) {
session->set_pause(saved_pause_cmd) ;
}
pthread_mutex_unlock(&connection_status_mutex);
// Restart the variable server processing.
pthread_mutex_unlock(&restart_pause);
}
void Trick::VariableServerThread::cleanup() {
connection->disconnect();
if (session != NULL)
delete session;
}

View File

@ -22,16 +22,19 @@ void exit_var_thread(void *in_vst) ;
void * Trick::VariableServerThread::thread_body() {
// Check for short running sims
test_shutdown(NULL, NULL);
// We need to make the thread to VariableServerThread map before we accept the connection.
// Otherwise we have a race where this thread is unknown to the variable server and the
// client gets confirmation that the connection is ready for communication.
vs->add_vst( pthread_self() , this ) ;
// Accept client connection
int accept_status = accept(listener, &connection);
if (accept_status != 0) {
int status = connection->start();
if (status != 0) {
// TODO: Use a real error handler
std::cout << "Accept failed, variable server session exiting" << std::endl;
vs->delete_vst(pthread_self());
// Tell main thread that we failed to initialize
@ -40,13 +43,12 @@ void * Trick::VariableServerThread::thread_body() {
pthread_cond_signal(&connection_status_cv);
pthread_mutex_unlock(&connection_status_mutex);
cleanup();
pthread_exit(NULL);
}
connection.setBlockMode(TC_COMM_ALL_OR_NOTHING);
// Create session
session = new VariableServerSession(&connection);
session = new VariableServerSession(connection);
vs->add_session( pthread_self(), session );
// Tell main that we are ready
@ -55,10 +57,6 @@ void * Trick::VariableServerThread::thread_body() {
pthread_cond_signal(&connection_status_cv);
pthread_mutex_unlock(&connection_status_mutex);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL) ;
pthread_cleanup_push(exit_var_thread, (void *) this) ;
// if log is set on for variable server (e.g., in input file), turn log on for each client
if (vs->get_log()) {
session->set_log_on();
@ -66,6 +64,9 @@ void * Trick::VariableServerThread::thread_body() {
try {
while (1) {
// Shutdown here if it's time
test_shutdown(exit_var_thread, (void *) this);
// Pause here if we are in a restart condition
pthread_mutex_lock(&restart_pause) ;
@ -75,6 +76,7 @@ void * Trick::VariableServerThread::thread_body() {
// Check to see if exit is necessary
if (session->exit_cmd == true) {
pthread_mutex_unlock(&restart_pause) ;
break;
}
@ -91,6 +93,7 @@ void * Trick::VariableServerThread::thread_body() {
if ( should_write_async && !session->get_pause()) {
int ret = session->write_data() ;
if ( ret < 0 ) {
pthread_mutex_unlock(&restart_pause) ;
break ;
}
}
@ -132,11 +135,11 @@ void * Trick::VariableServerThread::thread_body() {
}
if (debug >= 3) {
message_publish(MSG_DEBUG, "%p tag=<%s> var_server receive loop exiting\n", &connection, connection.get_client_tag().c_str());
message_publish(MSG_DEBUG, "%p tag=<%s> var_server receive loop exiting\n", connection, connection->getClientTag().c_str());
}
pthread_cleanup_pop(1);
pthread_exit(NULL) ;
thread_shutdown(exit_var_thread, this);
return NULL ;
}

View File

@ -8,20 +8,21 @@ int Trick::VariableServer::create_tcp_socket(const char * address, unsigned shor
new_listen_thread->create_tcp_socket(address, in_port) ;
new_listen_thread->copy_cpus(listen_thread.get_cpus()) ;
new_listen_thread->create_thread() ;
// additional_listen_threads[new_listen_thread->get_pthread_id()] = new_listen_thread ;
additional_listen_threads[new_listen_thread->get_pthread_id()] = new_listen_thread ;
return 0 ;
}
int Trick::VariableServer::create_udp_socket(const char * address, unsigned short in_port ) {
// int ret ;
// Trick::VariableServerThread * vst ;
// vst = new Trick::VariableServerThread(NULL) ;
// ret = vst->create_udp_socket(address, in_port) ;
// if ( ret == 0 ) {
// vst->copy_cpus(listen_thread.get_cpus()) ;
// vst->create_thread() ;
// }
// UDP sockets are created without a listen thread
int ret ;
Trick::VariableServerThread * vst ;
vst = new Trick::VariableServerThread() ;
ret = vst->open_udp_socket(address, in_port) ;
if ( ret == 0 ) {
vst->copy_cpus(listen_thread.get_cpus()) ;
vst->create_thread() ;
}
//vst->var_debug(3) ;
return 0 ;

View File

@ -9,10 +9,10 @@ int Trick::VariableServer::restart() {
if ( listen_thread.get_pthread_id() == 0 ) {
listen_thread.create_thread() ;
}
// std::map < pthread_t , VariableServerListenThread * >::iterator it ;
// for( it = additional_listen_threads.begin() ; it != additional_listen_threads.end() ; it++ ) {
// (*it).second->restart() ;
// }
std::map < pthread_t , VariableServerListenThread * >::iterator it ;
for( it = additional_listen_threads.begin() ; it != additional_listen_threads.end() ; it++ ) {
(*it).second->restart() ;
}
return 0 ;
}
@ -26,6 +26,9 @@ int Trick::VariableServer::suspendPreCheckpointReload() {
std::map<pthread_t, VariableServerThread*>::iterator pos ;
listen_thread.pause_listening() ;
for (const auto& listen_it : additional_listen_threads) {
listen_it.second->pause_listening();
}
pthread_mutex_lock(&map_mutex) ;
for ( pos = var_server_threads.begin() ; pos != var_server_threads.end() ; pos++ ) {
@ -50,6 +53,9 @@ int Trick::VariableServer::resumePostCheckpointReload() {
pthread_mutex_unlock(&map_mutex) ;
listen_thread.restart_listening() ;
for (const auto& listen_it : additional_listen_threads) {
listen_it.second->restart_listening();
}
return 0;
}

View File

@ -3,20 +3,16 @@
int Trick::VariableServer::shutdown() {
listen_thread.cancel_thread() ;
std::map < pthread_t , VariableServerThread * >::iterator it ;
for (const auto& listen_it : additional_listen_threads) {
listen_it.second->cancel_thread();
}
pthread_mutex_lock(&map_mutex) ;
std::vector<pthread_t> ids;
for ( it = var_server_threads.begin() ; it != var_server_threads.end() ; it++ ) {
(*it).second->cancel_thread() ;
ids.push_back((*it).first);
// cancelling causes each var_server_thread map element to be erased by the exit_var_thread function
for (const auto& it : var_server_threads) {
it.second->cancel_thread() ;
}
pthread_mutex_unlock(&map_mutex) ;
for (pthread_t id : ids) {
pthread_join(id, NULL);
}
return 0 ;
}

View File

@ -1,21 +1,19 @@
#include "trick/VariableServer.hh"
#include "trick/tc_proto.h"
void exit_var_thread(void *in_vst) {
Trick::VariableServerThread * vst = (Trick::VariableServerThread *) in_vst ;
Trick::VariableServer * vs = vst->get_vs() ;
// tc_disconnect(&vst->get_connection());
if (vst->get_pthread_id() != pthread_self()) {
std::cerr << "exit_var_thread must be called from the variable server thread" << std::endl;
}
vs->delete_session(vst->get_pthread_id());
// Tell the variable server that this thread is exiting.
vs->delete_vst(vst->get_pthread_id()) ;
// This will deleting the vst object from within the object itself. exit_var_thread
// is called from within Trick::VariableServerThread::thread_body.
// I am claiming this is safe as this is the last thing routing is the last thing
// that touches vst. The C++ FAQ says it's safe if you can say this. (Alex 6/9/14)
delete vst ;
vst->cleanup();
}

View File

@ -61,7 +61,7 @@ class TestConnection : public Trick::ClientConnection {
client_tag = tag;
}
int setBlockMode (int mode) {
int set_block_mode (int mode) {
return 0;
}

View File

@ -44,7 +44,6 @@ int master( int nargs, char **args) {
if ( ret == 0 ) {
exec->loop() ;
}
ret = exec->shutdown() ;
//TODO: add call to free all memory from memory manager

View File

@ -1 +1,10 @@
#include "trick/ClientConnection.hh"
#include "trick/ClientConnection.hh"
std::string Trick::ClientConnection::getClientTag () {
return _client_tag;
}
int Trick::ClientConnection::setClientTag (std::string tag) {
_client_tag = tag;
return 0;
}

View File

@ -1,115 +1,245 @@
#include <unistd.h>
#include <iostream>
#include <sys/select.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "trick/ClientListener.hh"
#include "trick/tc_proto.h"
Trick::ClientListener::ClientListener () : _listen_dev(), initialized(false) {
char hname[80];
gethostname(hname , (size_t) 80 ) ;
saved_source = std::string(hname);
strcpy(_listen_dev.client_tag, "<empty>");
tc_error(&_listen_dev, 0);
}
Trick::ClientListener::ClientListener () : ClientListener (new SystemInterface()) {}
Trick::ClientListener::ClientListener (SystemInterface * system_interface) : _listen_socket(-1), _hostname(""), _port(0), _client_tag("<empty>"), _initialized(false), _system_interface(system_interface) {}
Trick::ClientListener::~ClientListener () {
// Clean up our socket if initialized
if (_initialized) {
close (_listen_socket);
}
delete _system_interface;
}
int Trick::ClientListener::initialize(std::string hostname, int port) {
int ret = tc_init_with_connection_info(&_listen_dev, AF_INET, SOCK_STREAM, hostname.c_str(), port);
initialized = true;
return ret;
int Trick::ClientListener::initialize(std::string in_hostname, int in_port) {
if ((_listen_socket = _system_interface->socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror ("Server: Unable to open socket");
return LISTENER_ERROR;
}
int option_val = 1;
// Do not replicate the socket's handle if this process is forked
// Removing this will cause all kinds of problems when starting a variable server client via input file
_system_interface->fcntl(_listen_socket, F_SETFD, FD_CLOEXEC);
// Allow socket's bound address to be used by others
if (_system_interface->setsockopt(_listen_socket, SOL_SOCKET, SO_REUSEADDR, (const char *) &option_val, (socklen_t) sizeof(option_val)) != 0) {
perror("Server: Could not set socket to reuse addr");
_system_interface->close (_listen_socket);
return LISTENER_ERROR;
}
// Turn off data buffering on the send side
if (_system_interface->setsockopt(_listen_socket, IPPROTO_TCP, TCP_NODELAY, (const void *) &option_val, (socklen_t) sizeof(option_val)) != 0) {
perror("Server: Could not turn off data buffering");
_system_interface->close (_listen_socket);
return LISTENER_ERROR;
}
struct sockaddr_in s_in;
memset(&s_in, 0 , sizeof(struct sockaddr_in)) ;
// Look up the hostname
char name[80];
gethostname(name, (size_t) 80);
struct hostent *ip_host ;
socklen_t s_in_size = sizeof(s_in) ;
s_in.sin_family = AF_INET;
if (in_hostname == "" || in_hostname == "localhost" || strcmp(in_hostname.c_str(),name) == 0) {
s_in.sin_addr.s_addr = INADDR_ANY;
_hostname = std::string(name);
} else if ( inet_pton(AF_INET, in_hostname.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) == 1 ) {
/* numeric character string address */
_hostname = in_hostname;
} else if ( (ip_host = gethostbyname(in_hostname.c_str())) != NULL ) {
/* some name other than the default name was given */
memcpy((void *) &(s_in.sin_addr.s_addr), (const void *) ip_host->h_addr, (size_t) ip_host->h_length);
_hostname = in_hostname;
} else {
perror("Server: Could not determine source address");
return -1 ;
}
// Set port
s_in.sin_port = htons((short) in_port);
// Bind to socket
if (_system_interface->bind(_listen_socket, (struct sockaddr *)&s_in, sizeof(s_in)) < 0) {
perror("Server: Could not bind to socket");
_system_interface->close (_listen_socket);
return LISTENER_ERROR;
}
// Check that correct port was bound to
_system_interface->getsockname( _listen_socket , (struct sockaddr *)&s_in, &s_in_size) ;
int bound_port = ntohs(s_in.sin_port);
if (in_port != 0 && bound_port != in_port) {
std::cerr << "Server: Could not bind to requested port " << in_port << std::endl;
_system_interface->close(_listen_socket);
return LISTENER_ERROR;
}
// Save port number
_port = bound_port;
// Start listening
if (_system_interface->listen(_listen_socket, SOMAXCONN) < 0) {
std::string error_message = "Server: Could not listen on port " + std::to_string(_port);
perror (error_message.c_str());
_system_interface->close(_listen_socket);
return LISTENER_ERROR;
}
// Done!
_initialized = true;
return 0;
}
int Trick::ClientListener::initialize() {
int ret = tc_init(&_listen_dev);
if (ret != TC_SUCCESS) {
fprintf(stderr, "ERROR: Could not establish listen port for Variable Server. Aborting.\n");
ret = -1 ;
}
initialized = true;
return ret;
return initialize("", 0);
}
int Trick::ClientListener::setBlockMode(TCCommBlocking mode) {
if (!initialized)
int Trick::ClientListener::setBlockMode(bool blocking) {
if (!_initialized)
return -1;
return tc_blockio(&_listen_dev, mode);
int flag = _system_interface->fcntl(_listen_socket, F_GETFL, 0);
if (flag == -1) {
std::string error_message = "Unable to get flags for fd " + std::to_string(_listen_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
if (blocking) {
flag &= ~O_NONBLOCK;
} else {
flag |= O_NONBLOCK;
}
if (_system_interface->fcntl(_listen_socket, F_SETFL, flag) == -1) {
std::string error_message = "Unable to set fd " + std::to_string(_listen_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
return 0;
}
bool Trick::ClientListener::checkForNewConnections() {
if (!initialized)
return -1;
if (!_initialized)
return false;
fd_set rfds;
struct timeval timeout_time = { 2, 0 };
FD_ZERO(&rfds);
FD_SET(_listen_dev.socket, &rfds);
FD_SET(_listen_socket, &rfds);
timeout_time.tv_sec = 2 ;
select(_listen_dev.socket + 1, &rfds, NULL, NULL, &timeout_time);
return FD_ISSET(_listen_dev.socket, &rfds);
// Listen with a timeout of 2 seconds
int result = _system_interface->select(_listen_socket + 1, &rfds, NULL, NULL, &timeout_time);
// If there's some kind of error, just ignore it and return false
if (result < 0) {
return false;
}
return FD_ISSET(_listen_socket, &rfds);
}
const char * Trick::ClientListener::getHostname () {
if (!initialized)
std::string Trick::ClientListener::getHostname () {
if (!_initialized)
return "";
return _listen_dev.hostname;
return _hostname;
}
int Trick::ClientListener::getPort() {
if (!initialized)
if (!_initialized)
return -1;
return _listen_dev.port;
return _port;
}
int Trick::ClientListener::disconnect() {
if (!initialized)
if (!_initialized) {
return -1;
}
return tc_disconnect(&_listen_dev) ;
_system_interface->shutdown(_listen_socket, SHUT_RDWR);
_system_interface->close (_listen_socket);
_initialized = false;
return 0;
}
bool Trick::ClientListener::validateSourceAddress(std::string requested_source_address) {
char hname[80];
static struct sockaddr_in s_in;
gethostname(hname, (size_t) 80);
struct addrinfo hints, *res;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
// Test to see if the restart address is on this machine. If it is not, it's not an error
if ( strcmp( requested_source_address.c_str(), hname )) {
if (! inet_pton(AF_INET, requested_source_address.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) ) {
return false;
}
int err;
if ((err = _system_interface->getaddrinfo(requested_source_address.c_str(), 0, &hints, &res)) != 0) {
std::cerr << "Unable to lookup address: " << gai_strerror(err) << std::endl;
return false;
}
return true;
}
int Trick::ClientListener::checkSocket() {
if (!initialized)
if (!_initialized)
return -1;
struct sockaddr_in s_in;
int s_in_size = sizeof(s_in) ;
getsockname( _listen_dev.socket , (struct sockaddr *)&s_in, (socklen_t *)&s_in_size) ;
printf("restart variable server message port = %d\n" , ntohs(s_in.sin_port)) ;
_listen_dev.port = ntohs(s_in.sin_port);
socklen_t s_in_size = sizeof(s_in) ;
_system_interface->getsockname( _listen_socket , (struct sockaddr *)&s_in, &s_in_size) ;
_port = ntohs(s_in.sin_port);
return 0;
}
bool Trick::ClientListener::isInitialized() {
return initialized;
return _initialized;
}
Trick::TCPConnection * Trick::ClientListener::setUpNewConnection () {
if (!_initialized)
return NULL;
TCPConnection * connection = new TCPConnection(_listen_socket);
return connection;
}
int Trick::ClientListener::restart () {
_system_interface = new SystemInterface();
}

View File

@ -23,6 +23,7 @@ int Trick::MulticastManager::broadcast (std::string message) {
for (struct sockaddr_in& address : addresses) {
sendto(mcast_socket , message_send , strlen(message_send) , 0 , (struct sockaddr *)&address , (socklen_t)sizeof(address)) ;
}
return 0;
}
int Trick::MulticastManager::addAddress (std::string addr, int port) {
@ -33,6 +34,7 @@ int Trick::MulticastManager::addAddress (std::string addr, int port) {
mcast_addr.sin_addr.s_addr = inet_addr(addr.c_str());
mcast_addr.sin_port = htons((uint16_t) port);
addresses.emplace_back(mcast_addr);
return 0;
}
int Trick::MulticastManager::is_initialized () {

View File

@ -1,117 +0,0 @@
#include "trick/TCConnection.hh"
#include "trick/tc.h"
#include "trick/tc_proto.h"
#include <sstream>
#include <iostream>
Trick::TCConnection::TCConnection () {}
int Trick::accept(ClientListener *listener, Trick::TCConnection *connection) {
if (!listener->isInitialized())
return -1;
if ( listener->_listen_dev.socket_type == SOCK_STREAM ) {
return tc_accept(&(listener->_listen_dev), &(connection->_device));
}
return 0;
}
int Trick::TCConnection::initialize() {
_device.disabled = TC_COMM_FALSE ;
_device.disable_handshaking = TC_COMM_TRUE ;
_device.blockio_limit = 0.0 ;
_device.blockio_type = TC_COMM_BLOCKIO ;
_device.client_id = 0 ;
strcpy(_device.client_tag, "") ;
_device.error_handler = (TrickErrorHndlr *) calloc(1, (int)sizeof(TrickErrorHndlr));
_device.error_handler->report_level = TRICK_ERROR_CAUTION;
}
int Trick::TCConnection::write (char * message, int size) {
int ret = tc_write(&_device, message, size);
return ret;
}
int Trick::TCConnection::write (const std::string& message) {
char send_buf[message.length()+1];
strcpy (send_buf, message.c_str());
int ret = tc_write(&_device, send_buf, message.length());
return ret;
}
std::string Trick::TCConnection::read (int max_len) {
char incoming_msg[max_len];
int nbytes = recvfrom( _device.socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ;
if (nbytes == 0 ) {
return 0;
}
if (nbytes != -1) { // -1 means socket is nonblocking and no data to read
/* find the last newline that is present on the socket */
incoming_msg[nbytes] = '\0' ;
char *last_newline = rindex( incoming_msg , '\n') ;
/* if there is a newline then there is a complete command on the socket */
if ( last_newline != NULL ) {
/* only remove up to (and including) the last newline on the socket */
int size = last_newline - incoming_msg + 1;
nbytes = recvfrom( _device.socket, incoming_msg, size, 0 , NULL, NULL ) ;
} else {
nbytes = 0 ;
}
}
std::stringstream msg_stream;
if ( nbytes > 0 ) {
int msg_len = nbytes ;
// if (debug >= 3) {
// message_publish(MSG_DEBUG, "%p tag=<%s> var_server received bytes = msg_len = %d\n", &_device, _device.client_tag, nbytes);
// }
incoming_msg[msg_len] = '\0' ;
for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) {
if ( incoming_msg[ii] != '\r' ) {
msg_stream << incoming_msg[ii] ;
}
}
}
return msg_stream.str();
}
std::string Trick::TCConnection::get_client_tag () {
return std::string(_device.client_tag);
}
int Trick::TCConnection::set_client_tag(std::string tag) {
// Max size of device client tag is 80
// TODO: Make 80 a constant somewhere, probably in TC device
if (tag.length() >= 80) {
tag.resize(79);
}
strcpy(_device.client_tag, tag.c_str());
return 0;
}
int Trick::TCConnection::get_socket() {
return _device.socket;
}
int Trick::TCConnection::disconnect () {
return tc_disconnect(&_device);
}
int Trick::TCConnection::setBlockMode(int block_mode) {
return tc_blockio(&_device, (TCCommBlocking)block_mode);
}
int Trick::TCConnection::setErrorReporting (bool on) {
return tc_error(&_device, (int)on);
}

View File

@ -0,0 +1,156 @@
#include "trick/TCPConnection.hh"
#include <sstream>
#include <iostream>
#include <cstring>
#include <strings.h>
Trick::TCPConnection::TCPConnection (SystemInterface * system_interface) : TCPConnection(0, system_interface) {}
Trick::TCPConnection::TCPConnection (int listen_socket) : TCPConnection(listen_socket, new SystemInterface()) {}
Trick::TCPConnection::TCPConnection (int listen_socket, SystemInterface * system_interface) : _system_interface(system_interface), _listen_socket(listen_socket), _socket(0), _connected(false) {
_connection_type = TCP;
}
int Trick::TCPConnection::start() {
if (_listen_socket <= 0) {
return -1;
}
// Accept a waiting connection
struct sockaddr_storage their_addr;
socklen_t addr_size = sizeof their_addr;
if ((_socket = _system_interface->accept(_listen_socket, (struct sockaddr *)&their_addr, &addr_size)) < 0) {
perror ("Unable to accept incoming connection");
return -1;
}
// Set to non blocking
setBlockMode(false);
_connected = true;
return 0;
}
int Trick::TCPConnection::write (char * message, int size) {
if (!_connected)
return -1;
return _system_interface->send(_socket, message, size, 0);
}
int Trick::TCPConnection::write (const std::string& message) {
if (!_connected)
return -1;
char send_buf[message.length()+1];
strcpy (send_buf, message.c_str());
return _system_interface->send(_socket, send_buf, message.length(), 0);
}
std::string Trick::TCPConnection::read (int max_len) {
if (!_connected) {
std::cerr << "Trying to read from a socket that is not connected" << std::endl;
return "";
}
char incoming_msg[max_len];
int max_receive_length = max_len < MAX_CMD_LEN ? max_len : MAX_CMD_LEN;
int nbytes = _system_interface->recv(_socket, incoming_msg, max_receive_length, MSG_PEEK);
if (nbytes == 0 ) {
return std::string("");
}
if (nbytes == -1) {
if (errno == EAGAIN) {
return std::string("");
} else {
std::string error_msg = "Error while reading from socket " + std::to_string(_socket);
perror(error_msg.c_str());
return std::string("");
}
}
/* find the last newline that is present on the socket */
incoming_msg[nbytes] = '\0' ;
char *last_newline = rindex( incoming_msg , '\n') ;
/* if there is a newline then there is a complete command on the socket */
if ( last_newline != NULL ) {
/* only remove up to (and including) the last newline on the socket */
int size = last_newline - incoming_msg + 1;
nbytes = _system_interface->recv( _socket, incoming_msg, size, 0) ;
} else {
nbytes = 0 ;
}
std::stringstream msg_stream;
if ( nbytes > 0 ) {
int msg_len = nbytes ;
incoming_msg[msg_len] = '\0' ;
// Strip off any \r characters
for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) {
if ( incoming_msg[ii] != '\r' ) {
msg_stream << incoming_msg[ii] ;
}
}
}
return msg_stream.str();
}
int Trick::TCPConnection::disconnect () {
if (!_connected) {
return -1;
}
_system_interface->shutdown(_socket, SHUT_RDWR);
_system_interface->close (_socket);
_connected = false;
return 0;
}
int Trick::TCPConnection::setBlockMode(bool blocking) {
int flag = _system_interface->fcntl(_socket, F_GETFL, 0);
if (flag == -1) {
std::string error_message = "Unable to get flags for fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
if (blocking) {
flag &= ~O_NONBLOCK;
} else {
flag |= O_NONBLOCK;
}
if (_system_interface->fcntl(_socket, F_SETFL, flag) == -1) {
std::string error_message = "Unable to set fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
return 0;
}
bool Trick::TCPConnection::isInitialized() {
return _connected;
}
int Trick::TCPConnection::restart() {
_system_interface = new SystemInterface();
}

View File

@ -0,0 +1,215 @@
#include "trick/UDPConnection.hh"
#include <sstream>
#include <iostream>
#include <cstring>
#include <strings.h>
#include <arpa/inet.h>
Trick::UDPConnection::UDPConnection () : UDPConnection(new SystemInterface()) {}
Trick::UDPConnection::UDPConnection (SystemInterface * system_interface) : _started(false), _initialized(false), _port(0), _hostname(""), _system_interface(system_interface), _socket(0) {}
int Trick::UDPConnection::initialize(const std::string& in_hostname, int in_port) {
if ((_socket = _system_interface->socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror ("Server: Unable to open socket");
return -1;
}
int option_val = 1;
// Do not replicate the socket's handle if this process is forked
// Removing this will cause all kinds of problems when starting a variable server client via input file
_system_interface->fcntl(_socket, F_SETFD, FD_CLOEXEC);
// Allow socket's bound address to be used by others
if (_system_interface->setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, (const char *) &option_val, (socklen_t) sizeof(option_val)) != 0) {
perror("Server: Could not set socket to reuse addr");
_system_interface->close (_socket);
return -1;
}
struct sockaddr_in s_in;
memset(&s_in, 0 , sizeof(struct sockaddr_in)) ;
// Look up the hostname
char name[80];
gethostname(name, (size_t) 80);
struct hostent *ip_host ;
socklen_t s_in_size = sizeof(s_in) ;
s_in.sin_family = AF_INET;
if (in_hostname == "" || in_hostname == "localhost" || in_hostname == "127.0.0.1" || strcmp(in_hostname.c_str(),name) == 0) {
s_in.sin_addr.s_addr = INADDR_ANY;
_hostname = std::string(name);
} else if ( inet_pton(AF_INET, in_hostname.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) == 1 ) {
/* numeric character string address */
_hostname = in_hostname;
} else if ( (ip_host = gethostbyname(in_hostname.c_str())) != NULL ) {
/* some name other than the default name was given */
memcpy((void *) &(s_in.sin_addr.s_addr), (const void *) ip_host->h_addr, (size_t) ip_host->h_length);
_hostname = in_hostname;
} else {
perror("UDP socket: Could not determine source address");
return -1 ;
}
// Set port
s_in.sin_port = htons((short) in_port);
// Bind to socket
if (_system_interface->bind(_socket, (struct sockaddr *)&s_in, sizeof(s_in)) < 0) {
perror("UDP socket: Could not bind to socket");
_system_interface->close (_socket);
return -1;
}
// Check that correct port was bound to
_system_interface->getsockname( _socket , (struct sockaddr *)&s_in, &s_in_size) ;
int bound_port = ntohs(s_in.sin_port);
if (in_port != 0 && bound_port != in_port) {
std::cerr << "UDP socket: Could not bind to requested port " << in_port << std::endl;
_system_interface->close(_socket);
return -1;
}
// Save port number
_port = bound_port;
setBlockMode(false);
// Done!
_initialized = true;
return 0;
}
int Trick::UDPConnection::start() {
// We've already set everything up, just make sure it was successful
if (_initialized) {
_started = true;
return 0;
}
return -1;
}
int Trick::UDPConnection::write (char * message, int size) {
if (!_started)
return -1;
socklen_t sock_size = sizeof(_remote_serv_addr);
return _system_interface->sendto(_socket, message, size, 0, (struct sockaddr *) &_remote_serv_addr, sock_size );
}
int Trick::UDPConnection::write (const std::string& message) {
if (!_started)
return -1;
char send_buf[message.length()+1];
strcpy (send_buf, message.c_str());
socklen_t sock_size = sizeof(_remote_serv_addr);
return _system_interface->sendto(_socket, send_buf, message.length(), 0, (struct sockaddr *) &_remote_serv_addr, sock_size );
}
std::string Trick::UDPConnection::read (int max_len) {
if (!_started)
return "";
char incoming_msg[max_len];
int nbytes = _system_interface->recvfrom( _socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ;
if (nbytes == 0 ) {
return 0;
}
if (nbytes != -1) { // -1 means socket is nonblocking and no data to read
/* find the last newline that is present on the socket */
incoming_msg[nbytes] = '\0' ;
char *last_newline = rindex( incoming_msg , '\n') ;
/* if there is a newline then there is a complete command on the socket */
if ( last_newline != NULL ) {
socklen_t sock_size = sizeof(_remote_serv_addr);
/* Save the remote host information so we know where to send replies */
/* only remove up to (and including) the last newline on the socket */
int size = last_newline - incoming_msg + 1;
nbytes = _system_interface->recvfrom( _socket, incoming_msg, size, 0 , (struct sockaddr *) &_remote_serv_addr, &sock_size ) ;
} else {
nbytes = 0 ;
}
}
std::stringstream msg_stream;
if ( nbytes > 0 ) {
int msg_len = nbytes ;
incoming_msg[msg_len] = '\0' ;
for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) {
if ( incoming_msg[ii] != '\r' ) {
msg_stream << incoming_msg[ii] ;
}
}
}
return msg_stream.str();
}
int Trick::UDPConnection::disconnect () {
if (!_initialized) {
return -1;
}
_system_interface->shutdown(_socket, SHUT_RDWR);
_system_interface->close (_socket);
_initialized = false;
_started = false;
return 0;
}
int Trick::UDPConnection::setBlockMode(bool blocking) {
int flag = _system_interface->fcntl(_socket, F_GETFL, 0);
if (flag == -1) {
std::string error_message = "Unable to get flags for fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
if (blocking) {
flag &= ~O_NONBLOCK;
} else {
flag |= O_NONBLOCK;
}
if (_system_interface->fcntl(_socket, F_SETFL, flag) == -1) {
std::string error_message = "Unable to set fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking);
perror (error_message.c_str());
return -1;
}
return 0;
}
int Trick::UDPConnection::restart() {
_system_interface = new SystemInterface();
}
bool Trick::UDPConnection::isInitialized() {
return _started;
}
int Trick::UDPConnection::getPort() {
return _port;
}
std::string Trick::UDPConnection::getHostname() {
return _hostname;
}

View File

@ -0,0 +1 @@
comm_test

View File

@ -0,0 +1,400 @@
#include <gtest/gtest.h>
#include <errno.h>
#include <unistd.h>
#include <iostream>
#include <sys/select.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "trick/ClientListener.hh"
#include "SystemInterfaceMock/SystemInterfaceMock.hh"
class ClientListenerTest : public testing::Test {
protected:
ClientListenerTest() : system_context(new SystemInferfaceMock()), listener (system_context) {}
~ClientListenerTest(){}
SystemInferfaceMock * system_context;
Trick::ClientListener listener;
};
TEST_F( ClientListenerTest, initialized ) {
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, initialize_localhost_0 ) {
// ARRANGE
// Look up the hostname
char name[80];
gethostname(name, (size_t) 80);
// ACT
listener.initialize("localhost", 0);
// ASSERT
EXPECT_EQ(listener.isInitialized(), true);
EXPECT_EQ(listener.getHostname(), std::string(name));
}
TEST_F( ClientListenerTest, initialize_localhost_54321 ) {
// ARRANGE
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), true);
EXPECT_EQ(listener.getPort(), 54321);
}
TEST_F( ClientListenerTest, initialize_no_args ) {
// ARRANGE
// ACT
listener.initialize();
// ASSERT
EXPECT_EQ(listener.isInitialized(), true);
EXPECT_GT(listener.getPort(), 1000);
}
TEST_F( ClientListenerTest, initialize_localhost_numerical_54321 ) {
// ARRANGE
// ACT
listener.initialize("127.0.0.1", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), true);
EXPECT_EQ(listener.getPort(), 54321);
}
TEST_F( ClientListenerTest, initialize_invalid_hostname ) {
// ARRANGE
// ACT
listener.initialize("some_invalid_hostname", 0);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_socket ) {
// ARRANGE
system_context->register_socket_impl([](int a, int b, int c) {
errno = EPERM;
return -1;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_setsockopt_reuseaddr ) {
// ARRANGE
system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) {
errno = EINVAL;
return -1;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_setsockopt_buffering ) {
// ARRANGE
system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) {
if (level == IPPROTO_TCP && optname == TCP_NODELAY) {
errno = ENOTSOCK;
return -1;
}
return 0;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_bind ) {
// ARRANGE
system_context->register_bind_impl([](int sockfd, const struct sockaddr *addr,socklen_t addrlen) {
errno = EADDRINUSE;
return -1;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_sockname ) {
// ARRANGE
system_context->register_getsockname_impl([](int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
((struct sockaddr_in *) addr)->sin_port = htons(1234);
return 0;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, failed_listen ) {
// ARRANGE
system_context->register_listen_impl([](int sockfd, int backlog) {
errno = EADDRINUSE;
return -1;
});
// ACT
listener.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(listener.isInitialized(), false);
}
TEST_F( ClientListenerTest, checkForNewConnections_uninitialized ) {
// ARRANGE
// ACT
bool result = listener.checkForNewConnections();
// ASSERT
EXPECT_EQ(result, false);
}
TEST_F( ClientListenerTest, checkForNewConnections ) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
system_context->register_select_impl([&socket_fd](int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout) {
FD_SET(socket_fd, readfds);
return 1;
});
listener.initialize();
// ACT
bool result = listener.checkForNewConnections();
// ASSERT
EXPECT_EQ(result, true);
}
TEST_F( ClientListenerTest, checkForNewConnections_select_error ) {
// ARRANGE
system_context->register_select_impl([](int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout) {
return -1;
});
listener.initialize();
// ACT
bool result = listener.checkForNewConnections();
// ASSERT
EXPECT_EQ(result, false);
}
TEST_F( ClientListenerTest, setBlockMode_false ) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
listener.initialize();
// ACT
int status = listener.setBlockMode(false);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_TRUE(flag & O_NONBLOCK);
}
TEST_F( ClientListenerTest, setBlockMode_nonblocking) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
listener.initialize();
// ACT
int status = listener.setBlockMode(false);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_TRUE(flag & O_NONBLOCK);
}
TEST_F( ClientListenerTest, setBlockMode_blocking) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
listener.initialize();
// ACT
int status = listener.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_FALSE(flag & O_NONBLOCK);
}
TEST_F( ClientListenerTest, setBlockMode_fcntl_getfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
system_context->register_fcntl_impl([](int a, int b, int c) {
errno = EACCES;
return -1;
});
listener.initialize();
// ACT
int status = listener.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
}
TEST_F( ClientListenerTest, setBlockMode_fcntl_setfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
system_context->register_fcntl_impl([](int a, int cmd, int c) {
if (cmd == F_SETFL) {
errno = EBADF;
return -1;
}
return 0;
});
listener.initialize();
// ACT
int status = listener.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
}
TEST_F( ClientListenerTest, validateSourceAddress_localhost) {
// ARRANGE
// ACT
bool status = listener.validateSourceAddress("localhost");
// ASSERT
EXPECT_EQ(status, true);
}
TEST_F( ClientListenerTest, validateSourceAddress_junk) {
// ARRANGE
// ACT
bool status = listener.validateSourceAddress("alsdkfjgalkdj");
// ASSERT
EXPECT_EQ(status, false);
}
TEST_F( ClientListenerTest, checkSocket) {
// ARRANGE
listener.initialize();
int port = listener.getPort();
// ACT
int status = listener.checkSocket();
// ASSERT
EXPECT_EQ(status, 0);
EXPECT_EQ(listener.getPort(), port);
}
TEST_F( ClientListenerTest, checkSocket_uninitialized) {
// ARRANGE
// ACT
int status = listener.checkSocket();
// ASSERT
EXPECT_EQ(status, -1);
}
TEST_F( ClientListenerTest, disconnect) {
// ARRANGE
listener.initialize();
// ACT
int status = listener.disconnect();
// ASSERT
EXPECT_EQ(status, 0);
}
TEST_F( ClientListenerTest, disconnect_uninitialized) {
// ARRANGE
// ACT
int status = listener.disconnect();
// ASSERT
EXPECT_EQ(status, -1);
}
TEST_F( ClientListenerTest, setupNewConnection) {
// ARRANGE
listener.initialize();
// ACT
Trick::TCPConnection * connection = listener.setUpNewConnection();
// ASSERT
EXPECT_TRUE(connection != NULL);
}
TEST_F( ClientListenerTest, setupNewConnection_uninitialized) {
// ARRANGE
// ACT
Trick::TCPConnection * connection = listener.setUpNewConnection();
// ASSERT
EXPECT_TRUE(connection == NULL);
}

View File

@ -0,0 +1,38 @@
include ${TRICK_HOME}/share/trick/makefiles/Makefile.common
include ${TRICK_HOME}/share/trick/makefiles/Makefile.tricklib
INCLUDE_DIRS = -I$(GTEST_HOME) -I${GTEST_HOME}/include -I$(TRICK_HOME)/include
# Use the libtrick_connection_handlers library only. libtrick.a would also work.
TRICK_LIBS := ${TRICK_LIB_DIR}/libtrick_connection_handlers.a
TRICK_EXEC_LINK_LIBS += -L${GTEST_HOME}/lib64 -L${GTEST_HOME}/lib -lgtest -lgtest_main
TRICK_CXXFLAGS += ${INCLUDE_DIRS} -g -Wall -Wextra ${TRICK_SYSTEM_CXXFLAGS} ${TRICK_TEST_FLAGS}
# Automatically determine all executable names produced by this Makefile.
TESTS = comm_test
# Objects that we depend on
DEPENDS = ../object_*/*.o
TRICK_CXXFLAGS += -Wno-unused-parameter
default : all
all : $(TESTS)
# Set XML test results name
test : all
@ for i in $(TESTS) ; do \
./$$i --gtest_output=xml:${TRICK_HOME}/trick_test/ConnectionHandlers.xml; \
done
clean : clean_test
clean_test :
$(RM) -rf $(TESTS)
$(TESTS) : $(CPP_OBJS) $(DEPENDS)
$(TRICK_CXX) $(TRICK_CXXFLAGS) $^ $(TRICK_EXEC_LINK_LIBS) -o $@ $(SYSCALL_MOCK_LINKFLAGS)

View File

@ -0,0 +1,238 @@
#ifndef __SYSTEM_INTERFACE_MOCK__
#define __SYSTEM_INTERFACE_MOCK__
#include "trick/SystemInterface.hh"
typedef std::function<int (int, int, int)> socket_func_type;
typedef std::function<int (int, int, int, const void *, socklen_t)> setsockopt_func_type;
typedef std::function<int (int, const struct sockaddr *, socklen_t)> bind_func_type;
typedef std::function<int (int, struct sockaddr *, socklen_t *)> getsockname_func_type;
typedef std::function<int (int, int)> listen_func_type;
typedef std::function<int (int, fd_set *, fd_set *, fd_set *, struct timeval *)> select_func_type;
typedef std::function<int (int)> close_func_type;
typedef std::function<int (const char *, const char *, const struct addrinfo *, struct addrinfo **)> getaddrinfo_func_type;
typedef std::function<int (int, int, int)> fcntl_func_type;
typedef std::function<int (int, int)> shutdown_func_type;
typedef std::function<int (int, struct sockaddr *, socklen_t *)> accept_func_type;
typedef std::function<ssize_t (int, const void *, size_t, int)> send_func_type;
typedef std::function<ssize_t (int, const void *, size_t, int, const struct sockaddr *, socklen_t)> sendto_func_type;
typedef std::function<ssize_t (int, void *, size_t, int)> recv_func_type;
typedef std::function<ssize_t (int, void *, size_t, int, struct sockaddr *, socklen_t *)> recvfrom_func_type;
class SystemInferfaceMock : public SystemInterface {
public:
SystemInferfaceMock () {
real_socket_impl();
real_setsockopt_impl();
real_bind_impl();
real_getsockname_impl();
real_listen_impl();
real_select_impl();
real_close_impl();
real_getaddrinfo_impl();
real_fcntl_impl();
real_shutdown_impl();
real_accept_impl();
real_send_impl();
real_sendto_impl();
real_recv_impl();
real_recvfrom_impl();
}
void set_all_real () {
real_socket_impl();
real_setsockopt_impl();
real_bind_impl();
real_getsockname_impl();
real_listen_impl();
real_select_impl();
real_close_impl();
real_getaddrinfo_impl();
real_fcntl_impl();
real_shutdown_impl();
real_accept_impl();
real_send_impl();
real_sendto_impl();
real_recv_impl();
real_recvfrom_impl();
}
void set_all_noop() {
noop_socket_impl();
noop_setsockopt_impl();
noop_bind_impl();
noop_getsockname_impl();
noop_listen_impl();
noop_select_impl();
noop_close_impl();
noop_getaddrinfo_impl();
noop_fcntl_impl();
noop_shutdown_impl();
noop_accept_impl();
noop_send_impl();
noop_sendto_impl();
noop_recv_impl();
noop_recvfrom_impl();
}
// socket implementation
public:
virtual int socket (int domain, int type, int protocol) override { return socket_impl( domain, type, protocol); }
void register_socket_impl (socket_func_type impl) { socket_impl = impl; }
void real_socket_impl () { socket_impl = [](int domain, int type, int protocol) -> int { return ::socket( domain, type, protocol); }; }
void noop_socket_impl () { socket_impl = [](int domain, int type, int protocol) -> int { return 0; }; }
private:
socket_func_type socket_impl;
// setsockopt implementation
public:
virtual int setsockopt (int socket, int level, int option_name, const void * option_value, socklen_t option_len) override { return setsockopt_impl( socket, level, option_name, option_value, option_len); }
void register_setsockopt_impl (setsockopt_func_type impl) { setsockopt_impl = impl; }
void real_setsockopt_impl () { setsockopt_impl = [](int socket, int level, int option_name, const void * option_value, socklen_t option_len) -> int { return ::setsockopt( socket, level, option_name, option_value, option_len); }; }
void noop_setsockopt_impl () { setsockopt_impl = [](int socket, int level, int option_name, const void * option_value, socklen_t option_len) -> int { return 0; }; }
private:
setsockopt_func_type setsockopt_impl;
// bind implementation
public:
virtual int bind (int socket, struct sockaddr * address, socklen_t address_len) override { return bind_impl( socket, address, address_len); }
void register_bind_impl (bind_func_type impl) { bind_impl = impl; }
void real_bind_impl () { bind_impl = [](int socket, const struct sockaddr * address, socklen_t address_len) -> int { return ::bind( socket, address, address_len); }; }
void noop_bind_impl () { bind_impl = [](int socket, const struct sockaddr * address, socklen_t address_len) -> int { return 0; }; }
private:
bind_func_type bind_impl;
// getsockname implementation
public:
virtual int getsockname (int socket, struct sockaddr * address, socklen_t * address_len) override { return getsockname_impl( socket, address, address_len); }
void register_getsockname_impl (getsockname_func_type impl) { getsockname_impl = impl; }
void real_getsockname_impl () { getsockname_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return ::getsockname( socket, address, address_len); }; }
void noop_getsockname_impl () { getsockname_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return 0; }; }
private:
getsockname_func_type getsockname_impl;
// listen implementation
public:
virtual int listen (int socket, int backlog) override { return listen_impl( socket, backlog); }
void register_listen_impl (listen_func_type impl) { listen_impl = impl; }
void real_listen_impl () { listen_impl = [](int socket, int backlog) -> int { return ::listen( socket, backlog); }; }
void noop_listen_impl () { listen_impl = [](int socket, int backlog) -> int { return 0; }; }
private:
listen_func_type listen_impl;
// select implementation
public:
virtual int select (int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) override { return select_impl( nfds, readfds, writefds, errorfds, timeout); }
void register_select_impl (select_func_type impl) { select_impl = impl; }
void real_select_impl () { select_impl = [](int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) -> int { return ::select( nfds, readfds, writefds, errorfds, timeout); }; }
void noop_select_impl () { select_impl = [](int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) -> int { return 0; }; }
private:
select_func_type select_impl;
// close implementation
public:
virtual int close (int fildes) override { return close_impl( fildes); }
void register_close_impl (close_func_type impl) { close_impl = impl; }
void real_close_impl () { close_impl = [](int fildes) -> int { return ::close( fildes); }; }
void noop_close_impl () { close_impl = [](int fildes) -> int { return 0; }; }
private:
close_func_type close_impl;
// getaddrinfo implementation
public:
virtual int getaddrinfo (const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) override { return getaddrinfo_impl( hostname, servname, hints, res); }
void register_getaddrinfo_impl (getaddrinfo_func_type impl) { getaddrinfo_impl = impl; }
void real_getaddrinfo_impl () { getaddrinfo_impl = [](const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) -> int { return ::getaddrinfo( hostname, servname, hints, res); }; }
void noop_getaddrinfo_impl () { getaddrinfo_impl = [](const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) -> int { return 0; }; }
private:
getaddrinfo_func_type getaddrinfo_impl;
// fcntl implementation
public:
virtual int fcntl (int fildes, int cmd, int arg) override { return fcntl_impl( fildes, cmd, arg); }
void register_fcntl_impl (fcntl_func_type impl) { fcntl_impl = impl; }
void real_fcntl_impl () { fcntl_impl = [](int fildes, int cmd, int arg) -> int { return ::fcntl( fildes, cmd, arg); }; }
void noop_fcntl_impl () { fcntl_impl = [](int fildes, int cmd, int arg) -> int { return 0; }; }
private:
fcntl_func_type fcntl_impl;
// shutdown implementation
public:
virtual int shutdown (int socket, int how) override { return shutdown_impl( socket, how); }
void register_shutdown_impl (shutdown_func_type impl) { shutdown_impl = impl; }
void real_shutdown_impl () { shutdown_impl = [](int socket, int how) -> int { return ::shutdown( socket, how); }; }
void noop_shutdown_impl () { shutdown_impl = [](int socket, int how) -> int { return 0; }; }
private:
shutdown_func_type shutdown_impl;
// accept implementation
public:
virtual int accept (int socket, struct sockaddr * address, socklen_t * address_len) override { return accept_impl( socket, address, address_len); }
void register_accept_impl (accept_func_type impl) { accept_impl = impl; }
void real_accept_impl () { accept_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return ::accept( socket, address, address_len); }; }
void noop_accept_impl () { accept_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return 0; }; }
private:
accept_func_type accept_impl;
// send implementation
public:
virtual ssize_t send (int socket, const void * buffer, size_t length, int flags) override { return send_impl( socket, buffer, length, flags); }
void register_send_impl (send_func_type impl) { send_impl = impl; }
void real_send_impl () { send_impl = [](int socket, const void * buffer, size_t length, int flags) -> ssize_t { return ::send( socket, buffer, length, flags); }; }
void noop_send_impl () { send_impl = [](int socket, const void * buffer, size_t length, int flags) -> ssize_t { return 0; }; }
private:
send_func_type send_impl;
// sendto implementation
public:
virtual ssize_t sendto (int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) override { return sendto_impl( socket, buffer, length, flags, dest_addr, dest_len); }
void register_sendto_impl (sendto_func_type impl) { sendto_impl = impl; }
void real_sendto_impl () { sendto_impl = [](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { return ::sendto( socket, buffer, length, flags, dest_addr, dest_len); }; }
void noop_sendto_impl () { sendto_impl = [](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { return 0; }; }
private:
sendto_func_type sendto_impl;
// recv implementation
public:
virtual ssize_t recv (int socket, void * buffer, size_t length, int flags) override { return recv_impl( socket, buffer, length, flags); }
void register_recv_impl (recv_func_type impl) { recv_impl = impl; }
void real_recv_impl () { recv_impl = [](int socket, void * buffer, size_t length, int flags) -> ssize_t { return ::recv( socket, buffer, length, flags); }; }
void noop_recv_impl () { recv_impl = [](int socket, void * buffer, size_t length, int flags) -> ssize_t { return 0; }; }
private:
recv_func_type recv_impl;
// recvfrom implementation
public:
virtual ssize_t recvfrom (int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) override { return recvfrom_impl( socket, buffer, length, flags, address, address_len); }
void register_recvfrom_impl (recvfrom_func_type impl) { recvfrom_impl = impl; }
void real_recvfrom_impl () { recvfrom_impl = [](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { return ::recvfrom( socket, buffer, length, flags, address, address_len); }; }
void noop_recvfrom_impl () { recvfrom_impl = [](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { return 0; }; }
private:
recvfrom_func_type recvfrom_impl;
};
#endif

View File

@ -0,0 +1,367 @@
#include <gtest/gtest.h>
#include <errno.h>
#include <unistd.h>
#include <iostream>
#include <sys/select.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "trick/TCPConnection.hh"
#include "SystemInterfaceMock/SystemInterfaceMock.hh"
class TCPConnectionTest : public testing::Test {
protected:
TCPConnectionTest() : system_context(new SystemInferfaceMock()), connection (fake_listen_socket_fd, system_context) {}
~TCPConnectionTest(){}
SystemInferfaceMock * system_context;
Trick::TCPConnection connection;
const static int fake_listen_socket_fd;
};
const int TCPConnectionTest::fake_listen_socket_fd = 5;
TEST_F( TCPConnectionTest, initialized ) {
// ARRANGE
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
// ACT
int result = connection.start();
// ASSERT
ASSERT_EQ(result, 0);
}
TEST_F( TCPConnectionTest, initialized_accept_failure ) {
// ARRANGE
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
errno = EWOULDBLOCK;
return -1;
});
// ACT
int result = connection.start();
// ASSERT
ASSERT_EQ(result, -1);
}
TEST_F( TCPConnectionTest, setBlockMode_nonblocking) {
// ARRANGE
int socket_fd;
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
socket_fd = ::socket(AF_INET, SOCK_STREAM, 0);
return socket_fd;
});
connection.start();
// ACT
int status = connection.setBlockMode(false);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_TRUE(flag & O_NONBLOCK);
close(socket_fd);
}
TEST_F( TCPConnectionTest, setBlockMode_blocking) {
// ARRANGE
int socket_fd;
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
socket_fd = ::socket(AF_INET, SOCK_STREAM, 0);
return socket_fd;
});
connection.start();
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_FALSE(flag & O_NONBLOCK);
close(socket_fd);
}
TEST_F( TCPConnectionTest, setBlockMode_fcntl_getfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
socket_fd = ::socket(AF_INET, SOCK_STREAM, 0);
return socket_fd;
});
system_context->register_fcntl_impl([](int a, int b, int c) {
errno = EACCES;
return -1;
});
connection.start();
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
close(socket_fd);
}
TEST_F( TCPConnectionTest, setBlockMode_fcntl_setfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
socket_fd = ::socket(AF_INET, SOCK_STREAM, 0);
return socket_fd;
});
system_context->register_fcntl_impl([](int a, int cmd, int c) {
if (cmd == F_SETFL) {
errno = EBADF;
return -1;
}
return 0;
});
connection.start();
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
close(socket_fd);
}
TEST_F( TCPConnectionTest, write_string ) {
// ARRANGE
char sent_data[100];
memset(sent_data, 0, 100);
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_send_impl([&](int socket, const void * buffer, size_t length, int flags) -> ssize_t {
memcpy (sent_data, buffer, length);
return length;
});
connection.start();
// ACT
std::string str = "This is a message to write";
int result = connection.write(str);
// ASSERT
ASSERT_EQ(result, str.length());
ASSERT_STREQ(str.c_str(), sent_data);
}
TEST_F( TCPConnectionTest, write_binary_buf ) {
// ARRANGE
char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07};
char sent_data[100];
memset(sent_data, 0, 100);
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_send_impl([&](int socket, const void * buffer, size_t length, int flags) -> ssize_t {
memcpy (sent_data, buffer, length);
return length;
});
connection.start();
// ACT
int result = connection.write(to_send, sizeof(to_send));
// ASSERT
ASSERT_EQ(result, sizeof(to_send));
for (int i = 0; i < 8; i++) {
ASSERT_EQ(to_send[i], sent_data[i]);
}
}
TEST_F( TCPConnectionTest, write_string_uninitialized ) {
// ARRANGE
std::string str = "This is a message to write";
// ACT
int result = connection.write(str);
// ASSERT
ASSERT_EQ(result, -1);
}
TEST_F( TCPConnectionTest, write_binary_buf_uninitialized ) {
// ARRANGE
char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07};
// ACT
int result = connection.write(to_send, sizeof(to_send));
// ASSERT
ASSERT_EQ(result, -1);
}
TEST_F( TCPConnectionTest, read_nonewline ) {
// ARRANGE
std::string data_to_read = "Here is an incomplete message from a socket";
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t {
int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length;
memcpy (buffer, data_to_read.c_str(), data_to_read_size);
return data_to_read_size;
});
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
ASSERT_EQ(result, std::string(""));
}
TEST_F( TCPConnectionTest, read ) {
// ARRANGE
std::string data_to_read = "Here is a complete message from a socket\n This part is incomplete";
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t {
int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length;
memcpy (buffer, data_to_read.c_str(), data_to_read_size);
return data_to_read_size;
});
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "Here is a complete message from a socket\n";
expected += '\0';
ASSERT_EQ(result, expected);
}
TEST_F( TCPConnectionTest, read_nodata ) {
// ARRANGE
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t {
errno = EAGAIN;
return -1;
});
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}
TEST_F( TCPConnectionTest, read_other_error ) {
// ARRANGE
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return 6;
});
system_context->noop_fcntl_impl();
system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t {
errno = EBADF;
return -1;
});
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}
TEST_F( TCPConnectionTest, read_uninitialized ) {
// ARRANGE
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}
TEST_F( TCPConnectionTest, disconnect ) {
// ARRANGE
int socket_fd = 6;
int close_called = 0;
int shutdown_called = 0;
system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int {
return socket_fd;
});
system_context->register_close_impl([&](int socket) {
if (socket == socket_fd)
close_called++;
return 0;
});
system_context->register_shutdown_impl([&](int socket, int how) {
if (socket == socket_fd)
shutdown_called++;
return 0;
});
system_context->noop_fcntl_impl();
connection.start();
// ACT
int result = connection.disconnect();
// ASSERT
ASSERT_EQ(result, 0);
ASSERT_EQ(close_called, 1);
ASSERT_EQ(shutdown_called, 1);
// Don't shut down if already shut down
// ACT
result = connection.disconnect();
// ASSERT
ASSERT_EQ(result, -1);
ASSERT_EQ(close_called, 1);
ASSERT_EQ(shutdown_called, 1);
}

View File

@ -0,0 +1,394 @@
#include <gtest/gtest.h>
#include <errno.h>
#include <unistd.h>
#include <iostream>
#include <sys/select.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "trick/UDPConnection.hh"
#include "SystemInterfaceMock/SystemInterfaceMock.hh"
class UDPConnectionTest : public testing::Test {
protected:
UDPConnectionTest() : system_context(new SystemInferfaceMock()), connection (system_context) {}
~UDPConnectionTest() {}
SystemInferfaceMock * system_context;
Trick::UDPConnection connection;
};
TEST_F( UDPConnectionTest, initialize_localhost_0 ) {
// ARRANGE
char name[80];
gethostname(name, (size_t) 80);
// ACT
connection.initialize("localhost", 0);
connection.start();
// ASSERT
EXPECT_EQ(connection.isInitialized(), true);
EXPECT_EQ(connection.getHostname(), std::string(name));
}
TEST_F( UDPConnectionTest, initialize_localhost_54321 ) {
// ARRANGE
// ACT
connection.initialize("localhost", 54321);
connection.start();
// ASSERT
EXPECT_EQ(connection.isInitialized(), true);
EXPECT_EQ(connection.getPort(), 54321);
}
TEST_F( UDPConnectionTest, initialize_localhost_numerical_54321 ) {
// ARRANGE
// ACT
connection.initialize("127.0.0.1", 12345);
connection.start();
// ASSERT
EXPECT_EQ(connection.getPort(), 12345);
}
TEST_F( UDPConnectionTest, initialize_invalid_hostname ) {
// ARRANGE
// ACT
connection.initialize("some_invalid_hostname", 0);
connection.start();
// ASSERT
EXPECT_EQ(connection.isInitialized(), false);
}
TEST_F( UDPConnectionTest, failed_socket ) {
// ARRANGE
system_context->register_socket_impl([](int a, int b, int c) {
errno = EPERM;
return -1;
});
// ACT
connection.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(connection.isInitialized(), false);
}
TEST_F( UDPConnectionTest, failed_setsockopt_reuseaddr ) {
// ARRANGE
system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) {
errno = EINVAL;
return -1;
});
// ACT
connection.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(connection.isInitialized(), false);
}
TEST_F( UDPConnectionTest, failed_bind ) {
// ARRANGE
system_context->register_bind_impl([](int sockfd, const struct sockaddr *addr,socklen_t addrlen) {
errno = EADDRINUSE;
return -1;
});
// ACT
connection.initialize("localhost", 54321);
// ASSERT
EXPECT_EQ(connection.isInitialized(), false);
}
TEST_F( UDPConnectionTest, failed_sockname ) {
// ARRANGE
system_context->register_getsockname_impl([](int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
((struct sockaddr_in *) addr)->sin_port = htons(1234);
return 0;
});
// ACT
connection.initialize("localhost", 3412);
// ASSERT
EXPECT_EQ(connection.isInitialized(), false);
}
TEST_F( UDPConnectionTest, setBlockMode_nonblocking) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
int status = connection.setBlockMode(false);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_TRUE(flag & O_NONBLOCK);
close(socket_fd);
}
TEST_F( UDPConnectionTest, setBlockMode_blocking) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, 0);
int flag = fcntl(socket_fd, F_GETFL, 0);
EXPECT_FALSE(flag & O_NONBLOCK);
close(socket_fd);
}
TEST_F( UDPConnectionTest, setBlockMode_fcntl_getfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
connection.initialize("localhost", 0);
connection.start();
system_context->register_fcntl_impl([](int a, int b, int c) {
errno = EACCES;
return -1;
});
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
close(socket_fd);
}
TEST_F( UDPConnectionTest, setBlockMode_fcntl_setfl_fail) {
// ARRANGE
int socket_fd;
system_context->register_socket_impl([&socket_fd](int a, int b, int c) {
socket_fd = ::socket(a, b, c);
return socket_fd;
});
connection.initialize("localhost", 0);
connection.start();
system_context->register_fcntl_impl([](int a, int cmd, int c) {
if (cmd == F_SETFL) {
errno = EBADF;
return -1;
}
return 0;
});
// ACT
int status = connection.setBlockMode(true);
// ASSERT
EXPECT_EQ(status, -1);
close(socket_fd);
}
TEST_F( UDPConnectionTest, write_string ) {
// ARRANGE
char sent_data[100];
memset(sent_data, 0, 100);
system_context->noop_fcntl_impl();
system_context->register_sendto_impl([&](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t {
memcpy (sent_data, buffer, length);
return length;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
std::string str = "This is a message to write";
int result = connection.write(str);
// ASSERT
ASSERT_EQ(result, str.length());
ASSERT_STREQ(str.c_str(), sent_data);
}
TEST_F( UDPConnectionTest, write_binary_buf ) {
// ARRANGE
char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07};
char sent_data[100];
memset(sent_data, 0, 100);
system_context->noop_fcntl_impl();
system_context->register_sendto_impl([&](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t {
memcpy (sent_data, buffer, length);
return length;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
int result = connection.write(to_send, sizeof(to_send));
// ASSERT
ASSERT_EQ(result, sizeof(to_send));
for (int i = 0; i < 8; i++) {
ASSERT_EQ(to_send[i], sent_data[i]);
}
}
TEST_F( UDPConnectionTest, write_string_uninitialized ) {
// ARRANGE
std::string str = "This is a message to write";
// ACT
int result = connection.write(str);
// ASSERT
ASSERT_EQ(result, -1);
}
TEST_F( UDPConnectionTest, write_binary_buf_uninitialized ) {
// ARRANGE
char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07};
// ACT
int result = connection.write(to_send, sizeof(to_send));
// ASSERT
ASSERT_EQ(result, -1);
}
TEST_F( UDPConnectionTest, read_nonewline ) {
// ARRANGE
std::string data_to_read = "Here is an incomplete message from a socket";
system_context->noop_fcntl_impl();
system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t {
int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length;
memcpy (buffer, data_to_read.c_str(), data_to_read_size);
return data_to_read_size;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
ASSERT_EQ(result, std::string(""));
}
TEST_F( UDPConnectionTest, read ) {
// ARRANGE
std::string data_to_read = "Here is a complete message from a socket\n This part is incomplete";
system_context->noop_fcntl_impl();
system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t {
int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length;
memcpy (buffer, data_to_read.c_str(), data_to_read_size);
return data_to_read_size;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "Here is a complete message from a socket\n";
expected += '\0';
ASSERT_EQ(result, expected);
}
TEST_F( UDPConnectionTest, read_nodata ) {
// ARRANGE
system_context->noop_fcntl_impl();
system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t {
errno = EAGAIN;
return -1;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}
TEST_F( UDPConnectionTest, read_other_error ) {
// ARRANGE
system_context->noop_fcntl_impl();
system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t {
errno = EBADF;
return -1;
});
connection.initialize("localhost", 0);
connection.start();
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}
TEST_F( UDPConnectionTest, read_uninitialized ) {
// ARRANGE
// ACT
std::string result = connection.read();
// ASSERT
std::string expected = "";
ASSERT_EQ(result, expected);
}

View File

@ -28,39 +28,10 @@ class SimTestWorkflow(TrickWorkflow):
remaining_run_jobs = self.get_jobs(kind='run', phase=0) # Get all jobs with default phase 0
analysis_jobs = self.get_jobs(kind='analyze')
# Some tests fail intermittently for reasons not related to the tests themselves, mostly network weirdness.
# Allow retries so that we can still cover some network-adjacent code
retry_allowed_sims = self.get_sims(labels='retries_allowed')
retry_allowed_jobs = [run.get_run_job() for run in [item for sublist in [sim.get_runs() for sim in retry_allowed_sims] for item in sublist]]
for job in retry_allowed_jobs:
# Note there's an assumption/dependency here that 'retries_allowed' runs
# are only in the remaining_run_jobs list. - Jordan 2/2023
remaining_run_jobs.remove(job)
# Some tests fail intermittently for reasons not related to the tests themselves, mostly network weirdness.
# Allow retries so that we can still cover some network-adjacent code
retry_allowed_sims = self.get_sims(labels='retries_allowed')
retry_allowed_jobs = [run.get_run_job() for run in [item for sublist in [sim.get_runs() for sim in retry_allowed_sims] for item in sublist]]
for job in retry_allowed_jobs:
run_jobs.remove(job)
builds_status = self.execute_jobs(build_jobs, max_concurrent=self.cpus, header='Executing all sim builds.')
first_phase_run_status = self.execute_jobs(first_run_jobs, max_concurrent=self.cpus, header="Executing first phase runs.")
runs_status = self.execute_jobs(remaining_run_jobs, max_concurrent=self.cpus, header='Executing remaining runs.')
first_phase_run_status = self.execute_jobs(first_run_jobs, max_concurrent=self.cpus, header="Executing first phase runs.", job_timeout=1000)
runs_status = self.execute_jobs(remaining_run_jobs, max_concurrent=self.cpus, header='Executing remaining runs.', job_timeout=1000)
# Run the retry_allowed jobs
self.execute_jobs(retry_allowed_jobs, max_concurrent=self.cpus, header='Executing retry-allowed runs.')
# If anything failed, try it again up to max_retries times
all_retried_status = 0
final_retry_jobs = []
for sim in retry_allowed_sims:
failing_runs = [run for run in sim.get_runs() if run.get_run_job().get_status() == Job.Status.FAILED]
for run in failing_runs:
status, final_job = self.retry_job(sim, run, max_retries)
final_retry_jobs += [final_job]
all_retried_status = all_retried_status or status
comparison_result = self.compare()
analysis_status = self.execute_jobs(analysis_jobs, max_concurrent=self.cpus, header='Executing all analysis.')
@ -68,32 +39,22 @@ class SimTestWorkflow(TrickWorkflow):
self.status_summary() # Print a Succinct summary
# Dump failing logs
jobs = build_jobs + first_run_jobs + remaining_run_jobs + final_retry_jobs
jobs = build_jobs + first_run_jobs + remaining_run_jobs
for job in jobs:
if job.get_status() == Job.Status.FAILED:
print("Failing job: ", job.name)
if job.get_status() == Job.Status.FAILED or job.get_status() == Job.Status.TIMEOUT:
print ("*"*120)
if job.get_status() == Job.Status.FAILED:
header = "Failing job: " + job.name
else:
header = "Timed out job: " + job.name
numspaces = int((120 - 20 - len(header))/2 -2)
print("*"*10, " "*numspaces, header, " "*numspaces, "*"*10,)
print ("*"*120)
print(open(job.log_file, "r").read())
print ("*"*120, "\n")
print ("*"*120, "\n\n\n")
return (builds_status or runs_status or first_phase_run_status or all_retried_status or len(self.config_errors) > 0 or comparison_result or analysis_status)
# Retries a job up to max_retries times and adds runs to the sim
# Returns tuple of (job_status, final retry job)
def retry_job(self, sim, run, max_retries):
tries = 0
job_failing = 1
retry_run = None
retry_job = None
while tries < max_retries and job_failing:
tries += 1
retry_run = TrickWorkflow.Run(sim_dir=run.sim_dir, input_file=run.input_file, binary=run.binary, returns=run.returns,log_dir=run.log_dir)
retry_job = retry_run.get_run_job()
retry_job.name = retry_job.name + "_retry_" + str(tries)
job_failing = self.execute_jobs([retry_job], max_concurrent=1, header="Retrying failed job")
sim.add_run(retry_run)
return (job_failing, retry_job)
return (builds_status or runs_status or first_phase_run_status or len(self.config_errors) > 0 or comparison_result or analysis_status)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Build, run, and compare all test sims for Trick',