Variable server restart issues

Added mutexes in the variable server listener thread and each variable server thread.
During checkpoint restart all of these mutexes are locked by the master thread to
stop accepting new connections and stop all read/writing to all variable server clients.
Communication is resumed after the checkpoint has been reloaded.

refs #168
This commit is contained in:
Alex Lin 2016-04-19 13:50:30 -05:00
parent fad36fc544
commit 8c3e322ed1
7 changed files with 47 additions and 5 deletions

View File

@ -46,6 +46,10 @@ namespace Trick {
int restart() ;
// pause and restart listen thread during checkpoint reload
void pause_listening() ;
void restart_listening() ;
virtual void dump( std::ostream & oss = std::cout ) ;
protected:
@ -67,6 +71,9 @@ namespace Trick {
/** The listen device\n */
TCDevice listen_dev; /**< trick_io(**) */
/** The mutex to stop accepting new connections during restart\n */
pthread_mutex_t restart_pause ; /**< trick_io(**) */
} ;
}

View File

@ -473,6 +473,9 @@ namespace Trick {
/** The mutex to protect variable output buffers when copying variable values to them from Trick memory.\n */
pthread_mutex_t copy_mutex ; /**< trick_io(**) */
/** The mutex pauses all processing during checkpoint restart */
pthread_mutex_t restart_pause ; /**< trick_io(**) */
/** Dummy integer for bad references.\n */
static int bad_ref_int ; /**< trick_io(**) */

View File

@ -22,6 +22,7 @@ Trick::VariableServerListenThread::VariableServerListenThread() :
source_address = std::string(hname) ;
strcpy(listen_dev.client_tag, "<empty>");
tc_error(&listen_dev, 0);
pthread_mutex_init(&restart_pause, NULL);
}
Trick::VariableServerListenThread::~VariableServerListenThread() {
@ -160,16 +161,20 @@ void * Trick::VariableServerListenThread::thread_body() {
}
while (1) {
FD_ZERO(&rfds);
FD_SET(listen_dev.socket, &rfds);
timeout_time.tv_sec = 2 ;
select(listen_dev.socket + 1, &rfds, NULL, NULL, &timeout_time);
if (FD_ISSET(listen_dev.socket, &rfds)) {
// pause here during restart
pthread_mutex_lock(&restart_pause) ;
vst = new Trick::VariableServerThread(&listen_dev) ;
vst->copy_cpus(get_cpus()) ;
vst->create_thread() ;
vst->wait_for_accept() ;
pthread_mutex_unlock(&restart_pause) ;
} else {
if ( broadcast ) {
sprintf(buf1 , "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listen_dev.hostname , (unsigned short)listen_dev.port ,
@ -179,6 +184,7 @@ void * Trick::VariableServerListenThread::thread_body() {
sendto(mcast_socket , buf1 , strlen(buf1) , 0 , (struct sockaddr *)&mcast_addr , (socklen_t)sizeof(mcast_addr)) ;
}
}
}
return NULL ;
@ -222,6 +228,14 @@ int Trick::VariableServerListenThread::restart() {
return 0 ;
}
void Trick::VariableServerListenThread::pause_listening() {
pthread_mutex_lock(&restart_pause) ;
}
void Trick::VariableServerListenThread::restart_listening() {
pthread_mutex_unlock(&restart_pause) ;
}
void Trick::VariableServerListenThread::dump( std::ostream & oss ) {
oss << "Trick::VariableServerListenThread (" << name << ")" << std::endl ;
oss << " source_address = " << source_address << std::endl ;

View File

@ -48,6 +48,7 @@ Trick::VariableServerThread::VariableServerThread(TCDevice * in_listen_dev) :
connection.error_handler->report_level = TRICK_ERROR_CAUTION;
pthread_mutex_init(&copy_mutex, NULL);
pthread_mutex_init(&restart_pause, NULL);
var_data_staged = false;
packets_copied = 0 ;

View File

@ -67,6 +67,9 @@ void * Trick::VariableServerThread::thread_body() {
try {
while (1) {
// Pause here if we are in a restart condition
pthread_mutex_lock(&restart_pause) ;
/* Check the length of the message on the socket */
nbytes = recvfrom( connection.socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ;
if (nbytes == 0 ) {
@ -151,6 +154,7 @@ void * Trick::VariableServerThread::thread_body() {
}
}
}
pthread_mutex_unlock(&restart_pause) ;
usleep((unsigned int) (update_rate * 1000000));
}

View File

@ -3,6 +3,10 @@
#include "trick/VariableServerThread.hh"
void Trick::VariableServerThread::preload_checkpoint() {
// Stop variable server processing at the top of the processing loop.
pthread_mutex_lock(&restart_pause);
// Let the thread complete any data copying it has to do
// and then suspend data copying until the checkpoint is reloaded.
pthread_mutex_lock(&copy_mutex);
@ -23,15 +27,19 @@ void Trick::VariableServerThread::preload_checkpoint() {
(*it)->ref->attr->units = (char *)"--" ;
(*it)->ref->attr->size = sizeof(int) ;
}
}
void Trick::VariableServerThread::restart() {
// Set the pause state of this thread back to its "pre-checkpoint reload" state.
pause_cmd = saved_pause_cmd ;
// Allow data copying to continue.
pthread_mutex_unlock(&copy_mutex);
}
void Trick::VariableServerThread::restart() {
// Set the pause state of this thread back to its "pre-checkpoint reload" state.
pause_cmd = saved_pause_cmd ;
// Restart the variable server processing.
pthread_mutex_unlock(&restart_pause);
}

View File

@ -25,6 +25,8 @@ int Trick::VariableServer::restart() {
int Trick::VariableServer::suspendPreCheckpointReload() {
std::map<pthread_t, VariableServerThread*>::iterator pos ;
listen_thread.pause_listening() ;
pthread_mutex_lock(&map_mutex) ;
for ( pos = var_server_threads.begin() ; pos != var_server_threads.end() ; pos++ ) {
VariableServerThread* vst = (*pos).second ;
@ -46,5 +48,8 @@ int Trick::VariableServer::resumePostCheckpointReload() {
vst->restart() ;
}
pthread_mutex_unlock(&map_mutex) ;
listen_thread.restart_listening() ;
return 0;
}