mirror of
https://github.com/nasa/trick.git
synced 2025-01-29 15:43:57 +00:00
parent
17a5028e43
commit
80ab0767e2
@ -333,12 +333,6 @@ namespace Trick {
|
||||
/** Device over which data is sent and received. */
|
||||
TCDevice connection_device; /**< \n trick_units(--) */
|
||||
|
||||
/** Device over which connections are accepted between the Slave child and Master. */
|
||||
TCDevice data_listen_device; /**< \n trick_units(--) */
|
||||
|
||||
/** Device over which data is sent and received between Slave child and Master. */
|
||||
TCDevice data_connection_device; /**< \n trick_units(--) */
|
||||
|
||||
/** Runs to be dispatched. */
|
||||
std::deque <Trick::MonteRun *> runs; /**< \n trick_io(**) trick_units(--) */
|
||||
|
||||
@ -381,9 +375,6 @@ namespace Trick {
|
||||
/** Port on which the master is listening. This value is unspecified for the master. */
|
||||
unsigned int master_port; /**< \n trick_units(--) */
|
||||
|
||||
/** Port on which the master is listening for data. This value is unspecified for the master. */
|
||||
unsigned int data_port; /**< \n trick_units(--) */
|
||||
|
||||
/** Unique identifier. This value is zero for the master. */
|
||||
unsigned int slave_id; /**< \n trick_units(--) */
|
||||
|
||||
@ -710,12 +701,6 @@ namespace Trick {
|
||||
*/
|
||||
void set_current_run(int run_num) ;
|
||||
|
||||
/** Retrieves the #data_connection_device
|
||||
*
|
||||
* @return the address of the data_connection_device
|
||||
*/
|
||||
TCDevice* get_data_connection_device();
|
||||
|
||||
/** Allows the user to set the port number for
|
||||
* the listen_device
|
||||
*
|
||||
@ -723,13 +708,6 @@ namespace Trick {
|
||||
*/
|
||||
void set_listen_device_port(int port_number) ;
|
||||
|
||||
/** Allows the user to set the port number for
|
||||
* the data_listen_device
|
||||
*
|
||||
* @param port_number number for the port
|
||||
*/
|
||||
void set_data_listen_device_port(int port_number) ;
|
||||
|
||||
/** Allows the user to set the port number for
|
||||
* the connection_device
|
||||
*
|
||||
@ -737,13 +715,6 @@ namespace Trick {
|
||||
*/
|
||||
void set_connection_device_port(int port_number) ;
|
||||
|
||||
/** Allows the user to set the port number for
|
||||
* the data_connection_device
|
||||
*
|
||||
* @param port_number number for the port
|
||||
*/
|
||||
void set_data_connection_device_port(int port_number) ;
|
||||
|
||||
/** Allows the user to get the port number for
|
||||
* the listen_device
|
||||
*
|
||||
@ -751,13 +722,6 @@ namespace Trick {
|
||||
*/
|
||||
int get_listen_device_port() ;
|
||||
|
||||
/** Allows the user to get the port number for
|
||||
* the data_listen_device
|
||||
*
|
||||
* @return the port number
|
||||
*/
|
||||
int get_data_listen_device_port() ;
|
||||
|
||||
/** Allows the user to get the port number for
|
||||
* the connection_device
|
||||
*
|
||||
@ -765,12 +729,8 @@ namespace Trick {
|
||||
*/
|
||||
int get_connection_device_port() ;
|
||||
|
||||
/** Allows the user to get the port number for
|
||||
* the data_connection_device
|
||||
*
|
||||
* @return the port number
|
||||
*/
|
||||
int get_data_connection_device_port() ;
|
||||
int write(char* data, int size);
|
||||
int read(char* data, int size);
|
||||
|
||||
#if 0
|
||||
/**
|
||||
@ -811,14 +771,9 @@ namespace Trick {
|
||||
/** Receives from any slaves that are ready to return results. */
|
||||
void receive_results();
|
||||
|
||||
/** Receives the results from the slave */
|
||||
void receive_slave_results() ;
|
||||
|
||||
void read_machine_name(MonteSlave *curr_slave);
|
||||
|
||||
void set_disconnected_state(MonteSlave *curr_slave);
|
||||
|
||||
void read_slave_port(MonteSlave *curr_slave);
|
||||
void handle_initialization(MonteSlave& slave);
|
||||
void handle_run_data(MonteSlave& slave);
|
||||
void set_disconnected_state(MonteSlave& slave);
|
||||
|
||||
/**
|
||||
* Handles the retrying of the current run of the specified slave with the specified exit status.
|
||||
@ -828,7 +783,7 @@ namespace Trick {
|
||||
*
|
||||
* @see max_tries
|
||||
*/
|
||||
void handle_retry(MonteSlave *slave, MonteRun::ExitStatus exit_status);
|
||||
void handle_retry(MonteSlave& slave, MonteRun::ExitStatus exit_status);
|
||||
|
||||
/**
|
||||
* Resolves the current run of the specified slave with the specified exit status.
|
||||
@ -836,7 +791,7 @@ namespace Trick {
|
||||
* @param slave the slave processing the run
|
||||
* @param exit_status the exit status of the run
|
||||
*/
|
||||
void resolve_run(MonteSlave *slave, MonteRun::ExitStatus exit_status);
|
||||
void resolve_run(MonteSlave& slave, MonteRun::ExitStatus exit_status);
|
||||
|
||||
/** Checks dispatched runs for timeouts. */
|
||||
void check_timeouts();
|
||||
@ -916,7 +871,7 @@ namespace Trick {
|
||||
*
|
||||
* @return 0 on success
|
||||
*/
|
||||
int slave();
|
||||
int execute_as_slave();
|
||||
|
||||
/** Processes an incoming run. */
|
||||
int slave_process_run();
|
||||
|
@ -199,13 +199,13 @@ void mc_stop_slave(unsigned int id);
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the data_connection_device and tc_write()s the given buffer.
|
||||
* Gets the connection_device and tc_write()s the given buffer.
|
||||
*/
|
||||
void mc_write(char *buffer, int size);
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the data_connection_device and tc_read()s the incoming string.
|
||||
* Gets the connection_device and tc_read()s the incoming string.
|
||||
*/
|
||||
void mc_read(char *buffer, int size);
|
||||
|
||||
@ -221,60 +221,30 @@ unsigned int mc_get_current_run() ;
|
||||
*/
|
||||
void mc_set_current_run(int num_run) ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the data_connection_device.
|
||||
*/
|
||||
TCDevice* mc_get_connection_device() ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Sets the port for the listen_device.
|
||||
*/
|
||||
void mc_set_listen_device_port(int port_number) ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Sets the port for the data_listen_device.
|
||||
*/
|
||||
void mc_set_data_listen_device_port(int port_number) ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Sets the port for the connection_device.
|
||||
*/
|
||||
void mc_set_connection_device_port(int port_number) ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Sets the port for the data_connection_device.
|
||||
*/
|
||||
void mc_set_data_connection_device_port(int port_number) ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the port for the listen_device.
|
||||
*/
|
||||
int mc_get_listen_device_port() ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the port for the data_listen_device.
|
||||
*/
|
||||
int mc_get_data_listen_device_port() ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the port for the connection_device.
|
||||
*/
|
||||
int mc_get_connection_device_port() ;
|
||||
|
||||
/**
|
||||
* @relates Trick::MonteCarlo
|
||||
* Gets the port for the data_connection_device.
|
||||
*/
|
||||
int mc_get_data_connection_device_port() ;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
@ -28,8 +28,6 @@ Trick::MonteCarlo::MonteCarlo() :
|
||||
|
||||
memset(&listen_device, 0, sizeof(TCDevice)) ;
|
||||
memset(&connection_device, 0, sizeof(TCDevice)) ;
|
||||
memset(&data_listen_device, 0, sizeof(TCDevice)) ;
|
||||
memset(&data_connection_device, 0, sizeof(TCDevice)) ;
|
||||
|
||||
listen_device.port = 7200;
|
||||
connection_device.port = 7200;
|
||||
@ -37,16 +35,8 @@ Trick::MonteCarlo::MonteCarlo() :
|
||||
listen_device.disable_handshaking = TC_COMM_TRUE;
|
||||
connection_device.disable_handshaking = TC_COMM_TRUE;
|
||||
|
||||
data_listen_device.port = 7400;
|
||||
data_connection_device.port = 7400;
|
||||
|
||||
data_listen_device.disable_handshaking = TC_COMM_TRUE;
|
||||
data_connection_device.disable_handshaking = TC_COMM_TRUE;
|
||||
|
||||
tc_error(&listen_device, 0);
|
||||
tc_error(&connection_device, 0);
|
||||
tc_error(&data_listen_device, 0);
|
||||
tc_error(&data_connection_device, 0);
|
||||
|
||||
int num_classes = 0;
|
||||
class_map["monte_master_init"] = num_classes;
|
||||
@ -79,12 +69,8 @@ Trick::MonteCarlo::~MonteCarlo() {
|
||||
/* tc_error allocates memory in the constructor */
|
||||
free(listen_device.error_handler) ;
|
||||
free(connection_device.error_handler) ;
|
||||
free(data_listen_device.error_handler) ;
|
||||
free(data_connection_device.error_handler) ;
|
||||
listen_device.error_handler = NULL ;
|
||||
connection_device.error_handler = NULL ;
|
||||
data_listen_device.error_handler = NULL ;
|
||||
data_connection_device.error_handler = NULL ;
|
||||
}
|
||||
|
||||
|
||||
|
@ -209,25 +209,16 @@ extern "C" void mc_stop_slave(unsigned int id) {
|
||||
|
||||
extern "C" void mc_write(char *buffer, int size) {
|
||||
if ( the_mc != NULL ) {
|
||||
TCDevice *data_conn = the_mc->get_data_connection_device();
|
||||
tc_write(data_conn, buffer, size) ;
|
||||
the_mc->write(buffer, size) ;
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" void mc_read(char *buffer, int size) {
|
||||
if ( the_mc != NULL ) {
|
||||
TCDevice *data_conn = the_mc->get_data_connection_device();
|
||||
tc_read(data_conn, buffer, size) ;
|
||||
the_mc->read(buffer, size) ;
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" TCDevice* mc_get_connection_device() {
|
||||
if ( the_mc != NULL ) {
|
||||
return (the_mc->get_data_connection_device());
|
||||
}
|
||||
return NULL ;
|
||||
}
|
||||
|
||||
extern "C" unsigned int mc_get_current_run() {
|
||||
if ( the_mc != NULL ) {
|
||||
return the_mc->get_current_run();
|
||||
@ -247,24 +238,12 @@ extern "C" void mc_set_listen_device_port(int port_number) {
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" void mc_set_data_listen_device_port(int port_number) {
|
||||
if ( the_mc != NULL ) {
|
||||
the_mc->set_data_listen_device_port(port_number);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" void mc_set_connection_device_port(int port_number) {
|
||||
if ( the_mc != NULL ) {
|
||||
the_mc->set_connection_device_port(port_number);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" void mc_set_data_connection_device_port(int port_number) {
|
||||
if ( the_mc != NULL ) {
|
||||
the_mc->set_data_connection_device_port(port_number);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" int mc_get_listen_device_port() {
|
||||
if ( the_mc != NULL ) {
|
||||
return the_mc->get_listen_device_port();
|
||||
@ -272,24 +251,9 @@ extern "C" int mc_get_listen_device_port() {
|
||||
return -1 ;
|
||||
}
|
||||
|
||||
extern "C" int mc_get_data_listen_device_port() {
|
||||
if ( the_mc != NULL ) {
|
||||
return the_mc->get_data_listen_device_port();
|
||||
}
|
||||
return -1 ;
|
||||
}
|
||||
|
||||
extern "C" int mc_get_connection_device_port() {
|
||||
if ( the_mc != NULL ) {
|
||||
return the_mc->get_connection_device_port();
|
||||
}
|
||||
return -1 ;
|
||||
}
|
||||
|
||||
extern "C" int mc_get_data_connection_device_port() {
|
||||
if ( the_mc != NULL ) {
|
||||
return the_mc->get_data_connection_device_port();
|
||||
}
|
||||
return -1 ;
|
||||
}
|
||||
|
||||
|
@ -8,30 +8,30 @@
|
||||
#include "trick/message_proto.h"
|
||||
#include "trick/message_type.h"
|
||||
|
||||
void Trick::MonteCarlo::dispatch_run_to_slave(MonteRun *in_run, MonteSlave *in_slave) {
|
||||
if (in_slave && in_run) {
|
||||
current_run = in_run->id;
|
||||
if (prepare_run(in_run) == -1) {
|
||||
void Trick::MonteCarlo::dispatch_run_to_slave(MonteRun *run, MonteSlave *slave) {
|
||||
if (slave && run) {
|
||||
current_run = run->id;
|
||||
if (prepare_run(run) == -1) {
|
||||
return;
|
||||
}
|
||||
in_slave->state = MonteSlave::RUNNING;
|
||||
connection_device.hostname = (char*)in_slave->machine_name.c_str();
|
||||
connection_device.port = in_slave->port;
|
||||
slave->state = MonteSlave::RUNNING;
|
||||
connection_device.hostname = (char*)slave->machine_name.c_str();
|
||||
connection_device.port = slave->port;
|
||||
if (tc_connect(&connection_device) == TC_SUCCESS) {
|
||||
std::stringstream buffer_stream;
|
||||
buffer_stream << run_directory << "/RUN_" << std::setw(5) << std::setfill('0') << in_run->id;
|
||||
buffer_stream << run_directory << "/RUN_" << std::setw(5) << std::setfill('0') << run->id;
|
||||
std::string buffer = "";
|
||||
for (std::vector<std::string>::size_type j = 0; j < in_run->variables.size(); ++j) {
|
||||
buffer += in_run->variables[j] + "\n";
|
||||
for (std::vector<std::string>::size_type j = 0; j < run->variables.size(); ++j) {
|
||||
buffer += run->variables[j] + "\n";
|
||||
}
|
||||
buffer += std::string("trick.set_output_dir(\"") + buffer_stream.str() + std::string("\")\n");
|
||||
buffer_stream.str("");
|
||||
buffer_stream << in_run->id ;
|
||||
buffer_stream << run->id ;
|
||||
buffer += std::string("trick.mc_set_current_run(") + buffer_stream.str() + std::string(")\n");
|
||||
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Dispatching run %d to %s:%d.\n",
|
||||
in_run->id, in_slave->machine_name.c_str(), in_slave->id) ;
|
||||
run->id, slave->machine_name.c_str(), slave->id) ;
|
||||
}
|
||||
|
||||
int command = htonl(MonteSlave::PROCESS_RUN);
|
||||
@ -41,23 +41,23 @@ void Trick::MonteCarlo::dispatch_run_to_slave(MonteRun *in_run, MonteSlave *in_s
|
||||
tc_write(&connection_device, (char*)buffer.c_str(), (int)buffer.length());
|
||||
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Parameterization of run %d :\n%s\n", in_run->id, buffer.c_str()) ;
|
||||
message_publish(MSG_INFO, "Parameterization of run %d :\n%s\n", run->id, buffer.c_str()) ;
|
||||
}
|
||||
|
||||
tc_disconnect(&connection_device);
|
||||
|
||||
++in_slave->num_dispatches;
|
||||
in_slave->current_run = in_run;
|
||||
++slave->num_dispatches;
|
||||
slave->current_run = run;
|
||||
|
||||
struct timeval time_val;
|
||||
gettimeofday(&time_val, NULL);
|
||||
in_run->start_time = time_val.tv_sec + (double)time_val.tv_usec / 1000000;
|
||||
++in_run->num_tries;
|
||||
run->start_time = time_val.tv_sec + (double)time_val.tv_usec / 1000000;
|
||||
++run->num_tries;
|
||||
} else {
|
||||
in_slave->state = Trick::MonteSlave::DISCONNECTED;
|
||||
slave->state = Trick::MonteSlave::DISCONNECTED;
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] Lost connection to %s:%d while dispatching run.\n",
|
||||
in_slave->machine_name.c_str(), in_slave->id) ;
|
||||
message_publish(MSG_ERROR, "Monte [Master] Failed to connect to %s:%d to dispatch run.\n",
|
||||
slave->machine_name.c_str(), slave->id) ;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ int Trick::MonteCarlo::execute_monte() {
|
||||
master();
|
||||
} else {
|
||||
slave_init();
|
||||
slave();
|
||||
execute_as_slave();
|
||||
}
|
||||
}
|
||||
return(0);
|
||||
|
@ -204,16 +204,16 @@ void Trick::MonteCarlo::add_slave(Trick::MonteSlave *in_slave) {
|
||||
* This function has an effect only if the slave exists and is in the STOPPING, UNRESPONSIVE_STOPPING, or STOPPED state.
|
||||
*/
|
||||
void Trick::MonteCarlo::start_slave(unsigned int id) {
|
||||
if (MonteSlave *curr_slave = get_slave(id)) {
|
||||
if (MonteSlave *slave = get_slave(id)) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Starting %s:%d.\n", curr_slave->machine_name.c_str(), curr_slave->id) ;
|
||||
message_publish(MSG_INFO, "Monte [Master] Starting %s:%d.\n", slave->machine_name.c_str(), slave->id) ;
|
||||
}
|
||||
if (curr_slave->state == Trick::MonteSlave::STOPPING) {
|
||||
curr_slave->state = Trick::MonteSlave::RUNNING;
|
||||
} else if (curr_slave->state == Trick::MonteSlave::UNRESPONSIVE_STOPPING) {
|
||||
curr_slave->state = Trick::MonteSlave::UNRESPONSIVE_RUNNING;
|
||||
} else if (curr_slave->state == Trick::MonteSlave::STOPPED) {
|
||||
curr_slave->state = Trick::MonteSlave::READY;
|
||||
if (slave->state == Trick::MonteSlave::STOPPING) {
|
||||
slave->state = Trick::MonteSlave::RUNNING;
|
||||
} else if (slave->state == Trick::MonteSlave::UNRESPONSIVE_STOPPING) {
|
||||
slave->state = Trick::MonteSlave::UNRESPONSIVE_RUNNING;
|
||||
} else if (slave->state == Trick::MonteSlave::STOPPED) {
|
||||
slave->state = Trick::MonteSlave::READY;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -223,16 +223,16 @@ void Trick::MonteCarlo::start_slave(unsigned int id) {
|
||||
* This function has an effect only if the slave exists and is in the READY, RUNNING, or UNRESPONSIVE_RUNNING state.
|
||||
*/
|
||||
void Trick::MonteCarlo::stop_slave(unsigned int id) {
|
||||
if (MonteSlave *curr_slave = get_slave(id)) {
|
||||
if (MonteSlave *slave = get_slave(id)) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Stopping %s:%d.\n", curr_slave->machine_name.c_str(), curr_slave->id) ;
|
||||
message_publish(MSG_INFO, "Monte [Master] Stopping %s:%d.\n", slave->machine_name.c_str(), slave->id) ;
|
||||
}
|
||||
if (curr_slave->state == Trick::MonteSlave::READY) {
|
||||
curr_slave->state = Trick::MonteSlave::STOPPED;
|
||||
} else if (curr_slave->state == Trick::MonteSlave::RUNNING) {
|
||||
curr_slave->state = Trick::MonteSlave::STOPPING;
|
||||
} else if (curr_slave->state == Trick::MonteSlave::UNRESPONSIVE_RUNNING) {
|
||||
curr_slave->state = Trick::MonteSlave::UNRESPONSIVE_STOPPING;
|
||||
if (slave->state == Trick::MonteSlave::READY) {
|
||||
slave->state = Trick::MonteSlave::STOPPED;
|
||||
} else if (slave->state == Trick::MonteSlave::RUNNING) {
|
||||
slave->state = Trick::MonteSlave::STOPPING;
|
||||
} else if (slave->state == Trick::MonteSlave::UNRESPONSIVE_RUNNING) {
|
||||
slave->state = Trick::MonteSlave::UNRESPONSIVE_STOPPING;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -262,13 +262,8 @@ int Trick::MonteCarlo::process_sim_args() {
|
||||
for (int i = 2; i < argc; ++i) {
|
||||
if (!strncmp("--monte_host", argv[i], 12)) {
|
||||
connection_device.hostname = strdup(argv[++i]);
|
||||
data_connection_device.hostname = strdup(argv[i]);
|
||||
} else if (!strncmp("--monte_sync_port", argv[i], 17)) {
|
||||
sscanf(argv[++i], "%d", &master_port);
|
||||
connection_device.port = master_port;
|
||||
} else if (!strncmp("--monte_data_port", argv[i], 17)) {
|
||||
sscanf(argv[++i], "%d", &data_port);
|
||||
data_connection_device.port = data_port;
|
||||
} else if (!strncmp("--monte_client_id", argv[i], 12)) {
|
||||
sscanf(argv[++i], "%d", &slave_id);
|
||||
}
|
||||
@ -281,17 +276,25 @@ int Trick::MonteCarlo::process_sim_args() {
|
||||
int Trick::MonteCarlo::shutdown() {
|
||||
/** <ul><li> If this is a slave, run the shutdown jobs. */
|
||||
if (enabled && is_slave()) {
|
||||
data_connection_device.port = data_port;
|
||||
if (tc_connect(&data_connection_device) == TC_SUCCESS) {
|
||||
connection_device.port = master_port;
|
||||
if (tc_connect(&connection_device) == TC_SUCCESS) {
|
||||
int exit_status = MonteRun::COMPLETE;
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Sending run exit status to master: %d\n",
|
||||
machine_name.c_str(), slave_id, exit_status) ;
|
||||
}
|
||||
int id = htonl(slave_id);
|
||||
tc_write(&data_connection_device, (char *)&id, (int)sizeof(id));
|
||||
int run_num = htonl(current_run);
|
||||
tc_write(&data_connection_device, (char *)&run_num, (int)sizeof(run_num));
|
||||
run_queue(&slave_post_queue, "in slave_post queue") ;
|
||||
tc_disconnect(&data_connection_device);
|
||||
tc_write(&connection_device, (char*)&id, (int)sizeof(id));
|
||||
exit_status = htonl(exit_status);
|
||||
tc_write(&connection_device, (char*)&exit_status, (int)sizeof(exit_status));
|
||||
run_queue(&slave_post_queue, "in slave_post queue");
|
||||
tc_disconnect(&connection_device);
|
||||
} else {
|
||||
if (verbosity >= ERROR)
|
||||
message_publish(MSG_ERROR, "Monte ERROR: Child failed to connect to data connection.\n") ;
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [%s:%d] Failed to connect to master.\n",
|
||||
machine_name.c_str(), slave_id);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
@ -305,41 +308,41 @@ int Trick::MonteCarlo::socket_init(TCDevice *in_listen_device) {
|
||||
return tc_init(in_listen_device);
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::handle_retry(MonteSlave *curr_slave, MonteRun::ExitStatus exit_status) {
|
||||
if (max_tries <= 0 || curr_slave->current_run->num_tries < max_tries) {
|
||||
void Trick::MonteCarlo::handle_retry(MonteSlave& slave, MonteRun::ExitStatus exit_status) {
|
||||
if (max_tries <= 0 || slave.current_run->num_tries < max_tries) {
|
||||
// Add the run to the retry queue.
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] Queueing run %d for retry.\n", curr_slave->current_run->id) ;
|
||||
message_publish(MSG_ERROR, "Monte [Master] Queueing run %d for retry.\n", slave.current_run->id) ;
|
||||
}
|
||||
runs.push_back(curr_slave->current_run);
|
||||
runs.push_back(slave.current_run);
|
||||
} else {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] Run %d has reached its maximum allowed tries and has been skipped.\n",
|
||||
curr_slave->current_run->id) ;
|
||||
slave.current_run->id) ;
|
||||
}
|
||||
resolve_run(curr_slave, exit_status);
|
||||
resolve_run(slave, exit_status);
|
||||
}
|
||||
}
|
||||
|
||||
/** @par Detailed Design: */
|
||||
void Trick::MonteCarlo::resolve_run(MonteSlave *curr_slave, MonteRun::ExitStatus exit_status) {
|
||||
void Trick::MonteCarlo::resolve_run(MonteSlave& slave, MonteRun::ExitStatus exit_status) {
|
||||
if (exit_status != MonteRun::COMPLETE) {
|
||||
failed_runs.push_back(curr_slave->current_run);
|
||||
failed_runs.push_back(slave.current_run);
|
||||
}
|
||||
|
||||
/** <li> Update the bookkeeping. */
|
||||
struct timeval time_val;
|
||||
gettimeofday(&time_val, NULL);
|
||||
curr_slave->current_run->end_time = time_val.tv_sec + (double)time_val.tv_usec / 1000000;
|
||||
curr_slave->current_run->exit_status = exit_status;
|
||||
slave.current_run->end_time = time_val.tv_sec + (double)time_val.tv_usec / 1000000;
|
||||
slave.current_run->exit_status = exit_status;
|
||||
|
||||
++curr_slave->num_results;
|
||||
curr_slave->cpu_time += curr_slave->current_run->end_time - curr_slave->current_run->start_time;
|
||||
++slave.num_results;
|
||||
slave.cpu_time += slave.current_run->end_time - slave.current_run->start_time;
|
||||
|
||||
++num_results;
|
||||
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Run %d has been resolved as: %d.\n",curr_slave->current_run->id, exit_status) ;
|
||||
message_publish(MSG_INFO, "Monte [Master] Run %d has been resolved as: %d.\n",slave.current_run->id, exit_status) ;
|
||||
}
|
||||
}
|
||||
|
||||
@ -362,7 +365,7 @@ void Trick::MonteCarlo::check_timeouts() {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d has not responded for run %d.\n",
|
||||
slaves[i]->machine_name.c_str(), slaves[i]->id, slaves[i]->current_run->id) ;
|
||||
}
|
||||
handle_retry(slaves[i], MonteRun::TIMEDOUT);
|
||||
handle_retry(*slaves[i], MonteRun::TIMEDOUT);
|
||||
}
|
||||
/** </ul><li> Update the slave's state. */
|
||||
slaves[i]->state = slaves[i]->state == MonteSlave::RUNNING ?
|
||||
@ -524,20 +527,11 @@ void Trick::MonteCarlo::set_current_run(int run_num) {
|
||||
current_run = run_num ;
|
||||
}
|
||||
|
||||
TCDevice* Trick::MonteCarlo::get_data_connection_device() {
|
||||
return (&data_connection_device);
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::set_listen_device_port(int port_number) {
|
||||
listen_device.port = port_number ;
|
||||
default_port_flag = false ;
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::set_data_listen_device_port(int port_number) {
|
||||
data_listen_device.port = port_number ;
|
||||
default_port_flag = false ;
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::set_connection_device_port(int port_number) {
|
||||
// This port is passed to slave as an argument, do not override
|
||||
if (is_master()) {
|
||||
@ -546,30 +540,14 @@ void Trick::MonteCarlo::set_connection_device_port(int port_number) {
|
||||
}
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::set_data_connection_device_port(int port_number) {
|
||||
// This port is passed to slave as an argument, do not override
|
||||
if (is_master()) {
|
||||
data_connection_device.port = port_number ;
|
||||
default_port_flag = false ;
|
||||
}
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::get_listen_device_port() {
|
||||
return listen_device.port ;
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::get_data_listen_device_port() {
|
||||
return data_listen_device.port ;
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::get_connection_device_port() {
|
||||
return connection_device.port ;
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::get_data_connection_device_port() {
|
||||
return data_connection_device.port ;
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::instrument_job_before( Trick::JobData* instrument_job) {
|
||||
|
||||
int count = 0 ;
|
||||
@ -640,3 +618,11 @@ int Trick::MonteCarlo::write_s_job_execution(FILE *fp) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::write(char* data, int size) {
|
||||
return tc_write(&connection_device, data, size);
|
||||
}
|
||||
|
||||
int Trick::MonteCarlo::read(char* data, int size) {
|
||||
return tc_read(&connection_device, data, size);
|
||||
}
|
||||
|
@ -13,8 +13,6 @@ int Trick::MonteCarlo::initialize_sockets() {
|
||||
/** set tc_error to zero for TCDevices to turn off advisory messages. */
|
||||
tc_error(&listen_device, 0);
|
||||
tc_error(&connection_device, 0);
|
||||
tc_error(&data_listen_device, 0);
|
||||
tc_error(&data_connection_device, 0);
|
||||
|
||||
/** <ul><li> Initialize the sockets for communication with slaves. */
|
||||
int return_value = socket_init(&listen_device);
|
||||
@ -24,14 +22,7 @@ int Trick::MonteCarlo::initialize_sockets() {
|
||||
}
|
||||
return return_value;
|
||||
}
|
||||
|
||||
return_value = socket_init(&data_listen_device);
|
||||
if (return_value != TC_SUCCESS) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] Failed to initialize data communication socket.\n") ;
|
||||
}
|
||||
return return_value;
|
||||
}
|
||||
tc_blockio(&listen_device, TC_COMM_NOBLOCKIO);
|
||||
|
||||
/** <li> If no slaves were specified, add one on localhost. */
|
||||
if (slaves.empty()) {
|
||||
|
@ -23,7 +23,7 @@ int Trick::MonteCarlo::master_init() {
|
||||
/** <li> If this is a dry run return else initialize sockets: */
|
||||
if (dry_run) {
|
||||
return 0 ;
|
||||
} else {
|
||||
return initialize_sockets() ;
|
||||
}
|
||||
|
||||
return initialize_sockets() ;
|
||||
}
|
||||
|
@ -1,4 +1,3 @@
|
||||
|
||||
#include "trick/MonteCarlo.hh"
|
||||
#include "trick/message_proto.h"
|
||||
#include "trick/message_type.h"
|
||||
@ -9,58 +8,188 @@
|
||||
* This function performs a non-blocking accept on the listening socket.
|
||||
*/
|
||||
void Trick::MonteCarlo::receive_results() {
|
||||
fd_set rfds;
|
||||
struct timeval select_time;
|
||||
struct timeval timeout_time = {5,0};
|
||||
int max_socket;
|
||||
Trick::MonteSlave *curr_slave = NULL;
|
||||
unsigned int curr_slave_id;
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
select_time = timeout_time;
|
||||
/** <li> While there are pending connections: */
|
||||
while (tc_accept(&listen_device, &connection_device) == TC_SUCCESS) {
|
||||
|
||||
// add data listen device
|
||||
FD_SET(data_listen_device.socket, &rfds);
|
||||
max_socket = data_listen_device.socket;
|
||||
|
||||
// add status listen device
|
||||
FD_SET(listen_device.socket, &rfds);
|
||||
if (max_socket < listen_device.socket) {
|
||||
max_socket = listen_device.socket;
|
||||
}
|
||||
|
||||
select(max_socket + 1, &rfds, NULL, NULL, &select_time);
|
||||
|
||||
if (FD_ISSET(listen_device.socket, &rfds)) {
|
||||
receive_slave_results() ;
|
||||
} // end of FD_ISSET(&listen_device.socket)
|
||||
|
||||
/** <li> Receive results from child */
|
||||
while (tc_listen(&data_listen_device)) {
|
||||
tc_accept(&data_listen_device, &data_connection_device);
|
||||
/** <ul><li> Read the id of incoming curr_slave. */
|
||||
tc_read(&data_connection_device, (char*)&curr_slave_id, (int)sizeof(curr_slave_id));
|
||||
curr_slave_id = ntohl(curr_slave_id);
|
||||
curr_slave = get_slave(curr_slave_id);
|
||||
tc_read(&data_connection_device, (char*)&current_run, (int)sizeof(current_run));
|
||||
current_run = ntohl(current_run);
|
||||
/** <ul><li> Read the slave id. */
|
||||
int id;
|
||||
tc_read(&connection_device, (char*)&id, (int)sizeof(id));
|
||||
id = ntohl(id);
|
||||
MonteSlave* slave = get_slave(id);
|
||||
if (!slave) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] Slave returned an invalid id (%d)\n",
|
||||
id) ;
|
||||
tc_disconnect(&connection_device);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* <ul><li> This run may have already been resolved by another curr_slave if
|
||||
* this curr_slave was marked as having timed out. If that is the case,
|
||||
* discard these results.
|
||||
* <li> If the slave is in the INITIALIZING state, it is sending us the
|
||||
* machine name and port over which it is listening for new runs.
|
||||
*/
|
||||
if (curr_slave->current_run->exit_status != MonteRun::INCOMPLETE) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Run %d has already been resolved. Discarding results.\n",
|
||||
curr_slave->current_run->id) ;
|
||||
}
|
||||
/** <li> else run optimization jobs */
|
||||
} else {
|
||||
curr_slave_id = curr_slave->id;
|
||||
run_queue(&master_post_queue, "in master_post queue") ;
|
||||
curr_slave_id = 0;
|
||||
if (slave->state == MonteSlave::INITIALIZING) {
|
||||
handle_initialization(*slave);
|
||||
}
|
||||
/** <li> Otherwise, it's sending us run data. */
|
||||
else {
|
||||
handle_run_data(*slave);
|
||||
}
|
||||
tc_disconnect(&data_connection_device);
|
||||
}
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::handle_initialization(Trick::MonteSlave& slave) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(
|
||||
MSG_INFO,
|
||||
"Monte [Master] Receiving initialization information from %s:%d.\n",
|
||||
slave.machine_name.c_str(), slave.id);
|
||||
}
|
||||
|
||||
int size;
|
||||
if (tc_read(&connection_device, (char*)&size, (int)sizeof(size)) != (int)sizeof(size)) {
|
||||
set_disconnected_state(slave);
|
||||
return;
|
||||
}
|
||||
size = ntohl(size);
|
||||
|
||||
char name[size + 1] = {};
|
||||
if (tc_read(&connection_device, name, size) != size) {
|
||||
set_disconnected_state(slave);
|
||||
return;
|
||||
}
|
||||
slave.machine_name = std::string(name);
|
||||
|
||||
size = (int)sizeof(slave.port);
|
||||
if (tc_read(&connection_device, (char*)&slave.port, size) != size) {
|
||||
set_disconnected_state(slave) ;
|
||||
return;
|
||||
}
|
||||
slave.port = ntohl(slave.port);
|
||||
|
||||
slave.state = MonteSlave::READY;
|
||||
tc_disconnect(&connection_device);
|
||||
}
|
||||
|
||||
void Trick::MonteCarlo::handle_run_data(Trick::MonteSlave& slave) {
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Receiving results for run %d from %s:%d.\n",
|
||||
slave.current_run->id, slave.machine_name.c_str(), slave.id) ;
|
||||
}
|
||||
|
||||
/**
|
||||
* <ul><li> This run may have already been resolved by another slave if
|
||||
* this slave was marked as having timed out. If that is the case,
|
||||
* discard these results.
|
||||
*/
|
||||
if (slave.current_run->exit_status != MonteRun::INCOMPLETE) {
|
||||
// TODO: If a slave times out or core dumps in its monte_slave_post
|
||||
// jobs, the master will receive a COMPLETE status from the slave's
|
||||
// child process and then an error status from the parent, rendering
|
||||
// this message incorrect.
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(
|
||||
MSG_INFO,
|
||||
"Monte [Master] Run %d has already been resolved. Discarding results.\n",
|
||||
slave.current_run->id) ;
|
||||
}
|
||||
tc_disconnect(&connection_device);
|
||||
return;
|
||||
}
|
||||
|
||||
/** <li> Otherwise, check the exit status: */
|
||||
int exit_status;
|
||||
int size = sizeof(exit_status);
|
||||
if (tc_read(&connection_device, (char*)&exit_status, size) != size) {
|
||||
set_disconnected_state(slave) ;
|
||||
return;
|
||||
}
|
||||
exit_status = ntohl(exit_status);
|
||||
|
||||
switch (exit_status) {
|
||||
|
||||
case MonteRun::COMPLETE:
|
||||
resolve_run(slave, MonteRun::COMPLETE);
|
||||
run_queue(&master_post_queue, "in master_post queue") ;
|
||||
break;
|
||||
|
||||
case MonteRun::BAD_INPUT:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] %s:%d reported bad input for run %d. Skipping.\n",
|
||||
slave.machine_name.c_str(), slave.id, slave.current_run->id) ;
|
||||
}
|
||||
resolve_run(slave, MonteRun::BAD_INPUT);
|
||||
break;
|
||||
|
||||
case MonteRun::CORED:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] %s:%d reported core dump for run %d. Skipping.\n",
|
||||
slave.machine_name.c_str(), slave.id, slave.current_run->id) ;
|
||||
}
|
||||
resolve_run(slave, MonteRun::CORED);
|
||||
break;
|
||||
|
||||
case MonteRun::NO_PERM:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] %s:%d reported a failure to create output directories for run %d.\n",
|
||||
slave.machine_name.c_str(), slave.id, slave.current_run->id);
|
||||
}
|
||||
handle_retry(slave, MonteRun::NO_PERM);
|
||||
break;
|
||||
|
||||
/**
|
||||
* <li> Timeouts and are redispatched. However, we must first check to
|
||||
* see if this run has already been processed in #check_timeouts, which
|
||||
* can occur when the master determines that a slave has timed out, and
|
||||
* then that slave itself reports a timeout. </ul>
|
||||
*/
|
||||
case MonteRun::TIMEDOUT:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] %s:%d reported a timeout for run %d.\n",
|
||||
slave.machine_name.c_str(), slave.id, slave.current_run->id);
|
||||
}
|
||||
if (slave.state != MonteSlave::UNRESPONSIVE_RUNNING &&
|
||||
slave.state != MonteSlave::UNRESPONSIVE_STOPPING) {
|
||||
handle_retry(slave, MonteRun::TIMEDOUT);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(
|
||||
MSG_ERROR,
|
||||
"Monte [Master] %s:%d reported unrecognized exit status (%d) for run %d. Skipping.\n",
|
||||
slave.machine_name.c_str(), slave.id, exit_status, slave.current_run->id);
|
||||
}
|
||||
resolve_run(slave, MonteRun::UNKNOWN);
|
||||
break;
|
||||
}
|
||||
|
||||
tc_disconnect(&connection_device);
|
||||
|
||||
/** <li> Update the slave's state. */
|
||||
if (slave.state == MonteSlave::RUNNING || slave.state == MonteSlave::UNRESPONSIVE_RUNNING) {
|
||||
slave.state = MonteSlave::READY;
|
||||
} else if (slave.state == MonteSlave::STOPPING || slave.state == MonteSlave::UNRESPONSIVE_STOPPING) {
|
||||
slave.state = MonteSlave::STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * @par Detailed Design:
 * Marks the given slave as DISCONNECTED, reports the lost connection when
 * the verbosity level is at least ERROR, and closes #connection_device.
 *
 * @param slave the slave whose connection was lost
 */
void Trick::MonteCarlo::set_disconnected_state(Trick::MonteSlave& slave) {
    slave.state = Trick::MonteSlave::DISCONNECTED;

    const bool report_errors = (verbosity >= ERROR);
    if (report_errors) {
        message_publish(
          MSG_ERROR,
          "Monte [Master] Lost connection to %s:%d.\n",
          slave.machine_name.c_str(),
          slave.id) ;
    }

    /** The connection is of no further use, so release it. */
    tc_disconnect(&connection_device);
}
|
||||
|
@ -1,160 +0,0 @@
|
||||
|
||||
#include "trick/MonteCarlo.hh"
|
||||
#include "trick/message_proto.h"
|
||||
#include "trick/message_type.h"
|
||||
#include "trick/tc_proto.h"
|
||||
|
||||
/**
|
||||
* @par Detailed Design:
|
||||
* This function performs a non-blocking accept on the listening socket.
|
||||
*/
|
||||
void Trick::MonteCarlo::receive_slave_results() {
|
||||
Trick::MonteSlave *curr_slave = NULL;
|
||||
int exit_status;
|
||||
unsigned int curr_slave_id;
|
||||
|
||||
/** <ul><li> While there are slaves waiting to connect: */
|
||||
tc_accept(&listen_device, &connection_device);
|
||||
|
||||
/** <ul><li> Read the id of incoming slave. */
|
||||
tc_read(&connection_device, (char*)&curr_slave_id, (int)sizeof(curr_slave_id));
|
||||
curr_slave_id = ntohl(curr_slave_id);
|
||||
curr_slave = get_slave(curr_slave_id);
|
||||
if (curr_slave == NULL) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] the slave returned an invalid slave id\n") ;
|
||||
}
|
||||
/**
|
||||
* <li> If the slave is in the INITIALIZING state, it will send us the
|
||||
* machine name and port over which it is listening for new runs.
|
||||
*/
|
||||
if (curr_slave->state == MonteSlave::INITIALIZING) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Receiving initialization information from %s:%d.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id) ;
|
||||
}
|
||||
read_machine_name(curr_slave) ;
|
||||
read_slave_port(curr_slave) ;
|
||||
/** <li> Slave state is not INITIALIZING */
|
||||
} else {
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Receiving results for run %d from %s:%d.\n",
|
||||
curr_slave->current_run->id, curr_slave->machine_name.c_str(), curr_slave->id) ;
|
||||
}
|
||||
/** <li> Otherwise, it is sending us the exit status for its current run. */
|
||||
tc_read(&connection_device, (char*)&exit_status, (int)sizeof(exit_status));
|
||||
exit_status = ntohl(exit_status);
|
||||
tc_disconnect(&connection_device);
|
||||
/**
|
||||
* <ul><li> This run may have already been resolved by another curr_slave if
|
||||
* this curr_slave was marked as having timed out. If that is the case,
|
||||
* discard these results.
|
||||
*/
|
||||
if (curr_slave->current_run->exit_status != MonteRun::INCOMPLETE) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [Master] Run %d has already been resolved. Discarding results.\n",
|
||||
curr_slave->current_run->id) ;
|
||||
}
|
||||
/** <li> Otherwise, check the exit status: */
|
||||
} else {
|
||||
switch (exit_status) {
|
||||
/** <ul><li> Unkown errors, core dumps, and
|
||||
* successfully completed runs are not redispatched.
|
||||
*/
|
||||
default:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d reported unrecognized exit status %d\n"
|
||||
"for run %d. Skipping.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id, exit_status, curr_slave->current_run->id) ;
|
||||
}
|
||||
resolve_run(curr_slave, MonteRun::UNKNOWN);
|
||||
break;
|
||||
case MonteRun::BAD_INPUT:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d reported bad input for run %d. Skipping.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id, curr_slave->current_run->id) ;
|
||||
}
|
||||
resolve_run(curr_slave, MonteRun::BAD_INPUT);
|
||||
break;
|
||||
case MonteRun::CORED:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d reported core dump for run %d. Skipping.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id, curr_slave->current_run->id) ;
|
||||
}
|
||||
resolve_run(curr_slave, MonteRun::CORED);
|
||||
break;
|
||||
case MonteRun::COMPLETE:
|
||||
resolve_run(curr_slave, MonteRun::COMPLETE);
|
||||
break;
|
||||
/**
|
||||
* <li> Timeouts and permission errors are redispatched. However,
|
||||
* we must first check to see if this run has already been processed
|
||||
* in #check_timeouts, which can occur when the master determines
|
||||
* that a curr_slave has timed out, and then that curr_slave itself reports
|
||||
* a timeout or permission error. </ul>
|
||||
*/
|
||||
case MonteRun::TIMEDOUT:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d reported a timeout for run %d.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id, curr_slave->current_run->id) ;
|
||||
}
|
||||
if (!(curr_slave->state == MonteSlave::UNRESPONSIVE_RUNNING ||
|
||||
curr_slave->state == MonteSlave::UNRESPONSIVE_STOPPING))
|
||||
{
|
||||
handle_retry(curr_slave, MonteRun::TIMEDOUT);
|
||||
}
|
||||
break;
|
||||
case MonteRun::NO_PERM:
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [Master] %s:%d reported a failure to create output directories for run %d.\n",
|
||||
curr_slave->machine_name.c_str(), curr_slave->id, curr_slave->current_run->id) ;
|
||||
}
|
||||
if (!(curr_slave->state == MonteSlave::UNRESPONSIVE_RUNNING ||
|
||||
curr_slave->state == MonteSlave::UNRESPONSIVE_STOPPING))
|
||||
{
|
||||
handle_retry(curr_slave, MonteRun::NO_PERM);
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // end of checking curr_slaves exit_status
|
||||
/** <li> Update the curr_slave's state. */
|
||||
if (curr_slave->state == MonteSlave::RUNNING || curr_slave->state == MonteSlave::UNRESPONSIVE_RUNNING) {
|
||||
curr_slave->state = MonteSlave::READY;
|
||||
} else if (curr_slave->state == MonteSlave::STOPPING || curr_slave->state == MonteSlave::UNRESPONSIVE_STOPPING) {
|
||||
curr_slave->state = MonteSlave::STOPPED;
|
||||
}
|
||||
} // end of Slave state is not INITIALIZING
|
||||
}
|
||||
|
||||
/**
 * @par Detailed Design:
 * Reads the slave's port from #connection_device directly into
 * curr_slave->port. When the full value arrives it is converted from network
 * byte order and the connection is closed; otherwise the slave is handed to
 * #set_disconnected_state.
 *
 * @param curr_slave the slave whose port is being received
 */
void Trick::MonteCarlo::read_slave_port(Trick::MonteSlave *curr_slave) {
    const int port_size = (int)sizeof(curr_slave->port);
    const int received = tc_read(&connection_device, (char *)&curr_slave->port, port_size);

    /** A short read means the port never fully arrived. */
    if (received != port_size) {
        set_disconnected_state(curr_slave) ;
        return;
    }

    curr_slave->port = ntohl(curr_slave->port);
    tc_disconnect(&connection_device);
}
|
||||
|
||||
/**
 * @par Detailed Design:
 * Reads the slave's machine name from #connection_device: first its length
 * (in network byte order), then that many bytes of name. On success the slave
 * is marked READY and its machine_name is updated; on a failed read or an
 * invalid length the slave is handed to #set_disconnected_state.
 *
 * @param curr_slave the slave whose machine name is being received
 */
void Trick::MonteCarlo::read_machine_name(Trick::MonteSlave *curr_slave) {
    int num_bytes;
    char slave_name[_POSIX_HOST_NAME_MAX];

    /** <ul><li> Read the length of the name; a short read leaves it unusable. */
    if (tc_read(&connection_device, (char *)&num_bytes, (int)sizeof(num_bytes)) != (int)sizeof(num_bytes)) {
        set_disconnected_state(curr_slave) ;
        return;
    }
    num_bytes = ntohl(num_bytes) ;

    /**
     * <li> The length comes off the wire and is used both as the read size
     * and as the index of the terminating NUL, so it must leave room for
     * that terminator inside slave_name.
     */
    if (num_bytes < 0 || num_bytes >= (int)sizeof(slave_name)) {
        set_disconnected_state(curr_slave) ;
        return;
    }

    /** <li> Read the name itself and store it, NUL-terminated. </ul> */
    if (tc_read(&connection_device, slave_name, num_bytes) == num_bytes ) {
        curr_slave->state = MonteSlave::READY;
        slave_name[num_bytes] = '\0';
        curr_slave->machine_name = std::string(slave_name);
    } else {
        set_disconnected_state(curr_slave) ;
    }
}
|
||||
|
||||
/**
 * @par Detailed Design:
 * Marks the given slave as DISCONNECTED and, when the verbosity level is at
 * least ERROR, reports that the connection was lost during initialization.
 *
 * @param curr_slave the slave whose connection was lost
 */
void Trick::MonteCarlo::set_disconnected_state(Trick::MonteSlave *curr_slave) {
    curr_slave->state = Trick::MonteSlave::DISCONNECTED;

    const bool report_errors = (verbosity >= ERROR);
    if (report_errors) {
        message_publish(
          MSG_ERROR,
          "Monte [Master] Lost connection to %s:%d during initialization.\n",
          curr_slave->machine_name.c_str(),
          curr_slave->id) ;
    }
}
|
@ -5,18 +5,18 @@
|
||||
#include "trick/tc_proto.h"
|
||||
|
||||
/** @par Detailed Design: */
|
||||
int Trick::MonteCarlo::slave() {
|
||||
int Trick::MonteCarlo::execute_as_slave() {
|
||||
|
||||
/** <li> Forever: */
|
||||
while (true) {
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] : Waiting for new run.\n",
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Waiting for new run.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
/** <ul><li> On a blocking read, wait for a MonteSlave::Command from the master. */
|
||||
if (tc_accept(&listen_device, &connection_device) != TC_SUCCESS) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Lost connection to Master.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Lost connection to Master.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -24,7 +24,7 @@ int Trick::MonteCarlo::slave() {
|
||||
int command;
|
||||
if (tc_read(&connection_device, (char *)&command, (int)sizeof(command)) != (int)sizeof(command)) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Lost connection to Master while receiving instructions.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Lost connection to Master while receiving instructions.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -44,7 +44,7 @@ int Trick::MonteCarlo::slave() {
|
||||
case MonteSlave::SHUTDOWN:
|
||||
/** <li> MonteSlave::SHUTDOWN: Call #slave_shutdown. */
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] : Shutdown command received from Master.\nShutting down.\n",
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Shutdown command received from Master.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -52,7 +52,7 @@ int Trick::MonteCarlo::slave() {
|
||||
case MonteSlave::DIE:
|
||||
/** <li> MonteSlave::DIE: Call #slave_die. */
|
||||
if (verbosity >= INFORMATIONAL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] : Killed by Master.\n",
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Killed by Master.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_die();
|
||||
@ -60,7 +60,7 @@ int Trick::MonteCarlo::slave() {
|
||||
default:
|
||||
/** <li> default: Call #slave_shutdown. */
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Unrecognized command %d received from Master.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Unrecognized command %d received from Master.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id, command) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
|
@ -16,7 +16,7 @@ int Trick::MonteCarlo::slave_init() {
|
||||
if (access(run_directory.c_str(), F_OK) != 0) {
|
||||
if (mkdir(run_directory.c_str(), 0775) == -1) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Unable to create directory %s.\nTerminating.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Unable to create directory %s.\nTerminating.\n",
|
||||
run_directory.c_str(), machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
exit(-1);
|
||||
@ -29,29 +29,28 @@ int Trick::MonteCarlo::slave_init() {
|
||||
/** <li> Initialize the sockets. */
|
||||
tc_error(&listen_device, 0);
|
||||
tc_error(&connection_device, 0);
|
||||
tc_error(&data_listen_device, 0);
|
||||
tc_error(&data_connection_device, 0);
|
||||
socket_init(&listen_device);
|
||||
listen_device.disable_handshaking = TC_COMM_TRUE;
|
||||
|
||||
/** <li> Connect to the master and write the port over which we are listening for new runs. */
|
||||
connection_device.port = master_port;
|
||||
if (tc_connect(&connection_device) != TC_SUCCESS) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Failed to initialize communication sockets.\nTerminating.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Failed to initialize communication sockets.\nTerminating.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] : Making initial connection with Master.\n",
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Making initial connection with Master.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
|
||||
int id = htonl(slave_id);
|
||||
tc_write(&connection_device, (char *)&id, (int)sizeof(id));
|
||||
|
||||
char hostname[_POSIX_HOST_NAME_MAX];
|
||||
char hostname[_POSIX_HOST_NAME_MAX] = {};
|
||||
gethostname(hostname, sizeof(hostname)-1);
|
||||
|
||||
int num_bytes = htonl(strlen(hostname));
|
||||
|
@ -17,7 +17,7 @@ int Trick::MonteCarlo::slave_process_run() {
|
||||
/** <ul><li> Read the length of the incoming message. */
|
||||
if (tc_read(&connection_device, (char *)&size, (int)sizeof(size)) != (int)sizeof(size) || (size = ntohl(size)) < 0) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Lost connection to Master while receiving new run.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Lost connection to Master while receiving new run.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -26,7 +26,7 @@ int Trick::MonteCarlo::slave_process_run() {
|
||||
/** <li> Read the incoming message. */
|
||||
if (tc_read(&connection_device, input, size) != size) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Lost connection to Master while receiving new run.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Lost connection to Master while receiving new run.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -41,7 +41,7 @@ int Trick::MonteCarlo::slave_process_run() {
|
||||
pid_t pid = fork();
|
||||
if (pid == -1) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Unable to fork new process for run.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Unable to fork new process for run.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
@ -53,39 +53,39 @@ int Trick::MonteCarlo::slave_process_run() {
|
||||
/* (Alex) On the Mac this check gives a lot of false positives. I've commented out the code for now. */
|
||||
/*
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Error while waiting for run to finish.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Error while waiting for run to finish.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
*/
|
||||
}
|
||||
/** <li> Extract the exit status of the child. */
|
||||
MonteRun::ExitStatus exit_status;
|
||||
|
||||
if (WIFEXITED(return_value)) {
|
||||
exit_status = (MonteRun::ExitStatus)WEXITSTATUS(return_value);
|
||||
if (exit_status == 0) {
|
||||
exit_status = MonteRun::COMPLETE;
|
||||
}
|
||||
} else {
|
||||
int signal = WTERMSIG(return_value);
|
||||
exit_status = signal == SIGALRM ? MonteRun::TIMEDOUT : MonteRun::CORED;
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Run killed by signal %d.\n",
|
||||
machine_name.c_str(), slave_id, signal) ;
|
||||
}
|
||||
// A successful sim sends its exit status to the master itself in
|
||||
// its shutdown job. Users can subvert this by calling exit, in
|
||||
// which case the master will eventually deem this run to have
|
||||
// timed out. But who would do that?!
|
||||
return 0;
|
||||
}
|
||||
|
||||
int signal = WTERMSIG(return_value);
|
||||
/** <li> Extract the exit status of the child. */
|
||||
MonteRun::ExitStatus exit_status = signal == SIGALRM ? MonteRun::TIMEDOUT : MonteRun::CORED;
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Run killed by signal %d: %s\n",
|
||||
machine_name.c_str(), slave_id, signal, strsignal(signal)) ;
|
||||
}
|
||||
connection_device.port = master_port;
|
||||
if (tc_connect(&connection_device) != TC_SUCCESS) {
|
||||
if (verbosity >= ERROR) {
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] : Lost connection to Master before results could be returned.\nShutting down.\n",
|
||||
message_publish(MSG_ERROR, "Monte [%s:%d] Lost connection to Master before results could be returned.\nShutting down.\n",
|
||||
machine_name.c_str(), slave_id) ;
|
||||
}
|
||||
slave_shutdown();
|
||||
}
|
||||
if (verbosity >= ALL) {
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] : Sending run exit status to master %d.\n",
|
||||
message_publish(MSG_INFO, "Monte [%s:%d] Sending run exit status to master %d.\n",
|
||||
machine_name.c_str(), slave_id, exit_status) ;
|
||||
|
||||
}
|
||||
/** <li> Write the slaves id to the master. </ul> */
|
||||
int id = htonl(slave_id);
|
||||
@ -97,7 +97,6 @@ int Trick::MonteCarlo::slave_process_run() {
|
||||
return 0;
|
||||
/** <li> Child process: */
|
||||
} else {
|
||||
|
||||
input[size] = '\0';
|
||||
if ( ip_parse(input) != 0 ) {
|
||||
exit(MonteRun::BAD_INPUT);
|
||||
|
@ -48,7 +48,6 @@ void Trick::MonteCarlo::initialize_slave(Trick::MonteSlave* slave_to_init) {
|
||||
<< " " << command_line_args_get_input_file()
|
||||
<< " --monte_host " << machine_name
|
||||
<< " --monte_sync_port " << listen_device.port
|
||||
<< " --monte_data_port " << data_listen_device.port
|
||||
<< " --monte_client_id " << slave_to_init->id
|
||||
<< " -O " << run_directory;
|
||||
buffer += ss.str();
|
||||
|
Loading…
x
Reference in New Issue
Block a user