From be5835f92f7a4442be33d07d4204c248c7b2f610 Mon Sep 17 00:00:00 2001 From: Alex Lin Date: Tue, 19 Apr 2016 14:24:25 -0500 Subject: [PATCH] Variable server restart issues Backporting addition to suspend accepting new variable server connections and all communications to variable server clients for the duration of reloading a checkpoint refs #168 --- .../include/VariableServerListenThread.hh | 7 +++++++ .../include/VariableServerThread.hh | 3 +++ .../src/VariableServerListenThread.cpp | 14 ++++++++++++++ .../src/VariableServerThread.cpp | 1 + .../src/VariableServerThread_loop.cpp | 4 ++++ .../src/VariableServerThread_restart.cpp | 18 +++++++++++++----- .../src/VariableServer_restart.cpp | 5 +++++ 7 files changed, 47 insertions(+), 5 deletions(-) diff --git a/trick_source/sim_services/VariableServer/include/VariableServerListenThread.hh b/trick_source/sim_services/VariableServer/include/VariableServerListenThread.hh index 60a36b6e..962188c3 100644 --- a/trick_source/sim_services/VariableServer/include/VariableServerListenThread.hh +++ b/trick_source/sim_services/VariableServer/include/VariableServerListenThread.hh @@ -49,6 +49,10 @@ namespace Trick { int restart() ; + // pause and restart listen thread during checkpoint reload + void pause_listening() ; + void restart_listening() ; + virtual void dump( std::ostream & oss = std::cout ) ; protected: @@ -70,6 +74,9 @@ namespace Trick { /** The listen device\n */ TCDevice listen_dev; /**< trick_io(**) */ + /** The mutex to stop accepting new connections during restart\n */ + pthread_mutex_t restart_pause ; /**< trick_io(**) */ + } ; } diff --git a/trick_source/sim_services/VariableServer/include/VariableServerThread.hh b/trick_source/sim_services/VariableServer/include/VariableServerThread.hh index 8d0dc692..fc753865 100644 --- a/trick_source/sim_services/VariableServer/include/VariableServerThread.hh +++ b/trick_source/sim_services/VariableServer/include/VariableServerThread.hh @@ -471,6 +471,9 @@ namespace Trick { /** The mutex to protect variable output buffers when copying variable values to them from Trick memory.\n */ pthread_mutex_t copy_mutex ; /**< trick_io(**) */ + /** The mutex pauses all processing during checkpoint restart */ + pthread_mutex_t restart_pause ; /**< trick_io(**) */ + /** Dummy integer for bad references.\n */ static int bad_ref_int ; /**< trick_io(**) */ diff --git a/trick_source/sim_services/VariableServer/src/VariableServerListenThread.cpp b/trick_source/sim_services/VariableServer/src/VariableServerListenThread.cpp index a8d59c3c..1fb46d2e 100644 --- a/trick_source/sim_services/VariableServer/src/VariableServerListenThread.cpp +++ b/trick_source/sim_services/VariableServer/src/VariableServerListenThread.cpp @@ -22,6 +22,7 @@ Trick::VariableServerListenThread::VariableServerListenThread() : source_address = std::string(hname) ; strcpy(listen_dev.client_tag, ""); tc_error(&listen_dev, 0); + pthread_mutex_init(&restart_pause, NULL); } Trick::VariableServerListenThread::~VariableServerListenThread() { @@ -160,16 +161,20 @@ void * Trick::VariableServerListenThread::thread_body() { } while (1) { + FD_ZERO(&rfds); FD_SET(listen_dev.socket, &rfds); timeout_time.tv_sec = 2 ; select(listen_dev.socket + 1, &rfds, NULL, NULL, &timeout_time); if (FD_ISSET(listen_dev.socket, &rfds)) { + // pause here during restart + pthread_mutex_lock(&restart_pause) ; vst = new Trick::VariableServerThread(&listen_dev) ; vst->copy_cpus(get_cpus()) ; vst->create_thread() ; vst->wait_for_accept() ; + pthread_mutex_unlock(&restart_pause) ; } else { if ( broadcast ) { sprintf(buf1 , "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listen_dev.hostname , (unsigned short)listen_dev.port , @@ -179,6 +184,7 @@ void * Trick::VariableServerListenThread::thread_body() { sendto(mcast_socket , buf1 , strlen(buf1) , 0 , (struct sockaddr *)&mcast_addr , (socklen_t)sizeof(mcast_addr)) ; } } + } return NULL ; @@ -222,6 +228,14 @@ int Trick::VariableServerListenThread::restart() { return 0 ; } +void Trick::VariableServerListenThread::pause_listening() { + pthread_mutex_lock(&restart_pause) ; +} + +void Trick::VariableServerListenThread::restart_listening() { + pthread_mutex_unlock(&restart_pause) ; +} + void Trick::VariableServerListenThread::dump( std::ostream & oss ) { oss << "Trick::VariableServerListenThread (" << name << ")" << std::endl ; oss << " source_address = " << source_address << std::endl ; diff --git a/trick_source/sim_services/VariableServer/src/VariableServerThread.cpp b/trick_source/sim_services/VariableServer/src/VariableServerThread.cpp index 08ad9d41..64302951 100644 --- a/trick_source/sim_services/VariableServer/src/VariableServerThread.cpp +++ b/trick_source/sim_services/VariableServer/src/VariableServerThread.cpp @@ -48,6 +48,7 @@ Trick::VariableServerThread::VariableServerThread(TCDevice * in_listen_dev) : connection.error_handler->report_level = TRICK_ERROR_CAUTION; pthread_mutex_init(©_mutex, NULL); + pthread_mutex_init(&restart_pause, NULL); var_data_staged = false; packets_copied = 0 ; diff --git a/trick_source/sim_services/VariableServer/src/VariableServerThread_loop.cpp b/trick_source/sim_services/VariableServer/src/VariableServerThread_loop.cpp index 5830b668..f96f7fe6 100644 --- a/trick_source/sim_services/VariableServer/src/VariableServerThread_loop.cpp +++ b/trick_source/sim_services/VariableServer/src/VariableServerThread_loop.cpp @@ -65,6 +65,9 @@ void * Trick::VariableServerThread::thread_body() { try { while (1) { + // Pause here if we are in a restart condition + pthread_mutex_lock(&restart_pause) ; + /* Check the length of the message on the socket */ nbytes = recvfrom( connection.socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ; if (nbytes == 0 ) { @@ -149,6 +152,7 @@ void * Trick::VariableServerThread::thread_body() { } } } + pthread_mutex_unlock(&restart_pause) ; usleep((unsigned int) (update_rate * 1000000)); } diff --git a/trick_source/sim_services/VariableServer/src/VariableServerThread_restart.cpp b/trick_source/sim_services/VariableServer/src/VariableServerThread_restart.cpp index af8f92cc..4761fe02 100644 --- a/trick_source/sim_services/VariableServer/src/VariableServerThread_restart.cpp +++ b/trick_source/sim_services/VariableServer/src/VariableServerThread_restart.cpp @@ -3,6 +3,10 @@ #include "sim_services/VariableServer/include/VariableServerThread.hh" void Trick::VariableServerThread::preload_checkpoint() { + + // Stop variable server processing at the top of the processing loop. + pthread_mutex_lock(&restart_pause); + // Let the thread complete any data copying it has to do // and then suspend data copying until the checkpoint is reloaded. pthread_mutex_lock(©_mutex); @@ -23,15 +27,19 @@ void Trick::VariableServerThread::preload_checkpoint() { (*it)->ref->attr->units = (char *)"--" ; (*it)->ref->attr->size = sizeof(int) ; } -} - -void Trick::VariableServerThread::restart() { - // Set the pause state of this thread back to its "pre-checkpoint reload" state. - pause_cmd = saved_pause_cmd ; // Allow data copying to continue. pthread_mutex_unlock(©_mutex); } +void Trick::VariableServerThread::restart() { + // Set the pause state of this thread back to its "pre-checkpoint reload" state. + pause_cmd = saved_pause_cmd ; + + // Restart the variable server processing. + pthread_mutex_unlock(&restart_pause); + +} + diff --git a/trick_source/sim_services/VariableServer/src/VariableServer_restart.cpp b/trick_source/sim_services/VariableServer/src/VariableServer_restart.cpp index 5a833799..1dbdfc76 100644 --- a/trick_source/sim_services/VariableServer/src/VariableServer_restart.cpp +++ b/trick_source/sim_services/VariableServer/src/VariableServer_restart.cpp @@ -25,6 +25,8 @@ int Trick::VariableServer::restart() { int Trick::VariableServer::suspendPreCheckpointReload() { std::map::iterator pos ; + listen_thread.pause_listening() ; + pthread_mutex_lock(&map_mutex) ; for ( pos = var_server_threads.begin() ; pos != var_server_threads.end() ; pos++ ) { VariableServerThread* vst = (*pos).second ; @@ -46,5 +48,8 @@ int Trick::VariableServer::resumePostCheckpointReload() { vst->restart() ; } pthread_mutex_unlock(&map_mutex) ; + + listen_thread.restart_listening() ; + return 0; }