diff --git a/include/trick/ClientConnection.hh b/include/trick/ClientConnection.hh index 50883c4f..064a1e4a 100644 --- a/include/trick/ClientConnection.hh +++ b/include/trick/ClientConnection.hh @@ -14,28 +14,31 @@ LIBRARY DEPENDENCIES: namespace Trick { class ClientConnection { public: - // Should this be here? ¯\_(ツ)_/¯ + static const unsigned int MAX_CMD_LEN = 200000 ; + enum ConnectionType { TCP, UDP, MCAST, WS } ; - virtual int initialize() = 0; + // Pure virtual methods + virtual int start() = 0; virtual int write (const std::string& message) = 0; virtual int write (char * message, int size) = 0; virtual std::string read (int max_len = MAX_CMD_LEN) = 0; + virtual int setBlockMode (bool blocking) = 0; virtual int disconnect () = 0; - virtual std::string get_client_tag () = 0; - virtual int set_client_tag(std::string tag) = 0; - virtual int setBlockMode (int mode) = 0; - - static const unsigned int MAX_CMD_LEN = 200000 ; + + virtual bool isInitialized() = 0; + + virtual std::string getClientTag (); + virtual int setClientTag (std::string tag); + + virtual int restart() {}; protected: ConnectionType _connection_type; - - // RHEL appears to have an issue with std::atomic - // std::atomic_bool _is_initialized; + std::string _client_tag; }; } diff --git a/include/trick/ClientListener.hh b/include/trick/ClientListener.hh index c9df618d..ebba212f 100644 --- a/include/trick/ClientListener.hh +++ b/include/trick/ClientListener.hh @@ -1,28 +1,39 @@ #ifndef CLIENT_LISTENER_HH #define CLIENT_LISTENER_HH -#include "trick/tc.h" -#include "trick/TCConnection.hh" +/* + PURPOSE: ( Encapsulate a TCP server. ) +*/ + #include +#include "trick/SystemInterface.hh" +#include "trick/TCPConnection.hh" + +#define LISTENER_ERROR -1 + namespace Trick { - class TCConnection; + class TCPConnection; class ClientListener { public: + ClientListener (); + ClientListener (SystemInterface * system_interface); + ~ClientListener (); - // We'll see if we need separate methods for these int initialize(std::string hostname, int port); int initialize(); - int setBlockMode(TCCommBlocking mode); + int setBlockMode(bool blocking); bool checkForNewConnections(); - const char * getHostname (); + TCPConnection * setUpNewConnection (); + + std::string getHostname (); int getPort(); @@ -34,13 +45,18 @@ namespace Trick { bool isInitialized(); - friend int accept(ClientListener* listener, TCConnection* connection); - + int restart(); + private: - TCDevice _listen_dev; - std::string saved_source; - int port; - bool initialized; + + int _listen_socket; + std::string _hostname; + int _port; + std::string _client_tag; + + bool _initialized; + + SystemInterface * _system_interface; /* ** */ }; } diff --git a/include/trick/SysThread.hh b/include/trick/SysThread.hh index a565b8c5..d5f639dd 100644 --- a/include/trick/SysThread.hh +++ b/include/trick/SysThread.hh @@ -33,7 +33,7 @@ namespace Trick { **/ class SysThread : public Trick::ThreadBase { public: - SysThread(std::string in_name, bool self_deleting = false); + SysThread(std::string in_name); ~SysThread(); static int ensureAllShutdown(); @@ -47,7 +47,6 @@ namespace Trick { static bool shutdown_finished; - bool self_deleting; } ; } diff --git a/include/trick/SystemInterface.hh b/include/trick/SystemInterface.hh new file mode 100644 index 00000000..b6daa767 --- /dev/null +++ b/include/trick/SystemInterface.hh @@ -0,0 +1,50 @@ + +#ifndef __SYSTEM_INTERFACE__ +#define __SYSTEM_INTERFACE__ + + +#include +#include +#include +#include +#include +#include +#include +#include + +class SystemInterface { + public: + + virtual int socket (int domain, int type, int protocol) { return ::socket ( domain, type, protocol); } + + virtual int setsockopt (int socket, int level, int option_name, const void * option_value, socklen_t option_len) { return ::setsockopt ( socket, level, option_name, option_value, option_len); } + + virtual int bind (int socket, struct sockaddr * address, socklen_t address_len) { return ::bind ( socket, address, address_len); } + + virtual int getsockname (int socket, struct sockaddr * address, socklen_t * address_len) { return ::getsockname ( socket, address, address_len); } + + virtual int listen (int socket, int backlog) { return ::listen ( socket, backlog); } + + virtual int select (int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) { return ::select ( nfds, readfds, writefds, errorfds, timeout); } + + virtual int close (int fildes) { return ::close ( fildes); } + + virtual int getaddrinfo (const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) { return ::getaddrinfo ( hostname, servname, hints, res); } + + virtual int fcntl (int fildes, int cmd, int arg) { return ::fcntl ( fildes, cmd, arg); } + + virtual int shutdown (int socket, int how) { return ::shutdown ( socket, how); } + + virtual int accept (int socket, struct sockaddr * address, socklen_t * address_len) { return ::accept ( socket, address, address_len); } + + virtual ssize_t send (int socket, const void * buffer, size_t length, int flags) { return ::send ( socket, buffer, length, flags); } + + virtual ssize_t sendto (int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) { return ::sendto ( socket, buffer, length, flags, dest_addr, dest_len); } + + virtual ssize_t recv (int socket, void * buffer, size_t length, int flags) { return ::recv ( socket, buffer, length, flags); } + + virtual ssize_t recvfrom (int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) { return ::recvfrom ( socket, buffer, length, flags, address, address_len); } + +}; + +#endif diff --git a/include/trick/TCConnection.hh b/include/trick/TCConnection.hh deleted file mode 100644 index 070892d1..00000000 --- a/include/trick/TCConnection.hh +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef TC_CONNECTION_HH -#define TC_CONNECTION_HH - -/* - PURPOSE: ( Encapsulate a connection with TrickComm. ) -*/ - -#include "trick/ClientConnection.hh" -#include "trick/ClientListener.hh" -#include "tc.h" - - -namespace Trick { - - class ClientListener; - - class TCConnection : public ClientConnection { - public: - - TCConnection (); - - int initialize() override; - - int write (const std::string& message) override; - int write (char * message, int size) override; - - std::string read (int max_len) override; - - int disconnect () override; - std::string get_client_tag () override; - int set_client_tag(std::string tag) override; - int get_socket(); - - int setBlockMode(int block_mode) override; - int setErrorReporting (bool on); - - static const unsigned int MAX_CMD_LEN = 200000 ; - - friend int accept(ClientListener* listener, TCConnection* connection); - - private: - TCDevice _device; /**< trick_io(**) */ - }; - - int accept(ClientListener* listener, TCConnection* connection); -} - -#endif \ No newline at end of file diff --git a/include/trick/TCPConnection.hh b/include/trick/TCPConnection.hh new file mode 100644 index 00000000..9285610d --- /dev/null +++ b/include/trick/TCPConnection.hh @@ -0,0 +1,46 @@ +#ifndef TCP_CONNECTION_HH +#define TCP_CONNECTION_HH + +/* + PURPOSE: ( Encapsulate a TCP connection. ) +*/ + +#include "trick/ClientConnection.hh" +#include "trick/SystemInterface.hh" + +namespace Trick { + + class TCPConnection : public ClientConnection { + public: + + TCPConnection (int listen_socket); + TCPConnection (SystemInterface * system_interface); + TCPConnection (int listen_socket, SystemInterface * system_interface); + + int start() override; + + int write (const std::string& message) override; + int write (char * message, int size) override; + + std::string read (int max_len = MAX_CMD_LEN) override; + + int disconnect () override; + bool isInitialized() override; + + int setBlockMode(bool blocking) override; + + int restart() override; + + private: + int _socket; + bool _connected; + + // This is needed to be able to accept an incoming connection, after being set up by the listener + int _listen_socket; + + SystemInterface * _system_interface; /* ** */ + + }; +} + +#endif \ No newline at end of file diff --git a/include/trick/ThreadBase.hh b/include/trick/ThreadBase.hh index 41ec3273..ffe51ceb 100644 --- a/include/trick/ThreadBase.hh +++ b/include/trick/ThreadBase.hh @@ -131,7 +131,7 @@ namespace Trick { virtual int cancel_thread() ; /** - * Cancels thread. + * Joins thread. * @return always 0 */ virtual int join_thread() ; @@ -153,6 +153,12 @@ namespace Trick { */ virtual void dump( std::ostream & oss = std::cout ) ; + virtual void test_shutdown(); + virtual void test_shutdown(void (*exit_handler) (void *), void * exit_arg); + + virtual void thread_shutdown(); + virtual void thread_shutdown(void (*exit_handler) (void *), void * exit_arg); + protected: /** optional name of thread */ @@ -170,6 +176,14 @@ namespace Trick { /** Set of cpus to use with thread */ unsigned int max_cpu ; /**< trick_io(**) */ + /** Track whether the thread has been created */ + bool created ; /**< trick_io(**) */ + + /** Manage thread shutdown */ + bool should_shutdown; /**< trick_io(**) */ + bool cancellable; /**< trick_io(**) */ + pthread_mutex_t shutdown_mutex; /**< trick_io(**) */ + #if __linux #ifndef SWIG /** Set of cpus to use with thread */ diff --git a/include/trick/UDPConnection.hh b/include/trick/UDPConnection.hh new file mode 100644 index 00000000..abfc7e6a --- /dev/null +++ b/include/trick/UDPConnection.hh @@ -0,0 +1,56 @@ +#ifndef UDP_CONNECTION_HH +#define UDP_CONNECTION_HH + +/* + PURPOSE: ( Encapsulate a UDP socket. ) +*/ + +#include "trick/ClientConnection.hh" +#include "trick/SystemInterface.hh" + + +namespace Trick { + + class UDPConnection : public ClientConnection { + public: + + UDPConnection (); + UDPConnection (SystemInterface * system_interface); + + int start() override; + + int write (const std::string& message) override; + int write (char * message, int size) override; + + std::string read (int max_len = MAX_CMD_LEN) override; + + int disconnect () override; + bool isInitialized() override; + + int setBlockMode(bool block_mode) override; + int restart() override; + + // Non-override functions + int initialize(const std::string& hostname, int port); + + + int getPort(); + std::string getHostname(); + + + private: + bool _initialized; + bool _started; + int _socket; + + int _port; + std::string _hostname; + + struct sockaddr_in _remote_serv_addr; + + SystemInterface * _system_interface; /* ** */ + }; + +} + +#endif \ No newline at end of file diff --git a/include/trick/VariableServerListenThread.hh b/include/trick/VariableServerListenThread.hh index 41fa2052..bc87a34e 100644 --- a/include/trick/VariableServerListenThread.hh +++ b/include/trick/VariableServerListenThread.hh @@ -75,10 +75,10 @@ namespace Trick { bool broadcast ; /**< trick_units(--) */ /** The listen device */ - ClientListener listener; + ClientListener listener; /**< trick_io(**) trick_units(--) */ /* Multicast broadcaster */ - MulticastManager multicast; + MulticastManager multicast; /**< trick_io(**) trick_units(--) */ /** The mutex to stop accepting new connections during restart\n */ pthread_mutex_t restart_pause ; /**< trick_io(**) */ diff --git a/include/trick/VariableServerThread.hh b/include/trick/VariableServerThread.hh index 3d932524..0b046035 100644 --- a/include/trick/VariableServerThread.hh +++ b/include/trick/VariableServerThread.hh @@ -11,11 +11,12 @@ #include #include #include "trick/tc.h" -#include "trick/TCConnection.hh" #include "trick/SysThread.hh" #include "trick/VariableServerSession.hh" #include "trick/variable_server_sync_types.h" #include "trick/variable_server_message_types.h" + +#include "trick/ClientConnection.hh" #include "trick/ClientListener.hh" namespace Trick { @@ -39,7 +40,7 @@ namespace Trick { @brief Constructor. @param listen_dev - the TCDevice set up in listen() */ - VariableServerThread(ClientListener * in_listen_dev ) ; + VariableServerThread() ; virtual ~VariableServerThread() ; /** @@ -50,6 +51,16 @@ namespace Trick { void set_client_tag(std::string tag); + /** + @brief Open a UDP socket for this thread + */ + int open_udp_socket(const std::string& hostname, int port); + + /** + @brief Open a TCP connection for this thread + */ + int open_tcp_connection(ClientListener * listener); + /** @brief Block until thread has accepted connection */ @@ -67,6 +78,8 @@ namespace Trick { void restart() ; + void cleanup(); + protected: /** @@ -77,12 +90,11 @@ namespace Trick { /** The Master variable server object. */ static VariableServer * vs ; - /** this is where a lot of this should happen now */ - VariableServerSession * session; + /** Manages the variable list */ + VariableServerSession * session; /**< trick_io(**) */ - /** The listen device from the variable server\n */ - ClientListener * listener; /**< trick_io(**) */ - TCConnection connection; /**< trick_io(**) */ + /** Connection to the client */ + ClientConnection * connection; /**< trick_io(**) */ /** Value (1,2,or 3) that causes the variable server to output increasing amounts of debug information.\n */ int debug ; /**< trick_io(**) */ @@ -91,8 +103,8 @@ namespace Trick { bool enabled ; /**< trick_io(**) */ ConnectionStatus connection_status ; /**< trick_io(**) */ - pthread_mutex_t connection_status_mutex; - pthread_cond_t connection_status_cv; + pthread_mutex_t connection_status_mutex; /**< trick_io(**) */ + pthread_cond_t connection_status_cv; /**< trick_io(**) */ /** The mutex pauses all processing during checkpoint restart */ pthread_mutex_t restart_pause ; /**< trick_io(**) */ diff --git a/share/trick/trickops/WorkflowCommon.py b/share/trick/trickops/WorkflowCommon.py index 0846313b..6b164cc4 100644 --- a/share/trick/trickops/WorkflowCommon.py +++ b/share/trick/trickops/WorkflowCommon.py @@ -205,7 +205,7 @@ class Job(object): for getting status information. More specific types of Jobs should inherit from this base clasee. """ - enums = ['NOT_STARTED', 'RUNNING', 'SUCCESS', 'FAILED'] + enums = ['NOT_STARTED', 'RUNNING', 'SUCCESS', 'FAILED', 'TIMEOUT'] Status = collections.namedtuple('Status', enums)(*(range(len(enums)))) _success_progress_bar = create_progress_bar(1, 'Success') @@ -220,7 +220,9 @@ class Job(object): Job.Status.NOT_STARTED: ('NOT RUN', 'DARK_YELLOW'), Job.Status.SUCCESS: ('OK', 'DARK_GREEN'), Job.Status.FAILED: ('FAIL:' + str(self._expected_exit_status) + '/' + - str(self._exit_status), 'DARK_RED') }[self.get_status()] + str(self._exit_status), 'DARK_RED'), + Job.Status.TIMEOUT: ('TIMEOUT after ' + str(self._timeout) + ' seconds', 'YELLOW') + }[self.get_status()] return printer.colorstr(text, color) def __init__(self, name, command, log_file, expected_exit_status=0): @@ -245,6 +247,7 @@ class Job(object): self._stop_time = None self._exit_status = None self._expected_exit_status = expected_exit_status + self._timeout = None def start(self): """ @@ -275,6 +278,8 @@ class Job(object): This job completed with an exit status of zero. Status.FAILED This job completed with a non-zero exit status. + Status.TIMEOUT + This job ran longer than the allotted time given by execute_job """ if self._process is None: return self.Status.NOT_STARTED @@ -283,6 +288,9 @@ class Job(object): if self._exit_status is None: return self.Status.RUNNING + if self._timeout is not None: + return self.Status.TIMEOUT + if self._stop_time is None: self._stop_time = time.time() @@ -368,6 +376,17 @@ class Job(object): """ return self._done_string() + def _timeout_string(self): + """ + Get a string to display when this Job has failed. + + Returns + ------- + str + A string to be displayed when in the FAILED state. + """ + return self._done_string() + def _done_string(self): """ This class uses the same string for SUCCESS and FAILED, but @@ -573,7 +592,7 @@ class WorkflowCommon: else: return 'unknown' - def execute_jobs(self, jobs, max_concurrent=None, header=None): + def execute_jobs(self, jobs, max_concurrent=None, header=None, job_timeout=None): """ Run jobs, blocking until all have returned. @@ -694,6 +713,15 @@ class WorkflowCommon: for i in range(min(len(waitingJobs), available_cpus)): waitingJobs[i].start() + if job_timeout is not None: + # Check if any jobs have timed out + runningJobs = [job for job in jobs if job.get_status() is job.Status.RUNNING] + for runningJob in runningJobs: + if time.time() - runningJob._start_time > job_timeout: + runningJob._timeout = job_timeout + runningJob._stop_time = time.time() + runningJob.die() + # display the status if enabled if stdscr: status_pad.erase() @@ -802,8 +830,8 @@ class WorkflowCommon: text, color = { job.Status.NOT_STARTED: ('was not run', 'GREY40'), job.Status.SUCCESS: ('succeeded', 'DARK_GREEN'), - job.Status.FAILED: ('failed', 'DARK_RED') - }[job.get_status()] + job.Status.FAILED: ('failed', 'DARK_RED'), + job.Status.TIMEOUT: ('timed out', 'YELLOW') }[job.get_status()] text = job.name + ' ' + text diff --git a/test/SIM_test_varserv/RUN_test/unit_test.py b/test/SIM_test_varserv/RUN_test/unit_test.py index e2de1e92..afd494eb 100644 --- a/test/SIM_test_varserv/RUN_test/unit_test.py +++ b/test/SIM_test_varserv/RUN_test/unit_test.py @@ -5,7 +5,7 @@ from trick.unit_test import * def main(): - trick.var_server_set_port(4000) + trick.var_server_set_port(40000) trick.var_ascii() trick.real_time_enable() trick.exec_set_software_frame(0.01) @@ -15,7 +15,7 @@ def main(): trick.var_server_create_udp_socket('', 48000) trick.var_server_create_multicast_socket('224.10.10.10','', 47000) - trick.exec_set_terminate_time(1000.0) + trick.exec_set_terminate_time(100.0) varServerPort = trick.var_server_get_port() test_output = ( os.getenv("TRICK_HOME") + "/trick_test/SIM_test_varserv.xml" ) diff --git a/test/SIM_test_varserv/models/test_client/test_client.cpp b/test/SIM_test_varserv/models/test_client/test_client.cpp index ffc1aefb..6d172d39 100644 --- a/test/SIM_test_varserv/models/test_client/test_client.cpp +++ b/test/SIM_test_varserv/models/test_client/test_client.cpp @@ -12,7 +12,7 @@ #include #include #include -#include + #include #include "trick/var_binary_parser.hh" @@ -44,20 +44,14 @@ class Socket { return -1; } - int value = 1; - if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &value, (socklen_t) sizeof(value)) < 0) { - std::cout << "init_multicast: Socket option failed" << std::endl; - return -1; - } - - if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEPORT, (char *) &value, sizeof(value)) < 0) { - perror("setsockopt: reuseport"); - } - struct sockaddr_in serv_addr; serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(port); // convert to weird network byte format - serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0) { + std::cout << "Invalid address/ Address not supported" << std::endl; + return -1; + } tries = 0; int connection_status; @@ -81,7 +75,8 @@ class Socket { _port = port; int tries = 0; - _socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + while ((_socket_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 && tries < max_retries) tries++; + if (_socket_fd < 0) { std::cout << "init_multicast: Socket open failed" << std::endl; return -1; @@ -93,10 +88,6 @@ class Socket { return -1; } - if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEPORT, (char *) &value, sizeof(value)) < 0) { - perror("setsockopt: reuseport"); - } - struct ip_mreq mreq; // Use setsockopt() to request that the kernel join a multicast group mreq.imr_multiaddr.s_addr = inet_addr(_hostname.c_str()); @@ -109,10 +100,9 @@ class Socket { struct sockaddr_in sockin ; - // Set up local interface - // We must bind to the multicast address + // Set up destination address sockin.sin_family = AF_INET; - sockin.sin_addr.s_addr = inet_addr(_hostname.c_str()); + sockin.sin_addr.s_addr = htonl(INADDR_ANY); sockin.sin_port = htons(_port); if ( bind(_socket_fd, (struct sockaddr *) &sockin, (socklen_t) sizeof(sockin)) < 0 ) { @@ -120,14 +110,6 @@ class Socket { return -1; } - char loopch = 1; - - if(setsockopt(_socket_fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0) - { - perror("Setting IP_MULTICAST_LOOP error"); - return -1; - } - _initialized = true; return 0; } @@ -163,25 +145,6 @@ class Socket { ret = receive(); } - std::vector receive_bytes() { - unsigned char buffer[SOCKET_BUF_SIZE]; - int numBytes = recv(_socket_fd, buffer, SOCKET_BUF_SIZE, 0); - if (numBytes < 0) { - std::cout << "Failed to read from socket" << std::endl; - } - - std::vector bytes; - for (int i = 0; i < numBytes; i++) { - bytes.push_back(buffer[i]); - } - - return bytes; - } - - void operator>> (std::string& ret) { - ret = receive(); - } - std::vector receive_bytes() { unsigned char buffer[SOCKET_BUF_SIZE]; int numBytes = recv(_socket_fd, buffer, SOCKET_BUF_SIZE, 0); @@ -228,6 +191,7 @@ class Socket { int _socket_fd; bool _initialized; bool _multicast_socket; + }; class VariableServerTest : public ::testing::Test { @@ -303,42 +267,12 @@ class VariableServerTestAltListener : public ::testing::Test { static int numSession; }; -#ifndef __APPLE__ -class VariableServerTestMulticast : public ::testing::Test { - protected: - VariableServerTestMulticast() { - socket_status = socket.init("", 47000, SOCK_DGRAM); - multicast_listener.init_multicast("224.10.10.10", 47000); - - if (socket_status == 0) { - std::stringstream request; - request << "trick.var_set_client_tag(\"multicast_VSTest"; - request << numSession++; - request << "\") \n"; - - socket << request.str(); - } - } - ~VariableServerTestMulticast() { - socket.close(); - multicast_listener.close(); - } - - Socket socket; - Socket multicast_listener; - - int socket_status; - - static int numSession; -}; -int VariableServerTestMulticast::numSession = 0; -#endif - int VariableServerTest::numSession = 0; int VariableServerUDPTest::numSession = 0; int VariableServerTestAltListener::numSession = 0; + /**********************************************************/ /* Helpful constants and functions */ /**********************************************************/ @@ -411,108 +345,6 @@ void load_checkpoint (Socket& socket, const std::string& checkpoint_name) { socket << "trick.var_unpause()\n"; } -/*****************************************/ -/* Multicast Test */ -/*****************************************/ - -#ifndef __APPLE__ - -TEST_F (VariableServerTestMulticast, Strings) { - if (socket_status != 0) { - FAIL(); - } - - std::string reply; - socket << "trick.var_send_once(\"vsx.vst.o\")\n"; - std::string expected("5\tYou will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence in the success of my undertaking."); - - multicast_listener >> reply; - - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0); - - - expected = std::string("5\tI am already far north of London, and as I walk in the streets of Petersburgh, I feel a cold northern breeze play upon my cheeks, which braces my nerves and fills me with delight. Do you understand this feeling?"); - socket << "trick.var_send_once(\"vsx.vst.p\")\n"; - - multicast_listener >> reply; - - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0); - - // TODO: Does wchar actually work? - // expected = std::string("5\tThis breeze, which has travelled from the regions towards which I am advancing, gives me a foretaste of those icy climes. Inspirited by this wind of promise, my daydreams become more fervent and vivid."); - // socket << "trick.var_send_once(\"vsx.vst.q\")\n"; - - // socket >> reply; - - // std::cout << "\tExpected: " << expected << "\n\tActual: " << reply << std::endl; - - // EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0); -} - - -TEST_F (VariableServerTestMulticast, AddRemove) { - if (socket_status != 0) { - FAIL(); - } - - std::string reply; - std::string expected; - - int max_tries = 3; - int tries = 0; - - socket << "trick.var_add(\"vsx.vst.c\")\n"; - multicast_listener >> reply; - expected = std::string("0 -1234"); - - tries = 0; - while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) { - multicast_listener >> reply; - } - - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply; - - multicast_listener >> reply; - tries = 0; - while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) { - multicast_listener >> reply; - } - - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply; - - socket << "trick.var_add(\"vsx.vst.m\")\n"; - multicast_listener >> reply; - expected = std::string("0 -1234 1"); - tries = 0; - while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) { - multicast_listener >> reply; - } - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply; - - socket << "trick.var_remove(\"vsx.vst.m\")\n"; - multicast_listener >> reply; - expected = std::string("0 -1234"); - tries = 0; - while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) { - multicast_listener >> reply; - } - - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply; - - socket << "trick.var_add(\"vsx.vst.n\")\n"; - multicast_listener >> reply; - expected = std::string("0 -1234 0,1,2,3,4"); - tries = 0; - while (strcmp_IgnoringWhiteSpace(reply, expected) != 0 && tries++ < max_tries) { - multicast_listener >> reply; - } - EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0) << "Expected: " << expected << "\tAcutal: " << reply; - - socket << "trick.var_exit()\n"; -} - -#endif - /************************************/ /* UDP Tests */ /************************************/ @@ -601,7 +433,6 @@ TEST_F (VariableServerTestAltListener, Strings) { socket >> reply; EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0); - } TEST_F (VariableServerTestAltListener, AddRemove) { @@ -679,10 +510,6 @@ TEST_F (VariableServerTestAltListener, RestartAndSet) { /* Normal case tests */ /*********************************************/ -void spin (Socket& socket, int wait_cycles = 5) { - socket.receive(); -} - TEST_F (VariableServerTest, Strings) { if (socket_status != 0) { FAIL(); @@ -749,42 +576,7 @@ TEST_F (VariableServerTest, NoExtraTab) { EXPECT_STREQ(reply.c_str(), expected.c_str()); } -TEST_F (VariableServerTest, NoExtraTab) { - if (socket_status != 0) { - FAIL(); - } - - std::string reply; - std::string expected; - - socket << "trick.var_add(\"vsx.vst.c\")\n"; - socket >> reply; - expected = std::string("0\t-1234\n"); - - EXPECT_STREQ(reply.c_str(), expected.c_str()); - - socket >> reply; - - EXPECT_STREQ(reply.c_str(), expected.c_str()); - - socket << "trick.var_add(\"vsx.vst.m\")\n"; - socket >> reply; - expected = std::string("0\t-1234\t1\n"); - - EXPECT_STREQ(reply.c_str(), expected.c_str()); - - socket << "trick.var_remove(\"vsx.vst.m\")\n"; - socket >> reply; - expected = std::string("0\t-1234\n"); - - socket << "trick.var_add(\"vsx.vst.n\")\n"; - socket >> reply; - expected = std::string("0\t-1234\t0,1,2,3,4\n"); - - EXPECT_STREQ(reply.c_str(), expected.c_str()); -} - -TEST_F (VariableServerTest, DISABLED_AddRemove) { +TEST_F (VariableServerTest, AddRemove) { if (socket_status != 0) { FAIL(); } @@ -811,10 +603,6 @@ TEST_F (VariableServerTest, DISABLED_AddRemove) { socket << "trick.var_remove(\"vsx.vst.m\")\n"; socket >> reply; expected = std::string("0 -1234"); - - socket << "trick.var_add(\"vsx.vst.n\")\n"; - socket >> reply; - expected = std::string("0 -1234 0,1,2,3,4"); EXPECT_EQ(strcmp_IgnoringWhiteSpace(reply, expected), 0); @@ -1115,7 +903,7 @@ TEST_F (VariableServerTest, Multicast) { FAIL() << "Multicast Socket failed to initialize."; } - int max_multicast_tries = 100; + int max_multicast_tries = 10000; int tries = 0; bool found = false; @@ -1263,18 +1051,7 @@ TEST_F (VariableServerTest, Freeze) { ASSERT_EQ(mode, MODE_RUN); } -<<<<<<< HEAD -<<<<<<< HEAD -======= - -<<<<<<< HEAD ->>>>>>> 6fe76071 (Add multicast test) -TEST_F (VariableServerTest, DISABLED_CopyAndWriteModes) { -======= -======= ->>>>>>> d158199e (Added var_send_list_size test, various other fixes) TEST_F (VariableServerTest, CopyAndWriteModes) { ->>>>>>> f9665aba (Fix and test var_send_stdio) if (socket_status != 0) { FAIL(); } @@ -1357,6 +1134,8 @@ TEST_F (VariableServerTest, CopyAndWriteModes) { command = "trick.var_clear()\n" + test_vars_command + "trick.var_set_write_mode(1)\ntrick.var_add(\"vsx.vst.e\")\ntrick.var_add(\"vsx.vst.f\")\ntrick.var_unpause()\n"; socket << command; + spin(socket); + parse_message(socket.receive()); expected = "-123456 123456"; EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected; @@ -1384,7 +1163,7 @@ TEST_F (VariableServerTest, CopyAndWriteModes) { socket << command; // Same issue as copy mode 1 write mode 0 - // spin(); + spin(socket); parse_message(socket.receive()); expected = "-1234567 123456789"; EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected; @@ -1410,6 +1189,8 @@ TEST_F (VariableServerTest, CopyAndWriteModes) { command = "trick.var_clear()\n" + test_vars_command + "trick.var_set_copy_mode(2)\ntrick.var_set_write_mode(1)\ntrick.var_set_frame_multiple(" + std::to_string(frame_multiple) + ")\ntrick.var_set_frame_offset(" + std::to_string(frame_offset) + ")\ntrick.var_add(\"vsx.vst.i\")\ntrick.var_add(\"vsx.vst.j\")\ntrick.var_unpause()\n"; socket << command; + spin(socket); + expected = "1234.5677 -1234.56789"; parse_message(socket.receive()); EXPECT_EQ(strcmp_IgnoringWhiteSpace(vars, expected), 0) << "Received: " << vars << " Expected: " << expected; @@ -1472,38 +1253,6 @@ TEST_F (VariableServerTest, send_stdio) { std::stringstream message_stream(message); std::string token; -<<<<<<< HEAD -======= - std::getline(message_stream, token, ' '); - int message_type = stoi(token); - std::getline(message_stream, token, ' '); - int stream_num = stoi(token); - std::getline(message_stream, token, '\n'); - int text_size = stoi(token); - std::string text; - std::getline(message_stream, text); - - EXPECT_EQ (message_type, 4); - EXPECT_EQ (stream_num, 1); - EXPECT_EQ (text_size, 41); - - EXPECT_EQ(text, std::string("This message should redirect to varserver")); -} - -TEST_F (VariableServerTest, RestartAndSet) { - if (socket_status != 0) { - FAIL(); - } - - std::string reply; - std::string expected; - - socket << "trick.var_add(\"vsx.vst.c\")\n"; - socket >> reply; - expected = std::string("0\t-1234\n"); - - EXPECT_EQ(reply, expected); ->>>>>>> f9665aba (Fix and test var_send_stdio) int message_type; int stream_num; @@ -1528,234 +1277,6 @@ TEST_F (VariableServerTest, RestartAndSet) { } -#ifndef __APPLE__ - -<<<<<<< HEAD -<<<<<<< HEAD -TEST_F (VariableServerTest, MulticastAfterRestart) { -======= -======= - -TEST_F (VariableServerTest, MulticastAfterRestart) { - if (socket_status != 0) { - FAIL(); - } - - socket << "trick.var_server_set_user_tag(\"VSTestServer\")\n"; - - Socket multicast_socket; - multicast_socket.init_multicast("224.3.14.15", 9265); - - int max_multicast_tries = 100; - int tries = 0; - bool found = false; - - char expected_hostname[80]; - gethostname(expected_hostname, 80); - int expected_port = 4000; - - // get expected username - struct passwd *passp = getpwuid(getuid()) ; - char * expected_username; - if ( passp == NULL ) { - expected_username = strdup("unknown") ; - } else { - expected_username = strdup(passp->pw_name) ; - } - - // Don't care about PID, just check that it's > 0 - char * expected_sim_dir = "trick/test/SIM_test_varserv"; // Compare against the end of the string for this one - // Don't care about cmdline name - char * expected_input_file = "RUN_test/unit_test.py"; - // Don't care about trick_version - char * expected_tag = "VSTestServer"; - - // Variables to be populated by the multicast message - char actual_hostname[80]; - unsigned short actual_port = 0; - char actual_username[80]; - int actual_pid = 0; - char actual_sim_dir[80]; - char actual_cmdline_name[80]; - char actual_input_file[80]; - char actual_trick_version[80]; - char actual_tag[80]; - unsigned short actual_duplicate_port = 0; - - while (!found && tries++ < max_multicast_tries) { - std::string broadcast_data = multicast_socket.receive(); - sscanf(broadcast_data.c_str(), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , actual_hostname, &actual_port , - actual_username , &actual_pid , actual_sim_dir , actual_cmdline_name , - actual_input_file , actual_trick_version , actual_tag, &actual_duplicate_port) ; - - if (strcmp(actual_hostname, expected_hostname) == 0 && strcmp(expected_tag, actual_tag) == 0) { - found = true; - EXPECT_STREQ(actual_hostname, expected_hostname); - EXPECT_EQ(actual_port, expected_port); - EXPECT_STREQ(actual_username, expected_username); - EXPECT_GT(actual_pid, 0); - std::string expected_sim_dir_str(expected_sim_dir); - std::string actual_sim_dir_str(actual_sim_dir); - std::string end_of_actual = actual_sim_dir_str.substr(actual_sim_dir_str.length() - expected_sim_dir_str.length(), actual_sim_dir_str.length()); - EXPECT_EQ(expected_sim_dir_str, end_of_actual); - EXPECT_STREQ(actual_input_file, expected_input_file); - EXPECT_STREQ(actual_tag, expected_tag); - EXPECT_EQ(actual_duplicate_port, expected_port); - } - } - - if (!found) - FAIL() << "Multicast message never received"; -} - ->>>>>>> 6490e49d (Add more tests, error handling) -TEST_F (VariableServerTest, LargeMessages) { ->>>>>>> f9665aba (Fix and test var_send_stdio) - if (socket_status != 0) { - FAIL(); - } - - socket << "trick.var_server_set_user_tag(\"VSTestServer\")\n"; - -<<<<<<< HEAD - Socket multicast_socket; - if (multicast_socket.init_multicast("224.3.14.15", 9265) != 0) { - FAIL() << "Multicast Socket failed to initialize."; - } - - int max_multicast_tries = 100; - int tries = 0; - bool found = false; - - char expected_hostname[80]; - gethostname(expected_hostname, 80); - int expected_port = 40000; - - // get expected username - struct passwd *passp = getpwuid(getuid()) ; - char * expected_username; - if ( passp == NULL ) { - expected_username = strdup("unknown") ; - } else { - expected_username = strdup(passp->pw_name) ; - } - - // Don't care about PID, just check that it's > 0 - char * expected_sim_dir = "trick/test/SIM_test_varserv"; // Compare against the end of the string for this one - // Don't care about cmdline name - char * expected_input_file = "RUN_test/unit_test.py"; - // Don't care about trick_version - char * expected_tag = "VSTestServer"; - - // Variables to be populated by the multicast message - char actual_hostname[80]; - unsigned short actual_port = 0; - char actual_username[80]; - int actual_pid = 0; - char actual_sim_dir[80]; - char actual_cmdline_name[80]; - char actual_input_file[80]; - char actual_trick_version[80]; - char actual_tag[80]; - unsigned short actual_duplicate_port = 0; -======= - socket << "trick.var_pause()\n"; - - for (int i = 0; i < 4000; i++) { - std::string var_add = "trick.var_add(\"vsx.vst.large_arr[" + std::to_string(i) + "]\")\n"; - socket << var_add; - } - - // Message size limit is 8192 - // Message size should be somewhere between the limit and limit-5 due to size of variable - const static int msg_size_limit = 8192; - - auto get_last_number_in_string = [](const std::string& s) -> int { - int index = s.size() - 1; - while (!isdigit(s.at(index))) { index--; } - int num = 0; - int exp = 0; - while (isdigit(s.at(index))) { - num += (s.at(index) - '0') * pow(10, exp); - index--; - exp++; - } - return num; - }; - - auto get_first_number_in_string = [](const std::string& s) -> int { - std::stringstream message_stream(s); - std::string token; - std::getline(message_stream, token, '\t'); - if (token.size() == 0) { std::getline(message_stream, token, '\t'); } - int ret = stoi(token); - return ret; - }; - - socket.clear_buffered_data(); - - bool ready = false; - while (!ready) { - socket << "trick.var_send_list_size()\n"; - socket >> reply; - if (reply == std::string("3\t4000\n")) - ready = true; - } - - int new_reply_first = 0; - int prev_reply_last = 0; - - socket << "trick.var_send()\n"; - - socket >> reply; - new_reply_first = get_first_number_in_string(reply); - prev_reply_last = get_last_number_in_string(reply); - EXPECT_EQ(new_reply_first, 0); - EXPECT_TRUE(reply.size() <= msg_size_limit && reply.size() >= msg_size_limit-5); - - while (prev_reply_last != 3999) { - socket >> reply; - new_reply_first = get_first_number_in_string(reply); - - EXPECT_TRUE(reply.size() <= msg_size_limit); - EXPECT_EQ(prev_reply_last + 1, new_reply_first); ->>>>>>> d158199e (Added var_send_list_size test, various other fixes) - -<<<<<<< HEAD - while (!found && tries++ < max_multicast_tries) { - std::string broadcast_data = multicast_socket.receive(); - sscanf(broadcast_data.c_str(), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , actual_hostname, &actual_port , - actual_username , &actual_pid , actual_sim_dir , actual_cmdline_name , - actual_input_file , actual_trick_version , actual_tag, &actual_duplicate_port) ; - - if (strcmp(actual_hostname, expected_hostname) == 0 && strcmp(expected_tag, actual_tag) == 0) { - found = true; - EXPECT_STREQ(actual_hostname, expected_hostname); - EXPECT_EQ(actual_port, expected_port); - EXPECT_STREQ(actual_username, expected_username); - EXPECT_GT(actual_pid, 0); - std::string expected_sim_dir_str(expected_sim_dir); - std::string actual_sim_dir_str(actual_sim_dir); - std::string end_of_actual = actual_sim_dir_str.substr(actual_sim_dir_str.length() - expected_sim_dir_str.length(), actual_sim_dir_str.length()); - EXPECT_EQ(expected_sim_dir_str, end_of_actual); - EXPECT_STREQ(actual_input_file, expected_input_file); - EXPECT_STREQ(actual_tag, expected_tag); - EXPECT_EQ(actual_duplicate_port, expected_port); - } -======= - prev_reply_last = get_last_number_in_string(reply); ->>>>>>> 6490e49d (Add more tests, error handling) - } - - if (!found) - FAIL() << "Multicast message never received"; -} - -<<<<<<< HEAD -#endif - -======= ->>>>>>> f9665aba (Fix and test var_send_stdio) TEST_F (VariableServerTest, Binary) { if (socket_status != 0) { FAIL(); @@ -1918,7 +1439,7 @@ int main(int argc, char **argv) { int result = RUN_ALL_TESTS(); Socket socket; - socket.init("localhost", 4000); + socket.init("localhost", 40000); if (result == 0) { // Success @@ -1932,4 +1453,4 @@ int main(int argc, char **argv) { socket << "trick.stop() \n"; return result; -} +} \ No newline at end of file diff --git a/trick_source/sim_services/MonteCarlo/MonteCarlo_master_shutdown.cpp b/trick_source/sim_services/MonteCarlo/MonteCarlo_master_shutdown.cpp index 38b67256..1b53cb75 100644 --- a/trick_source/sim_services/MonteCarlo/MonteCarlo_master_shutdown.cpp +++ b/trick_source/sim_services/MonteCarlo/MonteCarlo_master_shutdown.cpp @@ -6,6 +6,7 @@ #include "trick/message_type.h" #include "trick/tc_proto.h" #include "trick/exec_proto.h" +#include "trick/SysThread.hh" /** @par Detailed Design: */ void Trick::MonteCarlo::master_shutdown() { @@ -41,6 +42,8 @@ void Trick::MonteCarlo::master_shutdown() { except_return = -2 ; } + SysThread::ensureAllShutdown(); + exit(except_return); } diff --git a/trick_source/sim_services/MonteCarlo/MonteCarlo_slave_funcs.cpp b/trick_source/sim_services/MonteCarlo/MonteCarlo_slave_funcs.cpp index 777381cf..8fd5bf19 100644 --- a/trick_source/sim_services/MonteCarlo/MonteCarlo_slave_funcs.cpp +++ b/trick_source/sim_services/MonteCarlo/MonteCarlo_slave_funcs.cpp @@ -1,5 +1,7 @@ #include "trick/MonteCarlo.hh" +#include "trick/SysThread.hh" + /** @par Detailed Design: */ void Trick::MonteCarlo::slave_shutdown() { @@ -8,6 +10,8 @@ void Trick::MonteCarlo::slave_shutdown() { /**
  • Run the shutdown jobs and exit. */ run_queue(&slave_shutdown_queue, "in slave_shutdown queue") ; + SysThread::ensureAllShutdown(); + exit(0); } @@ -15,6 +19,9 @@ void Trick::MonteCarlo::slave_shutdown() { void Trick::MonteCarlo::slave_die() { /**
    • Kill any active child process executing a run and exit immediately. */ slave_kill_run(); + + SysThread::ensureAllShutdown(); + exit(1); } diff --git a/trick_source/sim_services/ThreadBase/SysThread.cpp b/trick_source/sim_services/ThreadBase/SysThread.cpp index cfb98f6f..a7f613e4 100644 --- a/trick_source/sim_services/ThreadBase/SysThread.cpp +++ b/trick_source/sim_services/ThreadBase/SysThread.cpp @@ -9,11 +9,13 @@ #endif #include #include +#include #include "trick/SysThread.hh" bool Trick::SysThread::shutdown_finished = false; + // Construct On First Use to avoid the Static Initialization Fiasco pthread_mutex_t& Trick::SysThread::list_mutex() { static pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -30,7 +32,7 @@ std::vector& Trick::SysThread::all_sys_threads() { return all_sys_threads; } -Trick::SysThread::SysThread(std::string in_name, bool sd) : self_deleting(sd), ThreadBase(in_name) { +Trick::SysThread::SysThread(std::string in_name) : ThreadBase(in_name) { pthread_mutex_lock(&(list_mutex())); all_sys_threads().push_back(this); pthread_mutex_unlock(&(list_mutex())); @@ -41,9 +43,6 @@ Trick::SysThread::~SysThread() { pthread_mutex_lock(&(list_mutex())); if (!shutdown_finished) { all_sys_threads().erase(std::remove(all_sys_threads().begin(), all_sys_threads().end(), this), all_sys_threads().end()); - if (all_sys_threads().size() == 0) { - pthread_cond_signal(&(list_empty_cv())); - } } pthread_mutex_unlock(&(list_mutex())); } @@ -51,25 +50,17 @@ Trick::SysThread::~SysThread() { int Trick::SysThread::ensureAllShutdown() { pthread_mutex_lock(&(list_mutex())); + // Cancel all threads for (SysThread * thread : all_sys_threads()) { thread->cancel_thread(); } - auto it = all_sys_threads().begin(); - while (it != all_sys_threads().end()){ - SysThread * thread = *it; - if (!(thread->self_deleting)) { - thread->join_thread(); - it = all_sys_threads().erase(it); - } else { - ++it; - } - } - - while (all_sys_threads().size() != 0) { - pthread_cond_wait(&(list_empty_cv()), &(list_mutex())); + // Join all threads + for (SysThread * thread : all_sys_threads()) { + thread->join_thread(); } + // Success! shutdown_finished = true; pthread_mutex_unlock(&(list_mutex())); diff --git a/trick_source/sim_services/ThreadBase/ThreadBase.cpp b/trick_source/sim_services/ThreadBase/ThreadBase.cpp index 11af01e7..ffe6fd47 100644 --- a/trick_source/sim_services/ThreadBase/ThreadBase.cpp +++ b/trick_source/sim_services/ThreadBase/ThreadBase.cpp @@ -17,8 +17,12 @@ Trick::ThreadBase::ThreadBase(std::string in_name) : name(in_name) , pthread_id(0) , pid(0) , - rt_priority(0) + rt_priority(0), + created(false), + should_shutdown(false), + cancellable(true) { + pthread_mutex_init(&shutdown_mutex, NULL); #if __linux max_cpu = sysconf( _SC_NPROCESSORS_ONLN ) ; #ifdef CPU_ALLOC @@ -274,12 +278,19 @@ int Trick::ThreadBase::execute_priority() { int Trick::ThreadBase::create_thread() { + if (created) { + message_publish(MSG_ERROR, "create_thread called on thread %s (%p) which has already been started.\n", name.c_str(), this); + return 0; + } + pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); pthread_create(&pthread_id, &attr, Trick::ThreadBase::thread_helper , (void *)this); + created = true; + #if __linux #ifdef __GNUC__ #if __GNUC__ >= 4 && __GNUC_MINOR__ >= 2 @@ -294,19 +305,56 @@ int Trick::ThreadBase::create_thread() { } int Trick::ThreadBase::cancel_thread() { + pthread_mutex_lock(&shutdown_mutex); + should_shutdown = true; + pthread_mutex_unlock(&shutdown_mutex); + if ( pthread_id != 0 ) { - pthread_cancel(pthread_id) ; + if (cancellable) + pthread_cancel(pthread_id) ; } return(0) ; } int Trick::ThreadBase::join_thread() { if ( pthread_id != 0 ) { - pthread_join(pthread_id, NULL) ; + if ((errno = pthread_join(pthread_id, NULL)) != 0) { + std::string msg = "Thread " + name + " had an error in join"; + perror(msg.c_str()); + } else { + pthread_id = 0; + } } return(0) ; } +void Trick::ThreadBase::test_shutdown() { + test_shutdown (NULL, NULL); +} + +void Trick::ThreadBase::test_shutdown(void (*exit_handler) (void *), void * exit_arg) { + pthread_mutex_lock(&shutdown_mutex); + if (should_shutdown) { + pthread_mutex_unlock(&shutdown_mutex); + + thread_shutdown(exit_handler, exit_arg); + } + pthread_mutex_unlock(&shutdown_mutex); +} + + +void Trick::ThreadBase::thread_shutdown() { + thread_shutdown (NULL, NULL); +} + +void Trick::ThreadBase::thread_shutdown(void (*exit_handler) (void *), void * exit_arg) { + if (exit_handler != NULL) { + exit_handler(exit_arg); + } + + pthread_exit(0); +} + void * Trick::ThreadBase::thread_helper( void * context ) { sigset_t sigs; diff --git a/trick_source/sim_services/VariableServer/Makefile_deps b/trick_source/sim_services/VariableServer/Makefile_deps index a95824db..a4cd9fd0 100644 --- a/trick_source/sim_services/VariableServer/Makefile_deps +++ b/trick_source/sim_services/VariableServer/Makefile_deps @@ -1116,7 +1116,7 @@ object_${TRICK_HOST_CPU}/VariableServerSession.o: VariableServerSession.cpp \ ${TRICK_HOME}/include/trick/TrickConstant.hh \ ${TRICK_HOME}/include/trick/tc_proto.h object_${TRICK_HOST_CPU}/ClientListener.o: ClientListener.cpp \ - ${TRICK_HOME}/include/trick/TCConnection.hh \ + ${TRICK_HOME}/include/trick/TCPConnection.hh \ ${TRICK_HOME}/include/trick/ClientConnection.hh \ ${TRICK_HOME}/include/trick/ClientListener.hh \ ${TRICK_HOME}/include/trick/tc.h \ diff --git a/trick_source/sim_services/VariableServer/VariableReference.cpp b/trick_source/sim_services/VariableServer/VariableReference.cpp index dcaa4925..ba9f97d6 100644 --- a/trick_source/sim_services/VariableServer/VariableReference.cpp +++ b/trick_source/sim_services/VariableServer/VariableReference.cpp @@ -15,7 +15,7 @@ #include "trick/UdUnits.hh" #include "trick/bitfield_proto.h" #include "trick/trick_byteswap.h" -#include "trick/tc_proto.h" +// #include "trick/tc_proto.h" // Static variables to be addresses that are known to be the error ref address diff --git a/trick_source/sim_services/VariableServer/VariableServerListenThread.cpp b/trick_source/sim_services/VariableServer/VariableServerListenThread.cpp index 72ac7ea4..c5ea2163 100644 --- a/trick_source/sim_services/VariableServer/VariableServerListenThread.cpp +++ b/trick_source/sim_services/VariableServer/VariableServerListenThread.cpp @@ -10,16 +10,22 @@ #include "trick/message_proto.h" #include "trick/message_type.h" +#define MAX_MACHINE_NAME 80 Trick::VariableServerListenThread::VariableServerListenThread() : Trick::SysThread("VarServListen"), requested_port(0), - requested_source_address(""), user_requested_address(false), broadcast(true), listener() { pthread_mutex_init(&restart_pause, NULL); + + char hname[MAX_MACHINE_NAME]; + gethostname(hname, MAX_MACHINE_NAME); + requested_source_address = std::string(hname); + + cancellable = false; } Trick::VariableServerListenThread::~VariableServerListenThread() { @@ -31,7 +37,10 @@ Trick::VariableServerListenThread::~VariableServerListenThread() { const char * Trick::VariableServerListenThread::get_hostname() { - const char * ret = listener.getHostname(); + std::string hostname = listener.getHostname(); + char * ret = (char *) malloc(hostname.length() + 1); + strncpy(ret, hostname.c_str(), hostname.length()); + ret[hostname.length()] = '\0'; return ret; } @@ -81,44 +90,45 @@ void Trick::VariableServerListenThread::set_broadcast(bool in_broadcast) { int Trick::VariableServerListenThread::init_listen_device() { int ret = listener.initialize(); requested_port = listener.getPort(); - requested_source_address = std::string(listener.getHostname()); + user_requested_address = true; return ret; } int Trick::VariableServerListenThread::check_and_move_listen_device() { int ret ; - if ( user_requested_address ) { - - /* The user has requested a different source address or port in the input file */ - listener.disconnect(); - ret = listener.initialize(requested_source_address, requested_port); - requested_port = listener.getPort(); - requested_source_address = std::string(listener.getHostname()); - if (ret != TC_SUCCESS) { - std::cout << "Unsuccessful initialization " << std::endl; - message_publish(MSG_ERROR, "ERROR: Could not establish variable server source_address %s: port %d. Aborting.\n", - requested_source_address.c_str(), requested_port); - return -1 ; - } + /* The user has requested a different source address or port in the input file */ + listener.disconnect(); + ret = listener.initialize(requested_source_address, requested_port); + requested_port = listener.getPort(); + requested_source_address = listener.getHostname(); + if (ret != 0) { + message_publish(MSG_ERROR, "ERROR: Could not establish variable server source_address %s: port %d. Aborting.\n", + requested_source_address.c_str(), requested_port); + return -1 ; } - return 0 ; } void Trick::VariableServerListenThread::create_tcp_socket(const char * address, unsigned short in_port ) { listener.initialize(address, in_port); + requested_source_address = listener.getHostname(); + requested_port = listener.getPort(); + user_requested_address = true; + + message_publish(MSG_INFO, "Created TCP variable server %s: %d\n", requested_source_address.c_str(), in_port); } void * Trick::VariableServerListenThread::thread_body() { // This thread listens for incoming client connections, and when one is received, creates a new thread to handle the session // Also broadcasts on multicast channel + test_shutdown(); + + std::string version = std::string(exec_get_current_version()) ; version.erase(version.find_last_not_of(" \t\f\v\n\r")+1); - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - // get username to broadcast on multicast channel struct passwd *passp = getpwuid(getuid()) ; std::string user_name; @@ -128,38 +138,49 @@ void * Trick::VariableServerListenThread::thread_body() { user_name = strdup(passp->pw_name) ; } - listener.setBlockMode(TC_COMM_BLOCKIO); + listener.setBlockMode(true); if ( broadcast ) { initializeMulticast(); } while (1) { + // Quit here if it's time + test_shutdown(); // Look for a new client requesting a connection if (listener.checkForNewConnections()) { // pause here during restart pthread_mutex_lock(&restart_pause) ; + // Recheck - sometimes we get false positive if something happens during restart + if (!listener.checkForNewConnections()) { + pthread_mutex_unlock(&restart_pause) ; + continue; + } + // Create a new thread to service this connection - VariableServerThread * vst = new Trick::VariableServerThread(&listener) ; + VariableServerThread * vst = new Trick::VariableServerThread() ; + vst->open_tcp_connection(&listener) ; vst->copy_cpus(get_cpus()) ; vst->create_thread() ; - vst->wait_for_accept() ; + ConnectionStatus status = vst->wait_for_accept() ; + + if (status == CONNECTION_FAIL) { + // If the connection failed, the thread will exit. + // Make sure it joins fully before deleting the vst object + vst->join_thread(); + delete vst; + } pthread_mutex_unlock(&restart_pause) ; + } else if ( broadcast ) { -<<<<<<< HEAD - snprintf(buf1 , sizeof(buf1), "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listen_dev.hostname , (unsigned short)listen_dev.port , - user_name , (int)getpid() , command_line_args_get_default_dir() , command_line_args_get_cmdline_name() , - command_line_args_get_input_file() , version.c_str() , user_tag.c_str(), (unsigned short)listen_dev.port ) ; -======= // Otherwise, broadcast on the multicast channel if enabled char buf1[1024]; sprintf(buf1 , "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listener.getHostname(), (unsigned short)listener.getPort() , user_name.c_str() , (int)getpid() , command_line_args_get_default_dir() , command_line_args_get_cmdline_name() , command_line_args_get_input_file() , version.c_str() , user_tag.c_str(), (unsigned short)listener.getPort() ) ; ->>>>>>> e8ba4328 (Encapsulated listen device into ClientListener) std::string message = buf1; @@ -180,12 +201,16 @@ int Trick::VariableServerListenThread::restart() { int ret ; + listener.restart(); + if ( user_requested_address ) { if (!listener.validateSourceAddress(requested_source_address)) { requested_source_address.clear() ; } + printf("variable server restart user_port requested set %s:%d\n",requested_source_address.c_str(), requested_port); + listener.disconnect(); ret = listener.initialize(requested_source_address, requested_port); @@ -195,6 +220,7 @@ int Trick::VariableServerListenThread::restart() { } } else { listener.checkSocket(); + printf("restart variable server message port = %d\n", listener.getPort()); } initializeMulticast(); @@ -213,6 +239,7 @@ void Trick::VariableServerListenThread::pause_listening() { } void Trick::VariableServerListenThread::restart_listening() { + listener.restart(); pthread_mutex_unlock(&restart_pause) ; } diff --git a/trick_source/sim_services/VariableServer/VariableServerSession_commands.cpp b/trick_source/sim_services/VariableServer/VariableServerSession_commands.cpp index e1d837ca..bd399503 100644 --- a/trick_source/sim_services/VariableServer/VariableServerSession_commands.cpp +++ b/trick_source/sim_services/VariableServer/VariableServerSession_commands.cpp @@ -303,7 +303,7 @@ int Trick::VariableServerSession::send_list_size() { memcpy(&(buf1[8]), &var_count, sizeof(var_count)); if (debug >= 2) { - message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending %d event variables\n", connection, connection->get_client_tag().c_str(), var_count); + message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending %d event variables\n", connection, connection->getClientTag().c_str(), var_count); } connection->write(buf1, sizeof (buf1)); @@ -312,7 +312,7 @@ int Trick::VariableServerSession::send_list_size() { write_string << VS_LIST_SIZE << "\t" << var_count << "\n"; // ascii if (debug >= 2) { - message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending number of event variables:\n%s\n", connection, connection->get_client_tag().c_str(), write_string.str().c_str()) ; + message_publish(MSG_DEBUG, "%p tag=<%s> var_server sending number of event variables:\n%s\n", connection, connection->getClientTag().c_str(), write_string.str().c_str()) ; } connection->write(write_string.str()); @@ -331,7 +331,7 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) { int ret ; if (debug >= 2) { - message_publish(MSG_DEBUG,"%p tag=<%s> var_server opening %s.\n", connection, connection->get_client_tag().c_str(), sie_file.c_str()) ; + message_publish(MSG_DEBUG,"%p tag=<%s> var_server opening %s.\n", connection, connection->getClientTag().c_str(), sie_file.c_str()) ; } if ((fp = fopen(sie_file.c_str() , "r")) == NULL ) { @@ -351,8 +351,8 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) { rewind(fp) ; // Switch to blocking writes since this could be a large transfer. - if (connection->setBlockMode(TC_COMM_BLOCKIO)) { - message_publish(MSG_DEBUG,"Variable Server Error: Failed to set TCDevice to TC_COMM_BLOCKIO.\n"); + if (connection->setBlockMode(true)) { + message_publish(MSG_DEBUG,"Variable Server Error: Failed to set socket to blocking mode.\n"); } while ( current_size < file_size ) { @@ -368,8 +368,8 @@ int Trick::VariableServerSession::transmit_file(std::string sie_file) { } // Switch back to non-blocking writes. - if (connection->setBlockMode(TC_COMM_NOBLOCKIO)) { - message_publish(MSG_ERROR,"Variable Server Error: Failed to set TCDevice to TC_COMM_NOBLOCKIO.\n"); + if (connection->setBlockMode(false)) { + message_publish(MSG_DEBUG,"Variable Server Error: Failed to set socket to non-blocking mode.\n"); return(-1); } diff --git a/trick_source/sim_services/VariableServer/VariableServerThread.cpp b/trick_source/sim_services/VariableServer/VariableServerThread.cpp index 68548fc2..659df23a 100644 --- a/trick_source/sim_services/VariableServer/VariableServerThread.cpp +++ b/trick_source/sim_services/VariableServer/VariableServerThread.cpp @@ -3,23 +3,29 @@ #include #include "trick/VariableServerThread.hh" #include "trick/exec_proto.h" +#include "trick/message_proto.h" +#include "trick/message_type.h" #include "trick/TrickConstant.hh" +#include "trick/UDPConnection.hh" +#include "trick/TCPConnection.hh" + Trick::VariableServer * Trick::VariableServerThread::vs = NULL ; -Trick::VariableServerThread::VariableServerThread(ClientListener * in_listen_dev) : - Trick::SysThread("VarServer") , debug(0), - listener(in_listen_dev), session(NULL) { +static int instance_num = 0; + +Trick::VariableServerThread::VariableServerThread() : + Trick::SysThread(std::string("VarServer" + std::to_string(instance_num++))) , debug(0), session(NULL), connection(NULL) { connection_status = CONNECTION_PENDING ; - connection.initialize(); pthread_mutex_init(&connection_status_mutex, NULL); pthread_cond_init(&connection_status_cv, NULL); pthread_mutex_init(&restart_pause, NULL); + cancellable = false; } Trick::VariableServerThread::~VariableServerThread() {} @@ -30,19 +36,23 @@ std::ostream& Trick::operator<< (std::ostream& s, Trick::VariableServerThread& v socklen_t len = (socklen_t)sizeof(otherside); s << " \"connection\":{\n"; - s << " \"client_tag\":\"" << vst.connection.get_client_tag() << "\",\n"; + s << " \"client_tag\":\"" << vst.connection->getClientTag() << "\",\n"; - int err = getpeername(vst.connection.get_socket(), (struct sockaddr*)&otherside, &len); + // int err = getpeername(vst.connection->get_socket(), (struct sockaddr*)&otherside, &len); - if (err == 0) { - s << " \"client_IP_address\":\"" << inet_ntoa(otherside.sin_addr) << "\",\n"; - s << " \"client_port\":\"" << ntohs(otherside.sin_port) << "\",\n"; - } else { - s << " \"client_IP_address\":\"unknown\",\n"; - s << " \"client_port\":\"unknown\","; + // if (err == 0) { + // s << " \"client_IP_address\":\"" << inet_ntoa(otherside.sin_addr) << "\",\n"; + // s << " \"client_port\":\"" << ntohs(otherside.sin_port) << "\",\n"; + // } else { + // s << " \"client_IP_address\":\"unknown\",\n"; + // s << " \"client_port\":\"unknown\","; + // } + + pthread_mutex_lock(&vst.connection_status_mutex); + if (vst.connection_status == CONNECTION_SUCCESS) { + s << *(vst.session); } - - s << *(vst.session); + pthread_mutex_unlock(&vst.connection_status_mutex); s << " }" << std::endl; return s; @@ -57,7 +67,26 @@ Trick::VariableServer * Trick::VariableServerThread::get_vs() { } void Trick::VariableServerThread::set_client_tag(std::string tag) { - connection.set_client_tag(tag); + connection->setClientTag(tag); +} + +int Trick::VariableServerThread::open_udp_socket(const std::string& hostname, int port) { + UDPConnection * udp_conn = new UDPConnection(); + int status = udp_conn->initialize(hostname, port); + + connection = udp_conn; + + if (status == 0) { + message_publish(MSG_INFO, "Created UDP variable server %s: %d\n", udp_conn->getHostname().c_str(), udp_conn->getPort()); + } + + return status; +} + +int Trick::VariableServerThread::open_tcp_connection(ClientListener * listener) { + connection = listener->setUpNewConnection(); + + return 0; } Trick::ConnectionStatus Trick::VariableServerThread::wait_for_accept() { @@ -79,33 +108,52 @@ void Trick::VariableServerThread::preload_checkpoint() { // Stop variable server processing at the top of the processing loop. pthread_mutex_lock(&restart_pause); - // Let the thread complete any data copying it has to do - // and then suspend data copying until the checkpoint is reloaded. - pthread_mutex_lock(&(session->copy_mutex)); + // Make sure that the session has been initialized + pthread_mutex_lock(&connection_status_mutex); + if (connection_status == CONNECTION_SUCCESS) { - // Save the pause state of this thread. - saved_pause_cmd = session->get_pause(); + // Let the thread complete any data copying it has to do + // and then suspend data copying until the checkpoint is reloaded. + pthread_mutex_lock(&(session->copy_mutex)); + + // Save the pause state of this thread. + saved_pause_cmd = session->get_pause(); - // Disallow data writing. - session->set_pause(true); + // Disallow data writing. + session->set_pause(true); - // Temporarily "disconnect" the variable references from Trick Managed Memory - // by tagging each as a "bad reference". - session->disconnect_references(); + // Temporarily "disconnect" the variable references from Trick Managed Memory + // by tagging each as a "bad reference". + session->disconnect_references(); - // Allow data copying to continue. - pthread_mutex_unlock(&(session->copy_mutex)); - + // Allow data copying to continue. + pthread_mutex_unlock(&(session->copy_mutex)); + + } + pthread_mutex_unlock(&connection_status_mutex); } // Gets called from the main thread as a job void Trick::VariableServerThread::restart() { // Set the pause state of this thread back to its "pre-checkpoint reload" state. - session->set_pause(saved_pause_cmd) ; + connection->restart(); + + pthread_mutex_lock(&connection_status_mutex); + if (connection_status == CONNECTION_SUCCESS) { + session->set_pause(saved_pause_cmd) ; + } + pthread_mutex_unlock(&connection_status_mutex); // Restart the variable server processing. pthread_mutex_unlock(&restart_pause); } +void Trick::VariableServerThread::cleanup() { + connection->disconnect(); + + if (session != NULL) + delete session; +} + diff --git a/trick_source/sim_services/VariableServer/VariableServerThread_loop.cpp b/trick_source/sim_services/VariableServer/VariableServerThread_loop.cpp index 660688a7..7c68d08d 100644 --- a/trick_source/sim_services/VariableServer/VariableServerThread_loop.cpp +++ b/trick_source/sim_services/VariableServer/VariableServerThread_loop.cpp @@ -22,16 +22,19 @@ void exit_var_thread(void *in_vst) ; void * Trick::VariableServerThread::thread_body() { + // Check for short running sims + test_shutdown(NULL, NULL); + // We need to make the thread to VariableServerThread map before we accept the connection. // Otherwise we have a race where this thread is unknown to the variable server and the // client gets confirmation that the connection is ready for communication. vs->add_vst( pthread_self() , this ) ; // Accept client connection - int accept_status = accept(listener, &connection); - if (accept_status != 0) { + int status = connection->start(); + + if (status != 0) { // TODO: Use a real error handler - std::cout << "Accept failed, variable server session exiting" << std::endl; vs->delete_vst(pthread_self()); // Tell main thread that we failed to initialize @@ -40,13 +43,12 @@ void * Trick::VariableServerThread::thread_body() { pthread_cond_signal(&connection_status_cv); pthread_mutex_unlock(&connection_status_mutex); + cleanup(); pthread_exit(NULL); } - connection.setBlockMode(TC_COMM_ALL_OR_NOTHING); - // Create session - session = new VariableServerSession(&connection); + session = new VariableServerSession(connection); vs->add_session( pthread_self(), session ); // Tell main that we are ready @@ -55,10 +57,6 @@ void * Trick::VariableServerThread::thread_body() { pthread_cond_signal(&connection_status_cv); pthread_mutex_unlock(&connection_status_mutex); - - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL) ; - pthread_cleanup_push(exit_var_thread, (void *) this) ; - // if log is set on for variable server (e.g., in input file), turn log on for each client if (vs->get_log()) { session->set_log_on(); @@ -66,6 +64,9 @@ void * Trick::VariableServerThread::thread_body() { try { while (1) { + // Shutdown here if it's time + test_shutdown(exit_var_thread, (void *) this); + // Pause here if we are in a restart condition pthread_mutex_lock(&restart_pause) ; @@ -75,6 +76,7 @@ void * Trick::VariableServerThread::thread_body() { // Check to see if exit is necessary if (session->exit_cmd == true) { + pthread_mutex_unlock(&restart_pause) ; break; } @@ -91,6 +93,7 @@ void * Trick::VariableServerThread::thread_body() { if ( should_write_async && !session->get_pause()) { int ret = session->write_data() ; if ( ret < 0 ) { + pthread_mutex_unlock(&restart_pause) ; break ; } } @@ -132,11 +135,11 @@ void * Trick::VariableServerThread::thread_body() { } if (debug >= 3) { - message_publish(MSG_DEBUG, "%p tag=<%s> var_server receive loop exiting\n", &connection, connection.get_client_tag().c_str()); + message_publish(MSG_DEBUG, "%p tag=<%s> var_server receive loop exiting\n", connection, connection->getClientTag().c_str()); } - pthread_cleanup_pop(1); - pthread_exit(NULL) ; + thread_shutdown(exit_var_thread, this); + return NULL ; } diff --git a/trick_source/sim_services/VariableServer/VariableServer_get_var_server_port.cpp b/trick_source/sim_services/VariableServer/VariableServer_get_var_server_port.cpp index 322bccc5..0e009dbd 100644 --- a/trick_source/sim_services/VariableServer/VariableServer_get_var_server_port.cpp +++ b/trick_source/sim_services/VariableServer/VariableServer_get_var_server_port.cpp @@ -8,20 +8,21 @@ int Trick::VariableServer::create_tcp_socket(const char * address, unsigned shor new_listen_thread->create_tcp_socket(address, in_port) ; new_listen_thread->copy_cpus(listen_thread.get_cpus()) ; new_listen_thread->create_thread() ; - // additional_listen_threads[new_listen_thread->get_pthread_id()] = new_listen_thread ; + additional_listen_threads[new_listen_thread->get_pthread_id()] = new_listen_thread ; return 0 ; } int Trick::VariableServer::create_udp_socket(const char * address, unsigned short in_port ) { - // int ret ; - // Trick::VariableServerThread * vst ; - // vst = new Trick::VariableServerThread(NULL) ; - // ret = vst->create_udp_socket(address, in_port) ; - // if ( ret == 0 ) { - // vst->copy_cpus(listen_thread.get_cpus()) ; - // vst->create_thread() ; - // } + // UDP sockets are created without a listen thread + int ret ; + Trick::VariableServerThread * vst ; + vst = new Trick::VariableServerThread() ; + ret = vst->open_udp_socket(address, in_port) ; + if ( ret == 0 ) { + vst->copy_cpus(listen_thread.get_cpus()) ; + vst->create_thread() ; + } //vst->var_debug(3) ; return 0 ; diff --git a/trick_source/sim_services/VariableServer/VariableServer_restart.cpp b/trick_source/sim_services/VariableServer/VariableServer_restart.cpp index 7bd8c0f8..dc312a3a 100644 --- a/trick_source/sim_services/VariableServer/VariableServer_restart.cpp +++ b/trick_source/sim_services/VariableServer/VariableServer_restart.cpp @@ -9,10 +9,10 @@ int Trick::VariableServer::restart() { if ( listen_thread.get_pthread_id() == 0 ) { listen_thread.create_thread() ; } - // std::map < pthread_t , VariableServerListenThread * >::iterator it ; - // for( it = additional_listen_threads.begin() ; it != additional_listen_threads.end() ; it++ ) { - // (*it).second->restart() ; - // } + std::map < pthread_t , VariableServerListenThread * >::iterator it ; + for( it = additional_listen_threads.begin() ; it != additional_listen_threads.end() ; it++ ) { + (*it).second->restart() ; + } return 0 ; } @@ -26,6 +26,9 @@ int Trick::VariableServer::suspendPreCheckpointReload() { std::map::iterator pos ; listen_thread.pause_listening() ; + for (const auto& listen_it : additional_listen_threads) { + listen_it.second->pause_listening(); + } pthread_mutex_lock(&map_mutex) ; for ( pos = var_server_threads.begin() ; pos != var_server_threads.end() ; pos++ ) { @@ -50,6 +53,9 @@ int Trick::VariableServer::resumePostCheckpointReload() { pthread_mutex_unlock(&map_mutex) ; listen_thread.restart_listening() ; + for (const auto& listen_it : additional_listen_threads) { + listen_it.second->restart_listening(); + } return 0; } diff --git a/trick_source/sim_services/VariableServer/VariableServer_shutdown.cpp b/trick_source/sim_services/VariableServer/VariableServer_shutdown.cpp index 14bd7830..84dd71bc 100644 --- a/trick_source/sim_services/VariableServer/VariableServer_shutdown.cpp +++ b/trick_source/sim_services/VariableServer/VariableServer_shutdown.cpp @@ -3,20 +3,16 @@ int Trick::VariableServer::shutdown() { listen_thread.cancel_thread() ; - std::map < pthread_t , VariableServerThread * >::iterator it ; + for (const auto& listen_it : additional_listen_threads) { + listen_it.second->cancel_thread(); + } pthread_mutex_lock(&map_mutex) ; - std::vector ids; - for ( it = var_server_threads.begin() ; it != var_server_threads.end() ; it++ ) { - (*it).second->cancel_thread() ; - ids.push_back((*it).first); - // cancelling causes each var_server_thread map element to be erased by the exit_var_thread function + for (const auto& it : var_server_threads) { + it.second->cancel_thread() ; } pthread_mutex_unlock(&map_mutex) ; - for (pthread_t id : ids) { - pthread_join(id, NULL); - } return 0 ; } diff --git a/trick_source/sim_services/VariableServer/exit_var_thread.cpp b/trick_source/sim_services/VariableServer/exit_var_thread.cpp index e40996c6..f8a02d85 100644 --- a/trick_source/sim_services/VariableServer/exit_var_thread.cpp +++ b/trick_source/sim_services/VariableServer/exit_var_thread.cpp @@ -1,21 +1,19 @@ #include "trick/VariableServer.hh" -#include "trick/tc_proto.h" void exit_var_thread(void *in_vst) { Trick::VariableServerThread * vst = (Trick::VariableServerThread *) in_vst ; Trick::VariableServer * vs = vst->get_vs() ; - // tc_disconnect(&vst->get_connection()); - + if (vst->get_pthread_id() != pthread_self()) { + std::cerr << "exit_var_thread must be called from the variable server thread" << std::endl; + } + vs->delete_session(vst->get_pthread_id()); + // Tell the variable server that this thread is exiting. vs->delete_vst(vst->get_pthread_id()) ; - // This will deleting the vst object from within the object itself. exit_var_thread - // is called from within Trick::VariableServerThread::thread_body. - // I am claiming this is safe as this is the last thing routing is the last thing - // that touches vst. The C++ FAQ says it's safe if you can say this. (Alex 6/9/14) - delete vst ; + vst->cleanup(); } diff --git a/trick_source/sim_services/VariableServer/test/TestConnection.hh b/trick_source/sim_services/VariableServer/test/TestConnection.hh index 0372dac4..89eea807 100644 --- a/trick_source/sim_services/VariableServer/test/TestConnection.hh +++ b/trick_source/sim_services/VariableServer/test/TestConnection.hh @@ -61,7 +61,7 @@ class TestConnection : public Trick::ClientConnection { client_tag = tag; } - int setBlockMode (int mode) { + int set_block_mode (int mode) { return 0; } diff --git a/trick_source/sim_services/mains/master.cpp b/trick_source/sim_services/mains/master.cpp index 72c8a46e..444cae29 100644 --- a/trick_source/sim_services/mains/master.cpp +++ b/trick_source/sim_services/mains/master.cpp @@ -44,7 +44,6 @@ int master( int nargs, char **args) { if ( ret == 0 ) { exec->loop() ; } - ret = exec->shutdown() ; //TODO: add call to free all memory from memory manager diff --git a/trick_source/trick_utils/connection_handlers/ClientConnection.cpp b/trick_source/trick_utils/connection_handlers/ClientConnection.cpp index 3df752b4..f6945bc9 100644 --- a/trick_source/trick_utils/connection_handlers/ClientConnection.cpp +++ b/trick_source/trick_utils/connection_handlers/ClientConnection.cpp @@ -1 +1,10 @@ -#include "trick/ClientConnection.hh" \ No newline at end of file +#include "trick/ClientConnection.hh" + +std::string Trick::ClientConnection::getClientTag () { + return _client_tag; +} + +int Trick::ClientConnection::setClientTag (std::string tag) { + _client_tag = tag; + return 0; +} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/ClientListener.cpp b/trick_source/trick_utils/connection_handlers/ClientListener.cpp index 331e253b..9d7cfc96 100644 --- a/trick_source/trick_utils/connection_handlers/ClientListener.cpp +++ b/trick_source/trick_utils/connection_handlers/ClientListener.cpp @@ -1,115 +1,245 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #include "trick/ClientListener.hh" -#include "trick/tc_proto.h" - - -Trick::ClientListener::ClientListener () : _listen_dev(), initialized(false) { - char hname[80]; - gethostname(hname , (size_t) 80 ) ; - saved_source = std::string(hname); - strcpy(_listen_dev.client_tag, ""); - tc_error(&_listen_dev, 0); -} +Trick::ClientListener::ClientListener () : ClientListener (new SystemInterface()) {} +Trick::ClientListener::ClientListener (SystemInterface * system_interface) : _listen_socket(-1), _hostname(""), _port(0), _client_tag(""), _initialized(false), _system_interface(system_interface) {} Trick::ClientListener::~ClientListener () { + // Clean up our socket if initialized + if (_initialized) { + close (_listen_socket); + } + delete _system_interface; } -int Trick::ClientListener::initialize(std::string hostname, int port) { - int ret = tc_init_with_connection_info(&_listen_dev, AF_INET, SOCK_STREAM, hostname.c_str(), port); - initialized = true; - return ret; +int Trick::ClientListener::initialize(std::string in_hostname, int in_port) { + + if ((_listen_socket = _system_interface->socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror ("Server: Unable to open socket"); + return LISTENER_ERROR; + } + + int option_val = 1; + + // Do not replicate the socket's handle if this process is forked + // Removing this will cause all kinds of problems when starting a variable server client via input file + _system_interface->fcntl(_listen_socket, F_SETFD, FD_CLOEXEC); + + // Allow socket's bound address to be used by others + if (_system_interface->setsockopt(_listen_socket, SOL_SOCKET, SO_REUSEADDR, (const char *) &option_val, (socklen_t) sizeof(option_val)) != 0) { + perror("Server: Could not set socket to reuse addr"); + _system_interface->close (_listen_socket); + return LISTENER_ERROR; + } + // Turn off data buffering on the send side + if (_system_interface->setsockopt(_listen_socket, IPPROTO_TCP, TCP_NODELAY, (const void *) &option_val, (socklen_t) sizeof(option_val)) != 0) { + perror("Server: Could not turn off data buffering"); + _system_interface->close (_listen_socket); + return LISTENER_ERROR; + } + + struct sockaddr_in s_in; + memset(&s_in, 0 , sizeof(struct sockaddr_in)) ; + + // Look up the hostname + char name[80]; + gethostname(name, (size_t) 80); + + struct hostent *ip_host ; + socklen_t s_in_size = sizeof(s_in) ; + + s_in.sin_family = AF_INET; + + if (in_hostname == "" || in_hostname == "localhost" || strcmp(in_hostname.c_str(),name) == 0) { + s_in.sin_addr.s_addr = INADDR_ANY; + _hostname = std::string(name); + } else if ( inet_pton(AF_INET, in_hostname.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) == 1 ) { + /* numeric character string address */ + _hostname = in_hostname; + } else if ( (ip_host = gethostbyname(in_hostname.c_str())) != NULL ) { + /* some name other than the default name was given */ + memcpy((void *) &(s_in.sin_addr.s_addr), (const void *) ip_host->h_addr, (size_t) ip_host->h_length); + _hostname = in_hostname; + } else { + perror("Server: Could not determine source address"); + return -1 ; + } + + // Set port + s_in.sin_port = htons((short) in_port); + + // Bind to socket + if (_system_interface->bind(_listen_socket, (struct sockaddr *)&s_in, sizeof(s_in)) < 0) { + + perror("Server: Could not bind to socket"); + _system_interface->close (_listen_socket); + return LISTENER_ERROR; + } + + // Check that correct port was bound to + _system_interface->getsockname( _listen_socket , (struct sockaddr *)&s_in, &s_in_size) ; + int bound_port = ntohs(s_in.sin_port); + + if (in_port != 0 && bound_port != in_port) { + std::cerr << "Server: Could not bind to requested port " << in_port << std::endl; + _system_interface->close(_listen_socket); + return LISTENER_ERROR; + } + + // Save port number + _port = bound_port; + + + // Start listening + if (_system_interface->listen(_listen_socket, SOMAXCONN) < 0) { + std::string error_message = "Server: Could not listen on port " + std::to_string(_port); + perror (error_message.c_str()); + _system_interface->close(_listen_socket); + return LISTENER_ERROR; + } + + // Done! + _initialized = true; + return 0; } int Trick::ClientListener::initialize() { - int ret = tc_init(&_listen_dev); - if (ret != TC_SUCCESS) { - - fprintf(stderr, "ERROR: Could not establish listen port for Variable Server. Aborting.\n"); - ret = -1 ; - } - initialized = true; - return ret; + return initialize("", 0); } -int Trick::ClientListener::setBlockMode(TCCommBlocking mode) { - if (!initialized) +int Trick::ClientListener::setBlockMode(bool blocking) { + if (!_initialized) return -1; - return tc_blockio(&_listen_dev, mode); + int flag = _system_interface->fcntl(_listen_socket, F_GETFL, 0); + + if (flag == -1) { + std::string error_message = "Unable to get flags for fd " + std::to_string(_listen_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + if (blocking) { + flag &= ~O_NONBLOCK; + } else { + flag |= O_NONBLOCK; + } + + if (_system_interface->fcntl(_listen_socket, F_SETFL, flag) == -1) { + std::string error_message = "Unable to set fd " + std::to_string(_listen_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + return 0; } bool Trick::ClientListener::checkForNewConnections() { - if (!initialized) - return -1; + if (!_initialized) + return false; fd_set rfds; struct timeval timeout_time = { 2, 0 }; FD_ZERO(&rfds); - FD_SET(_listen_dev.socket, &rfds); + FD_SET(_listen_socket, &rfds); timeout_time.tv_sec = 2 ; - select(_listen_dev.socket + 1, &rfds, NULL, NULL, &timeout_time); - return FD_ISSET(_listen_dev.socket, &rfds); + // Listen with a timeout of 2 seconds + int result = _system_interface->select(_listen_socket + 1, &rfds, NULL, NULL, &timeout_time); + + // If there's some kind of error, just ignore it and return false + if (result < 0) { + return false; + } + + return FD_ISSET(_listen_socket, &rfds); } -const char * Trick::ClientListener::getHostname () { - if (!initialized) + +std::string Trick::ClientListener::getHostname () { + if (!_initialized) return ""; - return _listen_dev.hostname; + return _hostname; } int Trick::ClientListener::getPort() { - if (!initialized) + if (!_initialized) return -1; - return _listen_dev.port; + return _port; } int Trick::ClientListener::disconnect() { - if (!initialized) + if (!_initialized) { return -1; + } - return tc_disconnect(&_listen_dev) ; + _system_interface->shutdown(_listen_socket, SHUT_RDWR); + _system_interface->close (_listen_socket); + _initialized = false; + + return 0; } bool Trick::ClientListener::validateSourceAddress(std::string requested_source_address) { - char hname[80]; - static struct sockaddr_in s_in; - gethostname(hname, (size_t) 80); + struct addrinfo hints, *res; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; - // Test to see if the restart address is on this machine. If it is not, it's not an error - if ( strcmp( requested_source_address.c_str(), hname )) { - if (! inet_pton(AF_INET, requested_source_address.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) ) { - return false; - } + int err; + if ((err = _system_interface->getaddrinfo(requested_source_address.c_str(), 0, &hints, &res)) != 0) { + std::cerr << "Unable to lookup address: " << gai_strerror(err) << std::endl; + return false; } return true; } int Trick::ClientListener::checkSocket() { - if (!initialized) + if (!_initialized) return -1; struct sockaddr_in s_in; - int s_in_size = sizeof(s_in) ; - getsockname( _listen_dev.socket , (struct sockaddr *)&s_in, (socklen_t *)&s_in_size) ; - printf("restart variable server message port = %d\n" , ntohs(s_in.sin_port)) ; - _listen_dev.port = ntohs(s_in.sin_port); + socklen_t s_in_size = sizeof(s_in) ; + _system_interface->getsockname( _listen_socket , (struct sockaddr *)&s_in, &s_in_size) ; + _port = ntohs(s_in.sin_port); + + return 0; } bool Trick::ClientListener::isInitialized() { - return initialized; + return _initialized; } +Trick::TCPConnection * Trick::ClientListener::setUpNewConnection () { + if (!_initialized) + return NULL; + + TCPConnection * connection = new TCPConnection(_listen_socket); + return connection; +} + +int Trick::ClientListener::restart () { + _system_interface = new SystemInterface(); +} + + diff --git a/trick_source/trick_utils/connection_handlers/MulticastManager.cpp b/trick_source/trick_utils/connection_handlers/MulticastManager.cpp index 7dd8c10e..f5b82a31 100644 --- a/trick_source/trick_utils/connection_handlers/MulticastManager.cpp +++ b/trick_source/trick_utils/connection_handlers/MulticastManager.cpp @@ -23,6 +23,7 @@ int Trick::MulticastManager::broadcast (std::string message) { for (struct sockaddr_in& address : addresses) { sendto(mcast_socket , message_send , strlen(message_send) , 0 , (struct sockaddr *)&address , (socklen_t)sizeof(address)) ; } + return 0; } int Trick::MulticastManager::addAddress (std::string addr, int port) { @@ -33,6 +34,7 @@ int Trick::MulticastManager::addAddress (std::string addr, int port) { mcast_addr.sin_addr.s_addr = inet_addr(addr.c_str()); mcast_addr.sin_port = htons((uint16_t) port); addresses.emplace_back(mcast_addr); + return 0; } int Trick::MulticastManager::is_initialized () { diff --git a/trick_source/trick_utils/connection_handlers/TCConnection.cpp b/trick_source/trick_utils/connection_handlers/TCConnection.cpp deleted file mode 100644 index 51f398cf..00000000 --- a/trick_source/trick_utils/connection_handlers/TCConnection.cpp +++ /dev/null @@ -1,117 +0,0 @@ -#include "trick/TCConnection.hh" -#include "trick/tc.h" -#include "trick/tc_proto.h" -#include -#include - -Trick::TCConnection::TCConnection () {} - -int Trick::accept(ClientListener *listener, Trick::TCConnection *connection) { - if (!listener->isInitialized()) - return -1; - - if ( listener->_listen_dev.socket_type == SOCK_STREAM ) { - return tc_accept(&(listener->_listen_dev), &(connection->_device)); - } - - return 0; -} - -int Trick::TCConnection::initialize() { - _device.disabled = TC_COMM_FALSE ; - _device.disable_handshaking = TC_COMM_TRUE ; - _device.blockio_limit = 0.0 ; - _device.blockio_type = TC_COMM_BLOCKIO ; - _device.client_id = 0 ; - strcpy(_device.client_tag, "") ; - _device.error_handler = (TrickErrorHndlr *) calloc(1, (int)sizeof(TrickErrorHndlr)); - _device.error_handler->report_level = TRICK_ERROR_CAUTION; -} - -int Trick::TCConnection::write (char * message, int size) { - int ret = tc_write(&_device, message, size); - return ret; -} - -int Trick::TCConnection::write (const std::string& message) { - char send_buf[message.length()+1]; - strcpy (send_buf, message.c_str()); - int ret = tc_write(&_device, send_buf, message.length()); - return ret; -} - -std::string Trick::TCConnection::read (int max_len) { - char incoming_msg[max_len]; - int nbytes = recvfrom( _device.socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ; - if (nbytes == 0 ) { - return 0; - } - - if (nbytes != -1) { // -1 means socket is nonblocking and no data to read - /* find the last newline that is present on the socket */ - incoming_msg[nbytes] = '\0' ; - char *last_newline = rindex( incoming_msg , '\n') ; - - /* if there is a newline then there is a complete command on the socket */ - if ( last_newline != NULL ) { - /* only remove up to (and including) the last newline on the socket */ - int size = last_newline - incoming_msg + 1; - nbytes = recvfrom( _device.socket, incoming_msg, size, 0 , NULL, NULL ) ; - } else { - nbytes = 0 ; - } - } - - std::stringstream msg_stream; - - if ( nbytes > 0 ) { - - int msg_len = nbytes ; - // if (debug >= 3) { - // message_publish(MSG_DEBUG, "%p tag=<%s> var_server received bytes = msg_len = %d\n", &_device, _device.client_tag, nbytes); - // } - - incoming_msg[msg_len] = '\0' ; - - for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) { - if ( incoming_msg[ii] != '\r' ) { - msg_stream << incoming_msg[ii] ; - } - } - } - - return msg_stream.str(); -} - - -std::string Trick::TCConnection::get_client_tag () { - return std::string(_device.client_tag); -} - -int Trick::TCConnection::set_client_tag(std::string tag) { - // Max size of device client tag is 80 - - // TODO: Make 80 a constant somewhere, probably in TC device - if (tag.length() >= 80) { - tag.resize(79); - } - strcpy(_device.client_tag, tag.c_str()); - return 0; -} - -int Trick::TCConnection::get_socket() { - return _device.socket; -} - - -int Trick::TCConnection::disconnect () { - return tc_disconnect(&_device); -} - -int Trick::TCConnection::setBlockMode(int block_mode) { - return tc_blockio(&_device, (TCCommBlocking)block_mode); -} - -int Trick::TCConnection::setErrorReporting (bool on) { - return tc_error(&_device, (int)on); -} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/TCPConnection.cpp b/trick_source/trick_utils/connection_handlers/TCPConnection.cpp new file mode 100644 index 00000000..604c9976 --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/TCPConnection.cpp @@ -0,0 +1,156 @@ +#include "trick/TCPConnection.hh" + +#include +#include +#include +#include + +Trick::TCPConnection::TCPConnection (SystemInterface * system_interface) : TCPConnection(0, system_interface) {} + +Trick::TCPConnection::TCPConnection (int listen_socket) : TCPConnection(listen_socket, new SystemInterface()) {} + +Trick::TCPConnection::TCPConnection (int listen_socket, SystemInterface * system_interface) : _system_interface(system_interface), _listen_socket(listen_socket), _socket(0), _connected(false) { + _connection_type = TCP; +} + +int Trick::TCPConnection::start() { + if (_listen_socket <= 0) { + return -1; + } + + // Accept a waiting connection + struct sockaddr_storage their_addr; + socklen_t addr_size = sizeof their_addr; + + if ((_socket = _system_interface->accept(_listen_socket, (struct sockaddr *)&their_addr, &addr_size)) < 0) { + perror ("Unable to accept incoming connection"); + return -1; + } + + // Set to non blocking + setBlockMode(false); + + + _connected = true; + + return 0; +} + +int Trick::TCPConnection::write (char * message, int size) { + if (!_connected) + return -1; + + return _system_interface->send(_socket, message, size, 0); +} + +int Trick::TCPConnection::write (const std::string& message) { + if (!_connected) + return -1; + + char send_buf[message.length()+1]; + strcpy (send_buf, message.c_str()); + + return _system_interface->send(_socket, send_buf, message.length(), 0); +} + +std::string Trick::TCPConnection::read (int max_len) { + if (!_connected) { + std::cerr << "Trying to read from a socket that is not connected" << std::endl; + return ""; + } + + char incoming_msg[max_len]; + + int max_receive_length = max_len < MAX_CMD_LEN ? max_len : MAX_CMD_LEN; + int nbytes = _system_interface->recv(_socket, incoming_msg, max_receive_length, MSG_PEEK); + + if (nbytes == 0 ) { + return std::string(""); + } + + if (nbytes == -1) { + if (errno == EAGAIN) { + return std::string(""); + } else { + std::string error_msg = "Error while reading from socket " + std::to_string(_socket); + perror(error_msg.c_str()); + return std::string(""); + } + } + + /* find the last newline that is present on the socket */ + incoming_msg[nbytes] = '\0' ; + char *last_newline = rindex( incoming_msg , '\n') ; + + /* if there is a newline then there is a complete command on the socket */ + if ( last_newline != NULL ) { + /* only remove up to (and including) the last newline on the socket */ + int size = last_newline - incoming_msg + 1; + nbytes = _system_interface->recv( _socket, incoming_msg, size, 0) ; + } else { + nbytes = 0 ; + } + + std::stringstream msg_stream; + + if ( nbytes > 0 ) { + int msg_len = nbytes ; + incoming_msg[msg_len] = '\0' ; + + // Strip off any \r characters + for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) { + if ( incoming_msg[ii] != '\r' ) { + msg_stream << incoming_msg[ii] ; + } + } + } + + return msg_stream.str(); +} + + +int Trick::TCPConnection::disconnect () { + if (!_connected) { + return -1; + } + + _system_interface->shutdown(_socket, SHUT_RDWR); + _system_interface->close (_socket); + _connected = false; + + return 0; +} + +int Trick::TCPConnection::setBlockMode(bool blocking) { + + int flag = _system_interface->fcntl(_socket, F_GETFL, 0); + + if (flag == -1) { + std::string error_message = "Unable to get flags for fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + if (blocking) { + flag &= ~O_NONBLOCK; + } else { + flag |= O_NONBLOCK; + } + + if (_system_interface->fcntl(_socket, F_SETFL, flag) == -1) { + std::string error_message = "Unable to set fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + return 0; +} + +bool Trick::TCPConnection::isInitialized() { + return _connected; +} + + +int Trick::TCPConnection::restart() { + _system_interface = new SystemInterface(); +} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/UDPConnection.cpp b/trick_source/trick_utils/connection_handlers/UDPConnection.cpp new file mode 100644 index 00000000..d8be0276 --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/UDPConnection.cpp @@ -0,0 +1,215 @@ +#include "trick/UDPConnection.hh" + +#include +#include +#include +#include +#include + +Trick::UDPConnection::UDPConnection () : UDPConnection(new SystemInterface()) {} +Trick::UDPConnection::UDPConnection (SystemInterface * system_interface) : _started(false), _initialized(false), _port(0), _hostname(""), _system_interface(system_interface), _socket(0) {} + +int Trick::UDPConnection::initialize(const std::string& in_hostname, int in_port) { + + + if ((_socket = _system_interface->socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + perror ("Server: Unable to open socket"); + return -1; + } + + int option_val = 1; + + // Do not replicate the socket's handle if this process is forked + // Removing this will cause all kinds of problems when starting a variable server client via input file + _system_interface->fcntl(_socket, F_SETFD, FD_CLOEXEC); + + // Allow socket's bound address to be used by others + if (_system_interface->setsockopt(_socket, SOL_SOCKET, SO_REUSEADDR, (const char *) &option_val, (socklen_t) sizeof(option_val)) != 0) { + perror("Server: Could not set socket to reuse addr"); + _system_interface->close (_socket); + return -1; + } + + struct sockaddr_in s_in; + memset(&s_in, 0 , sizeof(struct sockaddr_in)) ; + + // Look up the hostname + char name[80]; + gethostname(name, (size_t) 80); + + struct hostent *ip_host ; + socklen_t s_in_size = sizeof(s_in) ; + + s_in.sin_family = AF_INET; + + if (in_hostname == "" || in_hostname == "localhost" || in_hostname == "127.0.0.1" || strcmp(in_hostname.c_str(),name) == 0) { + s_in.sin_addr.s_addr = INADDR_ANY; + _hostname = std::string(name); + } else if ( inet_pton(AF_INET, in_hostname.c_str(), (struct in_addr *)&s_in.sin_addr.s_addr) == 1 ) { + /* numeric character string address */ + _hostname = in_hostname; + } else if ( (ip_host = gethostbyname(in_hostname.c_str())) != NULL ) { + /* some name other than the default name was given */ + memcpy((void *) &(s_in.sin_addr.s_addr), (const void *) ip_host->h_addr, (size_t) ip_host->h_length); + _hostname = in_hostname; + } else { + perror("UDP socket: Could not determine source address"); + return -1 ; + } + + // Set port + s_in.sin_port = htons((short) in_port); + + // Bind to socket + if (_system_interface->bind(_socket, (struct sockaddr *)&s_in, sizeof(s_in)) < 0) { + + perror("UDP socket: Could not bind to socket"); + _system_interface->close (_socket); + return -1; + } + + // Check that correct port was bound to + _system_interface->getsockname( _socket , (struct sockaddr *)&s_in, &s_in_size) ; + int bound_port = ntohs(s_in.sin_port); + + if (in_port != 0 && bound_port != in_port) { + std::cerr << "UDP socket: Could not bind to requested port " << in_port << std::endl; + _system_interface->close(_socket); + return -1; + } + + // Save port number + _port = bound_port; + + setBlockMode(false); + + // Done! + _initialized = true; + return 0; +} + +int Trick::UDPConnection::start() { + // We've already set everything up, just make sure it was successful + if (_initialized) { + _started = true; + return 0; + } + return -1; +} + +int Trick::UDPConnection::write (char * message, int size) { + if (!_started) + return -1; + + socklen_t sock_size = sizeof(_remote_serv_addr); + + return _system_interface->sendto(_socket, message, size, 0, (struct sockaddr *) &_remote_serv_addr, sock_size ); +} + +int Trick::UDPConnection::write (const std::string& message) { + if (!_started) + return -1; + + char send_buf[message.length()+1]; + strcpy (send_buf, message.c_str()); + socklen_t sock_size = sizeof(_remote_serv_addr); + + return _system_interface->sendto(_socket, send_buf, message.length(), 0, (struct sockaddr *) &_remote_serv_addr, sock_size ); +} + +std::string Trick::UDPConnection::read (int max_len) { + if (!_started) + return ""; + + char incoming_msg[max_len]; + int nbytes = _system_interface->recvfrom( _socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ; + if (nbytes == 0 ) { + return 0; + } + + if (nbytes != -1) { // -1 means socket is nonblocking and no data to read + /* find the last newline that is present on the socket */ + incoming_msg[nbytes] = '\0' ; + char *last_newline = rindex( incoming_msg , '\n') ; + + /* if there is a newline then there is a complete command on the socket */ + if ( last_newline != NULL ) { + socklen_t sock_size = sizeof(_remote_serv_addr); + /* Save the remote host information so we know where to send replies */ + /* only remove up to (and including) the last newline on the socket */ + int size = last_newline - incoming_msg + 1; + nbytes = _system_interface->recvfrom( _socket, incoming_msg, size, 0 , (struct sockaddr *) &_remote_serv_addr, &sock_size ) ; + } else { + nbytes = 0 ; + } + } + + std::stringstream msg_stream; + + if ( nbytes > 0 ) { + + int msg_len = nbytes ; + incoming_msg[msg_len] = '\0' ; + + for( int ii = 0 , jj = 0 ; ii <= msg_len ; ii++ ) { + if ( incoming_msg[ii] != '\r' ) { + msg_stream << incoming_msg[ii] ; + } + } + } + + return msg_stream.str(); +} + +int Trick::UDPConnection::disconnect () { + if (!_initialized) { + return -1; + } + + _system_interface->shutdown(_socket, SHUT_RDWR); + _system_interface->close (_socket); + _initialized = false; + _started = false; + + return 0; +} + +int Trick::UDPConnection::setBlockMode(bool blocking) { + int flag = _system_interface->fcntl(_socket, F_GETFL, 0); + + if (flag == -1) { + std::string error_message = "Unable to get flags for fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + if (blocking) { + flag &= ~O_NONBLOCK; + } else { + flag |= O_NONBLOCK; + } + + if (_system_interface->fcntl(_socket, F_SETFL, flag) == -1) { + std::string error_message = "Unable to set fd " + std::to_string(_socket) + " block mode to " + std::to_string(blocking); + perror (error_message.c_str()); + return -1; + } + + return 0; +} + +int Trick::UDPConnection::restart() { + _system_interface = new SystemInterface(); +} + +bool Trick::UDPConnection::isInitialized() { + return _started; +} + +int Trick::UDPConnection::getPort() { + return _port; +} + +std::string Trick::UDPConnection::getHostname() { + return _hostname; +} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/test/.gitignore b/trick_source/trick_utils/connection_handlers/test/.gitignore new file mode 100644 index 00000000..05c5c19e --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/.gitignore @@ -0,0 +1 @@ +comm_test \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/test/ClientListener_test.cpp b/trick_source/trick_utils/connection_handlers/test/ClientListener_test.cpp new file mode 100644 index 00000000..02f9f125 --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/ClientListener_test.cpp @@ -0,0 +1,400 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "trick/ClientListener.hh" +#include "SystemInterfaceMock/SystemInterfaceMock.hh" + + +class ClientListenerTest : public testing::Test { + + protected: + ClientListenerTest() : system_context(new SystemInferfaceMock()), listener (system_context) {} + ~ClientListenerTest(){} + + SystemInferfaceMock * system_context; + Trick::ClientListener listener; +}; + + +TEST_F( ClientListenerTest, initialized ) { + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, initialize_localhost_0 ) { + // ARRANGE + // Look up the hostname + char name[80]; + gethostname(name, (size_t) 80); + + // ACT + listener.initialize("localhost", 0); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), true); + EXPECT_EQ(listener.getHostname(), std::string(name)); +} + +TEST_F( ClientListenerTest, initialize_localhost_54321 ) { + // ARRANGE + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), true); + EXPECT_EQ(listener.getPort(), 54321); +} + +TEST_F( ClientListenerTest, initialize_no_args ) { + // ARRANGE + // ACT + listener.initialize(); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), true); + EXPECT_GT(listener.getPort(), 1000); +} + +TEST_F( ClientListenerTest, initialize_localhost_numerical_54321 ) { + // ARRANGE + // ACT + listener.initialize("127.0.0.1", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), true); + EXPECT_EQ(listener.getPort(), 54321); +} + +TEST_F( ClientListenerTest, initialize_invalid_hostname ) { + // ARRANGE + // ACT + listener.initialize("some_invalid_hostname", 0); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_socket ) { + // ARRANGE + system_context->register_socket_impl([](int a, int b, int c) { + errno = EPERM; + return -1; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_setsockopt_reuseaddr ) { + // ARRANGE + system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) { + errno = EINVAL; + return -1; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_setsockopt_buffering ) { + // ARRANGE + system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) { + if (level == IPPROTO_TCP && optname == TCP_NODELAY) { + errno = ENOTSOCK; + return -1; + } + + return 0; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_bind ) { + // ARRANGE + system_context->register_bind_impl([](int sockfd, const struct sockaddr *addr,socklen_t addrlen) { + errno = EADDRINUSE; + return -1; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_sockname ) { + // ARRANGE + system_context->register_getsockname_impl([](int sockfd, struct sockaddr *addr, socklen_t *addrlen) { + ((struct sockaddr_in *) addr)->sin_port = htons(1234); + return 0; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, failed_listen ) { + // ARRANGE + system_context->register_listen_impl([](int sockfd, int backlog) { + errno = EADDRINUSE; + return -1; + }); + + // ACT + listener.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(listener.isInitialized(), false); +} + +TEST_F( ClientListenerTest, checkForNewConnections_uninitialized ) { + // ARRANGE + // ACT + bool result = listener.checkForNewConnections(); + + // ASSERT + EXPECT_EQ(result, false); +} + +TEST_F( ClientListenerTest, checkForNewConnections ) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + system_context->register_select_impl([&socket_fd](int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout) { + FD_SET(socket_fd, readfds); + return 1; + }); + listener.initialize(); + + // ACT + bool result = listener.checkForNewConnections(); + + // ASSERT + EXPECT_EQ(result, true); +} + +TEST_F( ClientListenerTest, checkForNewConnections_select_error ) { + // ARRANGE + system_context->register_select_impl([](int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout) { + return -1; + }); + listener.initialize(); + + // ACT + bool result = listener.checkForNewConnections(); + + // ASSERT + EXPECT_EQ(result, false); +} + +TEST_F( ClientListenerTest, setBlockMode_false ) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + listener.initialize(); + + // ACT + int status = listener.setBlockMode(false); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_TRUE(flag & O_NONBLOCK); +} + +TEST_F( ClientListenerTest, setBlockMode_nonblocking) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + listener.initialize(); + + // ACT + int status = listener.setBlockMode(false); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_TRUE(flag & O_NONBLOCK); +} + + +TEST_F( ClientListenerTest, setBlockMode_blocking) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + listener.initialize(); + + // ACT + int status = listener.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_FALSE(flag & O_NONBLOCK); +} + +TEST_F( ClientListenerTest, setBlockMode_fcntl_getfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + system_context->register_fcntl_impl([](int a, int b, int c) { + errno = EACCES; + return -1; + }); + listener.initialize(); + + // ACT + int status = listener.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); +} + +TEST_F( ClientListenerTest, setBlockMode_fcntl_setfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + system_context->register_fcntl_impl([](int a, int cmd, int c) { + if (cmd == F_SETFL) { + errno = EBADF; + return -1; + } + return 0; + }); + + listener.initialize(); + + // ACT + int status = listener.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); +} + + +TEST_F( ClientListenerTest, validateSourceAddress_localhost) { + // ARRANGE + // ACT + bool status = listener.validateSourceAddress("localhost"); + + // ASSERT + EXPECT_EQ(status, true); +} + +TEST_F( ClientListenerTest, validateSourceAddress_junk) { + // ARRANGE + // ACT + bool status = listener.validateSourceAddress("alsdkfjgalkdj"); + + // ASSERT + EXPECT_EQ(status, false); +} + +TEST_F( ClientListenerTest, checkSocket) { + // ARRANGE + listener.initialize(); + int port = listener.getPort(); + + // ACT + int status = listener.checkSocket(); + + // ASSERT + EXPECT_EQ(status, 0); + EXPECT_EQ(listener.getPort(), port); +} + +TEST_F( ClientListenerTest, checkSocket_uninitialized) { + // ARRANGE + // ACT + int status = listener.checkSocket(); + + // ASSERT + EXPECT_EQ(status, -1); +} + +TEST_F( ClientListenerTest, disconnect) { + // ARRANGE + listener.initialize(); + + // ACT + int status = listener.disconnect(); + + // ASSERT + EXPECT_EQ(status, 0); +} + +TEST_F( ClientListenerTest, disconnect_uninitialized) { + // ARRANGE + // ACT + int status = listener.disconnect(); + + // ASSERT + EXPECT_EQ(status, -1); +} + +TEST_F( ClientListenerTest, setupNewConnection) { + // ARRANGE + listener.initialize(); + + // ACT + Trick::TCPConnection * connection = listener.setUpNewConnection(); + + // ASSERT + EXPECT_TRUE(connection != NULL); +} + +TEST_F( ClientListenerTest, setupNewConnection_uninitialized) { + // ARRANGE + + // ACT + Trick::TCPConnection * connection = listener.setUpNewConnection(); + + // ASSERT + EXPECT_TRUE(connection == NULL); +} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/test/Makefile b/trick_source/trick_utils/connection_handlers/test/Makefile new file mode 100644 index 00000000..0047620a --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/Makefile @@ -0,0 +1,38 @@ + +include ${TRICK_HOME}/share/trick/makefiles/Makefile.common +include ${TRICK_HOME}/share/trick/makefiles/Makefile.tricklib + +INCLUDE_DIRS = -I$(GTEST_HOME) -I${GTEST_HOME}/include -I$(TRICK_HOME)/include + +# Use the libtrick_connection_handlers library only. libtrick.a would also work. +TRICK_LIBS := ${TRICK_LIB_DIR}/libtrick_connection_handlers.a +TRICK_EXEC_LINK_LIBS += -L${GTEST_HOME}/lib64 -L${GTEST_HOME}/lib -lgtest -lgtest_main +TRICK_CXXFLAGS += ${INCLUDE_DIRS} -g -Wall -Wextra ${TRICK_SYSTEM_CXXFLAGS} ${TRICK_TEST_FLAGS} + +# Automatically determine all executable names produced by this Makefile. +TESTS = comm_test + +# Objects that we depend on +DEPENDS = ../object_*/*.o + +TRICK_CXXFLAGS += -Wno-unused-parameter + +default : all + +all : $(TESTS) + +# Set XML test results name +test : all + @ for i in $(TESTS) ; do \ + ./$$i --gtest_output=xml:${TRICK_HOME}/trick_test/ConnectionHandlers.xml; \ + done + +clean : clean_test + +clean_test : + $(RM) -rf $(TESTS) + + +$(TESTS) : $(CPP_OBJS) $(DEPENDS) + $(TRICK_CXX) $(TRICK_CXXFLAGS) $^ $(TRICK_EXEC_LINK_LIBS) -o $@ $(SYSCALL_MOCK_LINKFLAGS) + diff --git a/trick_source/trick_utils/connection_handlers/test/SystemInterfaceMock/SystemInterfaceMock.hh b/trick_source/trick_utils/connection_handlers/test/SystemInterfaceMock/SystemInterfaceMock.hh new file mode 100644 index 00000000..2463582c --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/SystemInterfaceMock/SystemInterfaceMock.hh @@ -0,0 +1,238 @@ + +#ifndef __SYSTEM_INTERFACE_MOCK__ +#define __SYSTEM_INTERFACE_MOCK__ + + +#include "trick/SystemInterface.hh" + + + +typedef std::function socket_func_type; + +typedef std::function setsockopt_func_type; + +typedef std::function bind_func_type; + +typedef std::function getsockname_func_type; + +typedef std::function listen_func_type; + +typedef std::function select_func_type; + +typedef std::function close_func_type; + +typedef std::function getaddrinfo_func_type; + +typedef std::function fcntl_func_type; + +typedef std::function shutdown_func_type; + +typedef std::function accept_func_type; + +typedef std::function send_func_type; + +typedef std::function sendto_func_type; + +typedef std::function recv_func_type; + +typedef std::function recvfrom_func_type; + + +class SystemInferfaceMock : public SystemInterface { + public: + SystemInferfaceMock () { + + real_socket_impl(); + real_setsockopt_impl(); + real_bind_impl(); + real_getsockname_impl(); + real_listen_impl(); + real_select_impl(); + real_close_impl(); + real_getaddrinfo_impl(); + real_fcntl_impl(); + real_shutdown_impl(); + real_accept_impl(); + real_send_impl(); + real_sendto_impl(); + real_recv_impl(); + real_recvfrom_impl(); + } + + void set_all_real () { + + real_socket_impl(); + real_setsockopt_impl(); + real_bind_impl(); + real_getsockname_impl(); + real_listen_impl(); + real_select_impl(); + real_close_impl(); + real_getaddrinfo_impl(); + real_fcntl_impl(); + real_shutdown_impl(); + real_accept_impl(); + real_send_impl(); + real_sendto_impl(); + real_recv_impl(); + real_recvfrom_impl(); + } + + void set_all_noop() { + + noop_socket_impl(); + noop_setsockopt_impl(); + noop_bind_impl(); + noop_getsockname_impl(); + noop_listen_impl(); + noop_select_impl(); + noop_close_impl(); + noop_getaddrinfo_impl(); + noop_fcntl_impl(); + noop_shutdown_impl(); + noop_accept_impl(); + noop_send_impl(); + noop_sendto_impl(); + noop_recv_impl(); + noop_recvfrom_impl(); + } + + + // socket implementation + public: + virtual int socket (int domain, int type, int protocol) override { return socket_impl( domain, type, protocol); } + void register_socket_impl (socket_func_type impl) { socket_impl = impl; } + void real_socket_impl () { socket_impl = [](int domain, int type, int protocol) -> int { return ::socket( domain, type, protocol); }; } + void noop_socket_impl () { socket_impl = [](int domain, int type, int protocol) -> int { return 0; }; } + private: + socket_func_type socket_impl; + + // setsockopt implementation + public: + virtual int setsockopt (int socket, int level, int option_name, const void * option_value, socklen_t option_len) override { return setsockopt_impl( socket, level, option_name, option_value, option_len); } + void register_setsockopt_impl (setsockopt_func_type impl) { setsockopt_impl = impl; } + void real_setsockopt_impl () { setsockopt_impl = [](int socket, int level, int option_name, const void * option_value, socklen_t option_len) -> int { return ::setsockopt( socket, level, option_name, option_value, option_len); }; } + void noop_setsockopt_impl () { setsockopt_impl = [](int socket, int level, int option_name, const void * option_value, socklen_t option_len) -> int { return 0; }; } + private: + setsockopt_func_type setsockopt_impl; + + // bind implementation + public: + virtual int bind (int socket, struct sockaddr * address, socklen_t address_len) override { return bind_impl( socket, address, address_len); } + void register_bind_impl (bind_func_type impl) { bind_impl = impl; } + void real_bind_impl () { bind_impl = [](int socket, const struct sockaddr * address, socklen_t address_len) -> int { return ::bind( socket, address, address_len); }; } + void noop_bind_impl () { bind_impl = [](int socket, const struct sockaddr * address, socklen_t address_len) -> int { return 0; }; } + private: + bind_func_type bind_impl; + + // getsockname implementation + public: + virtual int getsockname (int socket, struct sockaddr * address, socklen_t * address_len) override { return getsockname_impl( socket, address, address_len); } + void register_getsockname_impl (getsockname_func_type impl) { getsockname_impl = impl; } + void real_getsockname_impl () { getsockname_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return ::getsockname( socket, address, address_len); }; } + void noop_getsockname_impl () { getsockname_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return 0; }; } + private: + getsockname_func_type getsockname_impl; + + // listen implementation + public: + virtual int listen (int socket, int backlog) override { return listen_impl( socket, backlog); } + void register_listen_impl (listen_func_type impl) { listen_impl = impl; } + void real_listen_impl () { listen_impl = [](int socket, int backlog) -> int { return ::listen( socket, backlog); }; } + void noop_listen_impl () { listen_impl = [](int socket, int backlog) -> int { return 0; }; } + private: + listen_func_type listen_impl; + + // select implementation + public: + virtual int select (int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) override { return select_impl( nfds, readfds, writefds, errorfds, timeout); } + void register_select_impl (select_func_type impl) { select_impl = impl; } + void real_select_impl () { select_impl = [](int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) -> int { return ::select( nfds, readfds, writefds, errorfds, timeout); }; } + void noop_select_impl () { select_impl = [](int nfds, fd_set * readfds, fd_set * writefds, fd_set * errorfds, struct timeval * timeout) -> int { return 0; }; } + private: + select_func_type select_impl; + + // close implementation + public: + virtual int close (int fildes) override { return close_impl( fildes); } + void register_close_impl (close_func_type impl) { close_impl = impl; } + void real_close_impl () { close_impl = [](int fildes) -> int { return ::close( fildes); }; } + void noop_close_impl () { close_impl = [](int fildes) -> int { return 0; }; } + private: + close_func_type close_impl; + + // getaddrinfo implementation + public: + virtual int getaddrinfo (const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) override { return getaddrinfo_impl( hostname, servname, hints, res); } + void register_getaddrinfo_impl (getaddrinfo_func_type impl) { getaddrinfo_impl = impl; } + void real_getaddrinfo_impl () { getaddrinfo_impl = [](const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) -> int { return ::getaddrinfo( hostname, servname, hints, res); }; } + void noop_getaddrinfo_impl () { getaddrinfo_impl = [](const char * hostname, const char * servname, const struct addrinfo * hints, struct addrinfo ** res) -> int { return 0; }; } + private: + getaddrinfo_func_type getaddrinfo_impl; + + // fcntl implementation + public: + virtual int fcntl (int fildes, int cmd, int arg) override { return fcntl_impl( fildes, cmd, arg); } + void register_fcntl_impl (fcntl_func_type impl) { fcntl_impl = impl; } + void real_fcntl_impl () { fcntl_impl = [](int fildes, int cmd, int arg) -> int { return ::fcntl( fildes, cmd, arg); }; } + void noop_fcntl_impl () { fcntl_impl = [](int fildes, int cmd, int arg) -> int { return 0; }; } + private: + fcntl_func_type fcntl_impl; + + // shutdown implementation + public: + virtual int shutdown (int socket, int how) override { return shutdown_impl( socket, how); } + void register_shutdown_impl (shutdown_func_type impl) { shutdown_impl = impl; } + void real_shutdown_impl () { shutdown_impl = [](int socket, int how) -> int { return ::shutdown( socket, how); }; } + void noop_shutdown_impl () { shutdown_impl = [](int socket, int how) -> int { return 0; }; } + private: + shutdown_func_type shutdown_impl; + + // accept implementation + public: + virtual int accept (int socket, struct sockaddr * address, socklen_t * address_len) override { return accept_impl( socket, address, address_len); } + void register_accept_impl (accept_func_type impl) { accept_impl = impl; } + void real_accept_impl () { accept_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return ::accept( socket, address, address_len); }; } + void noop_accept_impl () { accept_impl = [](int socket, struct sockaddr * address, socklen_t * address_len) -> int { return 0; }; } + private: + accept_func_type accept_impl; + + // send implementation + public: + virtual ssize_t send (int socket, const void * buffer, size_t length, int flags) override { return send_impl( socket, buffer, length, flags); } + void register_send_impl (send_func_type impl) { send_impl = impl; } + void real_send_impl () { send_impl = [](int socket, const void * buffer, size_t length, int flags) -> ssize_t { return ::send( socket, buffer, length, flags); }; } + void noop_send_impl () { send_impl = [](int socket, const void * buffer, size_t length, int flags) -> ssize_t { return 0; }; } + private: + send_func_type send_impl; + + // sendto implementation + public: + virtual ssize_t sendto (int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) override { return sendto_impl( socket, buffer, length, flags, dest_addr, dest_len); } + void register_sendto_impl (sendto_func_type impl) { sendto_impl = impl; } + void real_sendto_impl () { sendto_impl = [](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { return ::sendto( socket, buffer, length, flags, dest_addr, dest_len); }; } + void noop_sendto_impl () { sendto_impl = [](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { return 0; }; } + private: + sendto_func_type sendto_impl; + + // recv implementation + public: + virtual ssize_t recv (int socket, void * buffer, size_t length, int flags) override { return recv_impl( socket, buffer, length, flags); } + void register_recv_impl (recv_func_type impl) { recv_impl = impl; } + void real_recv_impl () { recv_impl = [](int socket, void * buffer, size_t length, int flags) -> ssize_t { return ::recv( socket, buffer, length, flags); }; } + void noop_recv_impl () { recv_impl = [](int socket, void * buffer, size_t length, int flags) -> ssize_t { return 0; }; } + private: + recv_func_type recv_impl; + + // recvfrom implementation + public: + virtual ssize_t recvfrom (int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) override { return recvfrom_impl( socket, buffer, length, flags, address, address_len); } + void register_recvfrom_impl (recvfrom_func_type impl) { recvfrom_impl = impl; } + void real_recvfrom_impl () { recvfrom_impl = [](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { return ::recvfrom( socket, buffer, length, flags, address, address_len); }; } + void noop_recvfrom_impl () { recvfrom_impl = [](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { return 0; }; } + private: + recvfrom_func_type recvfrom_impl; + +}; + +#endif diff --git a/trick_source/trick_utils/connection_handlers/test/TCPConnection_test.cpp b/trick_source/trick_utils/connection_handlers/test/TCPConnection_test.cpp new file mode 100644 index 00000000..2c9c92f1 --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/TCPConnection_test.cpp @@ -0,0 +1,367 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "trick/TCPConnection.hh" +#include "SystemInterfaceMock/SystemInterfaceMock.hh" + + +class TCPConnectionTest : public testing::Test { + + protected: + TCPConnectionTest() : system_context(new SystemInferfaceMock()), connection (fake_listen_socket_fd, system_context) {} + ~TCPConnectionTest(){} + + SystemInferfaceMock * system_context; + Trick::TCPConnection connection; + const static int fake_listen_socket_fd; +}; + +const int TCPConnectionTest::fake_listen_socket_fd = 5; + +TEST_F( TCPConnectionTest, initialized ) { + // ARRANGE + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + + // ACT + int result = connection.start(); + + // ASSERT + ASSERT_EQ(result, 0); +} + +TEST_F( TCPConnectionTest, initialized_accept_failure ) { + // ARRANGE + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + errno = EWOULDBLOCK; + return -1; + }); + + // ACT + int result = connection.start(); + + // ASSERT + ASSERT_EQ(result, -1); +} + + +TEST_F( TCPConnectionTest, setBlockMode_nonblocking) { + // ARRANGE + int socket_fd; + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + socket_fd = ::socket(AF_INET, SOCK_STREAM, 0); + return socket_fd; + }); + connection.start(); + + // ACT + int status = connection.setBlockMode(false); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_TRUE(flag & O_NONBLOCK); + + close(socket_fd); +} + + +TEST_F( TCPConnectionTest, setBlockMode_blocking) { +// ARRANGE + int socket_fd; + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + socket_fd = ::socket(AF_INET, SOCK_STREAM, 0); + return socket_fd; + }); + connection.start(); + + // ACT + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_FALSE(flag & O_NONBLOCK); + + close(socket_fd); +} + +TEST_F( TCPConnectionTest, setBlockMode_fcntl_getfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + socket_fd = ::socket(AF_INET, SOCK_STREAM, 0); + return socket_fd; + }); + + system_context->register_fcntl_impl([](int a, int b, int c) { + errno = EACCES; + return -1; + }); + connection.start(); + + // ACT + + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); + + close(socket_fd); +} + +TEST_F( TCPConnectionTest, setBlockMode_fcntl_setfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + socket_fd = ::socket(AF_INET, SOCK_STREAM, 0); + return socket_fd; + }); + system_context->register_fcntl_impl([](int a, int cmd, int c) { + if (cmd == F_SETFL) { + errno = EBADF; + return -1; + } + return 0; + }); + + connection.start(); + + // ACT + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); + + close(socket_fd); +} + +TEST_F( TCPConnectionTest, write_string ) { + // ARRANGE + char sent_data[100]; + memset(sent_data, 0, 100); + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_send_impl([&](int socket, const void * buffer, size_t length, int flags) -> ssize_t { + memcpy (sent_data, buffer, length); + return length; + }); + connection.start(); + + // ACT + std::string str = "This is a message to write"; + int result = connection.write(str); + + // ASSERT + ASSERT_EQ(result, str.length()); + ASSERT_STREQ(str.c_str(), sent_data); +} + +TEST_F( TCPConnectionTest, write_binary_buf ) { + // ARRANGE + char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}; + char sent_data[100]; + + memset(sent_data, 0, 100); + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_send_impl([&](int socket, const void * buffer, size_t length, int flags) -> ssize_t { + memcpy (sent_data, buffer, length); + return length; + }); + connection.start(); + + // ACT + int result = connection.write(to_send, sizeof(to_send)); + + // ASSERT + ASSERT_EQ(result, sizeof(to_send)); + for (int i = 0; i < 8; i++) { + ASSERT_EQ(to_send[i], sent_data[i]); + } +} + + +TEST_F( TCPConnectionTest, write_string_uninitialized ) { + // ARRANGE + std::string str = "This is a message to write"; + + // ACT + int result = connection.write(str); + + // ASSERT + ASSERT_EQ(result, -1); +} + +TEST_F( TCPConnectionTest, write_binary_buf_uninitialized ) { + // ARRANGE + char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}; + + // ACT + int result = connection.write(to_send, sizeof(to_send)); + + // ASSERT + ASSERT_EQ(result, -1); +} + + +TEST_F( TCPConnectionTest, read_nonewline ) { + // ARRANGE + std::string data_to_read = "Here is an incomplete message from a socket"; + + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t { + int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length; + + memcpy (buffer, data_to_read.c_str(), data_to_read_size); + return data_to_read_size; + }); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + ASSERT_EQ(result, std::string("")); +} + + +TEST_F( TCPConnectionTest, read ) { + // ARRANGE + std::string data_to_read = "Here is a complete message from a socket\n This part is incomplete"; + + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t { + int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length; + memcpy (buffer, data_to_read.c_str(), data_to_read_size); + return data_to_read_size; + }); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = "Here is a complete message from a socket\n"; + expected += '\0'; + ASSERT_EQ(result, expected); +} + + +TEST_F( TCPConnectionTest, read_nodata ) { + // ARRANGE + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t { + errno = EAGAIN; + return -1; + }); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} + +TEST_F( TCPConnectionTest, read_other_error ) { + // ARRANGE + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return 6; + }); + system_context->noop_fcntl_impl(); + system_context->register_recv_impl([&](int socket, void * buffer, size_t length, int flags) -> ssize_t { + errno = EBADF; + return -1; + }); + + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} + + + +TEST_F( TCPConnectionTest, read_uninitialized ) { + // ARRANGE + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} + + +TEST_F( TCPConnectionTest, disconnect ) { + // ARRANGE + int socket_fd = 6; + int close_called = 0; + int shutdown_called = 0; + system_context->register_accept_impl([&](int socket, struct sockaddr * address, socklen_t * address_len) -> int { + return socket_fd; + }); + system_context->register_close_impl([&](int socket) { + if (socket == socket_fd) + close_called++; + return 0; + }); + system_context->register_shutdown_impl([&](int socket, int how) { + if (socket == socket_fd) + shutdown_called++; + return 0; + }); + system_context->noop_fcntl_impl(); + connection.start(); + + // ACT + int result = connection.disconnect(); + + // ASSERT + ASSERT_EQ(result, 0); + ASSERT_EQ(close_called, 1); + ASSERT_EQ(shutdown_called, 1); + + // Don't shut down if already shut down + // ACT + result = connection.disconnect(); + + // ASSERT + ASSERT_EQ(result, -1); + ASSERT_EQ(close_called, 1); + ASSERT_EQ(shutdown_called, 1); + +} \ No newline at end of file diff --git a/trick_source/trick_utils/connection_handlers/test/UDPConnection_test.cpp b/trick_source/trick_utils/connection_handlers/test/UDPConnection_test.cpp new file mode 100644 index 00000000..af6924f0 --- /dev/null +++ b/trick_source/trick_utils/connection_handlers/test/UDPConnection_test.cpp @@ -0,0 +1,394 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "trick/UDPConnection.hh" +#include "SystemInterfaceMock/SystemInterfaceMock.hh" + + +class UDPConnectionTest : public testing::Test { + + protected: + UDPConnectionTest() : system_context(new SystemInferfaceMock()), connection (system_context) {} + ~UDPConnectionTest() {} + + SystemInferfaceMock * system_context; + Trick::UDPConnection connection; +}; + + + +TEST_F( UDPConnectionTest, initialize_localhost_0 ) { + // ARRANGE + char name[80]; + gethostname(name, (size_t) 80); + + // ACT + connection.initialize("localhost", 0); + connection.start(); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), true); + EXPECT_EQ(connection.getHostname(), std::string(name)); +} + +TEST_F( UDPConnectionTest, initialize_localhost_54321 ) { + // ARRANGE + // ACT + connection.initialize("localhost", 54321); + connection.start(); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), true); + EXPECT_EQ(connection.getPort(), 54321); +} + +TEST_F( UDPConnectionTest, initialize_localhost_numerical_54321 ) { + // ARRANGE + // ACT + connection.initialize("127.0.0.1", 12345); + connection.start(); + + // ASSERT + EXPECT_EQ(connection.getPort(), 12345); +} + +TEST_F( UDPConnectionTest, initialize_invalid_hostname ) { + // ARRANGE + // ACT + connection.initialize("some_invalid_hostname", 0); + connection.start(); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), false); +} + +TEST_F( UDPConnectionTest, failed_socket ) { + // ARRANGE + system_context->register_socket_impl([](int a, int b, int c) { + errno = EPERM; + return -1; + }); + + // ACT + connection.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), false); +} + +TEST_F( UDPConnectionTest, failed_setsockopt_reuseaddr ) { + // ARRANGE + system_context->register_setsockopt_impl([](int sockfd, int level, int optname, const void *optval, socklen_t optlen) { + errno = EINVAL; + return -1; + }); + + // ACT + connection.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), false); +} + +TEST_F( UDPConnectionTest, failed_bind ) { + // ARRANGE + system_context->register_bind_impl([](int sockfd, const struct sockaddr *addr,socklen_t addrlen) { + errno = EADDRINUSE; + return -1; + }); + + // ACT + connection.initialize("localhost", 54321); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), false); +} + +TEST_F( UDPConnectionTest, failed_sockname ) { + // ARRANGE + system_context->register_getsockname_impl([](int sockfd, struct sockaddr *addr, socklen_t *addrlen) { + ((struct sockaddr_in *) addr)->sin_port = htons(1234); + return 0; + }); + + // ACT + connection.initialize("localhost", 3412); + + // ASSERT + EXPECT_EQ(connection.isInitialized(), false); +} + + +TEST_F( UDPConnectionTest, setBlockMode_nonblocking) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + + connection.initialize("localhost", 0); + connection.start(); + + // ACT + int status = connection.setBlockMode(false); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_TRUE(flag & O_NONBLOCK); + + close(socket_fd); +} + + +TEST_F( UDPConnectionTest, setBlockMode_blocking) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + + connection.initialize("localhost", 0); + connection.start(); + + // ACT + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, 0); + + int flag = fcntl(socket_fd, F_GETFL, 0); + EXPECT_FALSE(flag & O_NONBLOCK); + + close(socket_fd); +} + +TEST_F( UDPConnectionTest, setBlockMode_fcntl_getfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + + connection.initialize("localhost", 0); + connection.start(); + + system_context->register_fcntl_impl([](int a, int b, int c) { + errno = EACCES; + return -1; + }); + + // ACT + + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); + + close(socket_fd); +} + +TEST_F( UDPConnectionTest, setBlockMode_fcntl_setfl_fail) { + // ARRANGE + int socket_fd; + system_context->register_socket_impl([&socket_fd](int a, int b, int c) { + socket_fd = ::socket(a, b, c); + return socket_fd; + }); + + connection.initialize("localhost", 0); + connection.start(); + + system_context->register_fcntl_impl([](int a, int cmd, int c) { + if (cmd == F_SETFL) { + errno = EBADF; + return -1; + } + return 0; + }); + + // ACT + int status = connection.setBlockMode(true); + + // ASSERT + EXPECT_EQ(status, -1); + + close(socket_fd); +} + + +TEST_F( UDPConnectionTest, write_string ) { + // ARRANGE + char sent_data[100]; + memset(sent_data, 0, 100); + system_context->noop_fcntl_impl(); + system_context->register_sendto_impl([&](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { + memcpy (sent_data, buffer, length); + return length; + }); + connection.initialize("localhost", 0); + connection.start(); + + // ACT + std::string str = "This is a message to write"; + int result = connection.write(str); + + // ASSERT + ASSERT_EQ(result, str.length()); + ASSERT_STREQ(str.c_str(), sent_data); +} + +TEST_F( UDPConnectionTest, write_binary_buf ) { + // ARRANGE + char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}; + char sent_data[100]; + + memset(sent_data, 0, 100); + system_context->noop_fcntl_impl(); + system_context->register_sendto_impl([&](int socket, const void * buffer, size_t length, int flags, const struct sockaddr * dest_addr, socklen_t dest_len) -> ssize_t { + memcpy (sent_data, buffer, length); + return length; + }); + connection.initialize("localhost", 0); + connection.start(); + + // ACT + int result = connection.write(to_send, sizeof(to_send)); + + // ASSERT + ASSERT_EQ(result, sizeof(to_send)); + for (int i = 0; i < 8; i++) { + ASSERT_EQ(to_send[i], sent_data[i]); + } +} + + +TEST_F( UDPConnectionTest, write_string_uninitialized ) { + // ARRANGE + std::string str = "This is a message to write"; + + // ACT + int result = connection.write(str); + + // ASSERT + ASSERT_EQ(result, -1); +} + +TEST_F( UDPConnectionTest, write_binary_buf_uninitialized ) { + // ARRANGE + char to_send[8] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}; + + // ACT + int result = connection.write(to_send, sizeof(to_send)); + + // ASSERT + ASSERT_EQ(result, -1); +} + + +TEST_F( UDPConnectionTest, read_nonewline ) { + // ARRANGE + std::string data_to_read = "Here is an incomplete message from a socket"; + + system_context->noop_fcntl_impl(); + system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { + int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length; + + memcpy (buffer, data_to_read.c_str(), data_to_read_size); + return data_to_read_size; + }); + connection.initialize("localhost", 0); + connection.start(); + + + // ACT + std::string result = connection.read(); + + // ASSERT + ASSERT_EQ(result, std::string("")); +} + + +TEST_F( UDPConnectionTest, read ) { + // ARRANGE + std::string data_to_read = "Here is a complete message from a socket\n This part is incomplete"; + + system_context->noop_fcntl_impl(); + system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { + int data_to_read_size = data_to_read.size() < length ? data_to_read.size() : length; + memcpy (buffer, data_to_read.c_str(), data_to_read_size); + return data_to_read_size; + }); + connection.initialize("localhost", 0); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = "Here is a complete message from a socket\n"; + expected += '\0'; + ASSERT_EQ(result, expected); +} + + +TEST_F( UDPConnectionTest, read_nodata ) { + // ARRANGE + system_context->noop_fcntl_impl(); + system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { + errno = EAGAIN; + return -1; + }); + connection.initialize("localhost", 0); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} + +TEST_F( UDPConnectionTest, read_other_error ) { + // ARRANGE + system_context->noop_fcntl_impl(); + system_context->register_recvfrom_impl([&](int socket, void * buffer, size_t length, int flags, struct sockaddr * address, socklen_t * address_len) -> ssize_t { + errno = EBADF; + return -1; + }); + connection.initialize("localhost", 0); + connection.start(); + + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} + +TEST_F( UDPConnectionTest, read_uninitialized ) { + // ARRANGE + // ACT + std::string result = connection.read(); + + // ASSERT + std::string expected = ""; + ASSERT_EQ(result, expected); +} diff --git a/trickops.py b/trickops.py index 3e9b74dd..a2703249 100644 --- a/trickops.py +++ b/trickops.py @@ -28,39 +28,10 @@ class SimTestWorkflow(TrickWorkflow): remaining_run_jobs = self.get_jobs(kind='run', phase=0) # Get all jobs with default phase 0 analysis_jobs = self.get_jobs(kind='analyze') - # Some tests fail intermittently for reasons not related to the tests themselves, mostly network weirdness. - # Allow retries so that we can still cover some network-adjacent code - retry_allowed_sims = self.get_sims(labels='retries_allowed') - retry_allowed_jobs = [run.get_run_job() for run in [item for sublist in [sim.get_runs() for sim in retry_allowed_sims] for item in sublist]] - for job in retry_allowed_jobs: - # Note there's an assumption/dependency here that 'retries_allowed' runs - # are only in the remaining_run_jobs list. - Jordan 2/2023 - remaining_run_jobs.remove(job) - - # Some tests fail intermittently for reasons not related to the tests themselves, mostly network weirdness. - # Allow retries so that we can still cover some network-adjacent code - retry_allowed_sims = self.get_sims(labels='retries_allowed') - retry_allowed_jobs = [run.get_run_job() for run in [item for sublist in [sim.get_runs() for sim in retry_allowed_sims] for item in sublist]] - for job in retry_allowed_jobs: - run_jobs.remove(job) - - builds_status = self.execute_jobs(build_jobs, max_concurrent=self.cpus, header='Executing all sim builds.') - first_phase_run_status = self.execute_jobs(first_run_jobs, max_concurrent=self.cpus, header="Executing first phase runs.") - runs_status = self.execute_jobs(remaining_run_jobs, max_concurrent=self.cpus, header='Executing remaining runs.') + first_phase_run_status = self.execute_jobs(first_run_jobs, max_concurrent=self.cpus, header="Executing first phase runs.", job_timeout=1000) + runs_status = self.execute_jobs(remaining_run_jobs, max_concurrent=self.cpus, header='Executing remaining runs.', job_timeout=1000) - # Run the retry_allowed jobs - self.execute_jobs(retry_allowed_jobs, max_concurrent=self.cpus, header='Executing retry-allowed runs.') - - # If anything failed, try it again up to max_retries times - all_retried_status = 0 - final_retry_jobs = [] - for sim in retry_allowed_sims: - failing_runs = [run for run in sim.get_runs() if run.get_run_job().get_status() == Job.Status.FAILED] - for run in failing_runs: - status, final_job = self.retry_job(sim, run, max_retries) - final_retry_jobs += [final_job] - all_retried_status = all_retried_status or status comparison_result = self.compare() analysis_status = self.execute_jobs(analysis_jobs, max_concurrent=self.cpus, header='Executing all analysis.') @@ -68,32 +39,22 @@ class SimTestWorkflow(TrickWorkflow): self.status_summary() # Print a Succinct summary # Dump failing logs - jobs = build_jobs + first_run_jobs + remaining_run_jobs + final_retry_jobs + jobs = build_jobs + first_run_jobs + remaining_run_jobs for job in jobs: - if job.get_status() == Job.Status.FAILED: - print("Failing job: ", job.name) + if job.get_status() == Job.Status.FAILED or job.get_status() == Job.Status.TIMEOUT: + print ("*"*120) + if job.get_status() == Job.Status.FAILED: + header = "Failing job: " + job.name + else: + header = "Timed out job: " + job.name + + numspaces = int((120 - 20 - len(header))/2 -2) + print("*"*10, " "*numspaces, header, " "*numspaces, "*"*10,) print ("*"*120) print(open(job.log_file, "r").read()) - print ("*"*120, "\n") + print ("*"*120, "\n\n\n") - return (builds_status or runs_status or first_phase_run_status or all_retried_status or len(self.config_errors) > 0 or comparison_result or analysis_status) - - # Retries a job up to max_retries times and adds runs to the sim - # Returns tuple of (job_status, final retry job) - def retry_job(self, sim, run, max_retries): - tries = 0 - job_failing = 1 - retry_run = None - retry_job = None - while tries < max_retries and job_failing: - tries += 1 - retry_run = TrickWorkflow.Run(sim_dir=run.sim_dir, input_file=run.input_file, binary=run.binary, returns=run.returns,log_dir=run.log_dir) - retry_job = retry_run.get_run_job() - retry_job.name = retry_job.name + "_retry_" + str(tries) - job_failing = self.execute_jobs([retry_job], max_concurrent=1, header="Retrying failed job") - sim.add_run(retry_run) - - return (job_failing, retry_job) + return (builds_status or runs_status or first_phase_run_status or len(self.config_errors) > 0 or comparison_result or analysis_status) if __name__ == "__main__": parser = argparse.ArgumentParser(description='Build, run, and compare all test sims for Trick',