Variable server restart issues

Backporting addition to suspend accepting new variable server connections and
all communications to variable server clients for the duration of reloading
a checkpoint

refs #168
This commit is contained in:
Alex Lin 2016-04-19 14:24:25 -05:00
parent f1a26615f1
commit be5835f92f
7 changed files with 47 additions and 5 deletions

View File

@ -49,6 +49,10 @@ namespace Trick {
int restart() ;
// pause and restart listen thread during checkpoint reload
void pause_listening() ;
void restart_listening() ;
virtual void dump( std::ostream & oss = std::cout ) ;
protected:
@ -70,6 +74,9 @@ namespace Trick {
/** The listen device\n */
TCDevice listen_dev; /**< trick_io(**) */
/** The mutex to stop accepting new connections during restart\n */
pthread_mutex_t restart_pause ; /**< trick_io(**) */
} ;
}

View File

@ -471,6 +471,9 @@ namespace Trick {
/** The mutex to protect variable output buffers when copying variable values to them from Trick memory.\n */
pthread_mutex_t copy_mutex ; /**< trick_io(**) */
/** The mutex pauses all processing during checkpoint restart */
pthread_mutex_t restart_pause ; /**< trick_io(**) */
/** Dummy integer for bad references.\n */
static int bad_ref_int ; /**< trick_io(**) */

View File

@ -22,6 +22,7 @@ Trick::VariableServerListenThread::VariableServerListenThread() :
source_address = std::string(hname) ;
strcpy(listen_dev.client_tag, "<empty>");
tc_error(&listen_dev, 0);
pthread_mutex_init(&restart_pause, NULL);
}
Trick::VariableServerListenThread::~VariableServerListenThread() {
@ -160,16 +161,20 @@ void * Trick::VariableServerListenThread::thread_body() {
}
while (1) {
FD_ZERO(&rfds);
FD_SET(listen_dev.socket, &rfds);
timeout_time.tv_sec = 2 ;
select(listen_dev.socket + 1, &rfds, NULL, NULL, &timeout_time);
if (FD_ISSET(listen_dev.socket, &rfds)) {
// pause here during restart
pthread_mutex_lock(&restart_pause) ;
vst = new Trick::VariableServerThread(&listen_dev) ;
vst->copy_cpus(get_cpus()) ;
vst->create_thread() ;
vst->wait_for_accept() ;
pthread_mutex_unlock(&restart_pause) ;
} else {
if ( broadcast ) {
sprintf(buf1 , "%s\t%hu\t%s\t%d\t%s\t%s\t%s\t%s\t%s\t%hu\n" , listen_dev.hostname , (unsigned short)listen_dev.port ,
@ -179,6 +184,7 @@ void * Trick::VariableServerListenThread::thread_body() {
sendto(mcast_socket , buf1 , strlen(buf1) , 0 , (struct sockaddr *)&mcast_addr , (socklen_t)sizeof(mcast_addr)) ;
}
}
}
return NULL ;
@ -222,6 +228,14 @@ int Trick::VariableServerListenThread::restart() {
return 0 ;
}
void Trick::VariableServerListenThread::pause_listening() {
pthread_mutex_lock(&restart_pause) ;
}
void Trick::VariableServerListenThread::restart_listening() {
pthread_mutex_unlock(&restart_pause) ;
}
void Trick::VariableServerListenThread::dump( std::ostream & oss ) {
oss << "Trick::VariableServerListenThread (" << name << ")" << std::endl ;
oss << " source_address = " << source_address << std::endl ;

View File

@ -48,6 +48,7 @@ Trick::VariableServerThread::VariableServerThread(TCDevice * in_listen_dev) :
connection.error_handler->report_level = TRICK_ERROR_CAUTION;
pthread_mutex_init(&copy_mutex, NULL);
pthread_mutex_init(&restart_pause, NULL);
var_data_staged = false;
packets_copied = 0 ;

View File

@ -65,6 +65,9 @@ void * Trick::VariableServerThread::thread_body() {
try {
while (1) {
// Pause here if we are in a restart condition
pthread_mutex_lock(&restart_pause) ;
/* Check the length of the message on the socket */
nbytes = recvfrom( connection.socket, incoming_msg, MAX_CMD_LEN, MSG_PEEK, NULL, NULL ) ;
if (nbytes == 0 ) {
@ -149,6 +152,7 @@ void * Trick::VariableServerThread::thread_body() {
}
}
}
pthread_mutex_unlock(&restart_pause) ;
usleep((unsigned int) (update_rate * 1000000));
}

View File

@ -3,6 +3,10 @@
#include "sim_services/VariableServer/include/VariableServerThread.hh"
void Trick::VariableServerThread::preload_checkpoint() {
// Stop variable server processing at the top of the processing loop.
pthread_mutex_lock(&restart_pause);
// Let the thread complete any data copying it has to do
// and then suspend data copying until the checkpoint is reloaded.
pthread_mutex_lock(&copy_mutex);
@ -23,15 +27,19 @@ void Trick::VariableServerThread::preload_checkpoint() {
(*it)->ref->attr->units = (char *)"--" ;
(*it)->ref->attr->size = sizeof(int) ;
}
}
void Trick::VariableServerThread::restart() {
// Set the pause state of this thread back to its "pre-checkpoint reload" state.
pause_cmd = saved_pause_cmd ;
// Allow data copying to continue.
pthread_mutex_unlock(&copy_mutex);
}
void Trick::VariableServerThread::restart() {
// Set the pause state of this thread back to its "pre-checkpoint reload" state.
pause_cmd = saved_pause_cmd ;
// Restart the variable server processing.
pthread_mutex_unlock(&restart_pause);
}

View File

@ -25,6 +25,8 @@ int Trick::VariableServer::restart() {
int Trick::VariableServer::suspendPreCheckpointReload() {
std::map<pthread_t, VariableServerThread*>::iterator pos ;
listen_thread.pause_listening() ;
pthread_mutex_lock(&map_mutex) ;
for ( pos = var_server_threads.begin() ; pos != var_server_threads.end() ; pos++ ) {
VariableServerThread* vst = (*pos).second ;
@ -46,5 +48,8 @@ int Trick::VariableServer::resumePostCheckpointReload() {
vst->restart() ;
}
pthread_mutex_unlock(&map_mutex) ;
listen_thread.restart_listening() ;
return 0;
}