diff --git a/include/trick/Executive.hh b/include/trick/Executive.hh index 08b4073c..a885301b 100644 --- a/include/trick/Executive.hh +++ b/include/trick/Executive.hh @@ -178,6 +178,9 @@ namespace Trick { /** Queue to hold end of frame jobs.\n */ Trick::ScheduledJobQueue end_of_frame_queue ; /**< trick_io(**) */ + /** Queue to hold thread sync.\n */ + Trick::ScheduledJobQueue thread_sync_queue ; /**< trick_io(**) */ + /** Queue to hold shutdown jobs.\n */ Trick::ScheduledJobQueue shutdown_queue ; /**< trick_io(**) */ @@ -915,6 +918,12 @@ namespace Trick { */ virtual int advance_sim_time() ; + /** + * Job to synchronize AMF and ASYNC threads to the master. + * @return always 0 + */ + virtual int thread_sync() ; + /** * @brief freeze_init job that initialized the freeze_scheduled loop * @return always 0 diff --git a/share/trick/sim_objects/default_trick_sys.sm b/share/trick/sim_objects/default_trick_sys.sm index 1d3dc7d5..04886d06 100644 --- a/share/trick/sim_objects/default_trick_sys.sm +++ b/share/trick/sim_objects/default_trick_sys.sm @@ -160,6 +160,7 @@ class SysSimObject : public Trick::SimObject { // required job to advance sim time {TRK} ("system_advance_sim_time") sched.advance_sim_time() ; + {TRK} ("system_thread_sync") sched.thread_sync() ; } private: diff --git a/test/SIM_threads_simple/RUN_test/input.py b/test/SIM_threads_simple/RUN_test/input.py index f2107290..1a619dd7 100644 --- a/test/SIM_threads_simple/RUN_test/input.py +++ b/test/SIM_threads_simple/RUN_test/input.py @@ -2,18 +2,23 @@ import os def main(): trick.real_time_enable() - trick.itimer_enable() + #trick.itimer_enable() trick.exec_set_software_frame(0.10) - trick.stop(10) + trick.stop(4) trick.exec_set_thread_process_type(1, trick.PROCESS_TYPE_AMF_CHILD) trick.exec_set_thread_amf_cycle_time(1, 0.5) - thr_con = trick.exec_get_thread(1).trigger_container - thr_con.setThreadTrigger(trick.TT_MUTEX) + #thr_con = trick.exec_get_thread(1).trigger_container + #thr_con.setThreadTrigger(trick.TT_MUTEX) trick.exec_set_thread_process_type(2, trick.PROCESS_TYPE_AMF_CHILD) trick.exec_set_thread_amf_cycle_time(2, 0.1) + trick.echo_jobs_on() + trick.frame_log_on() + + trick.add_read(2.0,"trick.echo_jobs_off()") + if __name__ == "__main__": main() diff --git a/test/SIM_threads_simple/S_define b/test/SIM_threads_simple/S_define index eff0033c..b3c2bd74 100644 --- a/test/SIM_threads_simple/S_define +++ b/test/SIM_threads_simple/S_define @@ -8,10 +8,17 @@ class testSimObject : public Trick::SimObject { return 0 ; } ; + /* This job takes longer than 0.1 seconds to run */ + int slow_print_time (int thread) { + message_publish(1, "thread %d: time = %8.2f\n", thread, exec_get_sim_time()) ; + usleep(100000) ; + return 0 ; + } ; + testSimObject() { - (10.0, "scheduled") print_time(0) ; - C1 (5.0, "scheduled") print_time(1) ; - C2 (1.0, "scheduled") print_time(2) ; + (1.0, "scheduled") print_time(0) ; + C1 (0.5, "scheduled") print_time(1) ; + C2 (0.1, "scheduled") slow_print_time(2) ; } } ; diff --git a/trick_source/sim_services/Executive/Executive.cpp b/trick_source/sim_services/Executive/Executive.cpp index a9ea32dc..28fce058 100644 --- a/trick_source/sim_services/Executive/Executive.cpp +++ b/trick_source/sim_services/Executive/Executive.cpp @@ -97,6 +97,9 @@ Trick::Executive::Executive() { class_map["input_processor_run"] = num_classes ; class_to_queue[num_classes++] = &input_processor_run_queue ; + class_map["system_thread_sync"] = num_classes ; + class_to_queue[num_classes++] = &thread_sync_queue ; + class_map["top_of_frame"] = num_classes ; class_to_queue[num_classes++] = &top_of_frame_queue ; @@ -124,6 +127,7 @@ Trick::Executive::Executive() { class_map["exec_time_tic_changed"] = num_classes ; class_to_queue[num_classes++] = &time_tic_changed_queue ; + // Initialize all of default signal handlers init_signal_handlers() ; } diff --git a/trick_source/sim_services/Executive/Executive_advance_sim_time.cpp b/trick_source/sim_services/Executive/Executive_advance_sim_time.cpp index cc0fbeeb..f1f1e176 100644 --- a/trick_source/sim_services/Executive/Executive_advance_sim_time.cpp +++ b/trick_source/sim_services/Executive/Executive_advance_sim_time.cpp @@ -67,19 +67,6 @@ int Trick::Executive::advance_sim_time() { time_tics = terminate_time ; } - /* Wait for async_must finish (previous pass) to complete at the current time_tics */ - for (ii = 1; ii < threads.size() ; ii++) { - Threads * curr_thread = threads[ii] ; - if ( (curr_thread->process_type == PROCESS_TYPE_AMF_CHILD) && - (curr_thread->amf_next_tics == time_tics )) { - while (curr_thread->child_complete == false ) { - if (rt_nap == true) { - RELEASE(); - } - } - } - } - // set the default next job call time to the next software frame ; input_processor_run_queue.set_next_job_call_time(time_tics + software_frame_tics) ; threads[0]->job_queue.set_next_job_call_time(time_tics + software_frame_tics) ; diff --git a/trick_source/sim_services/Executive/Executive_instrument_job.cpp b/trick_source/sim_services/Executive/Executive_instrument_job.cpp index 91f7d03c..aa7ef9e1 100644 --- a/trick_source/sim_services/Executive/Executive_instrument_job.cpp +++ b/trick_source/sim_services/Executive/Executive_instrument_job.cpp @@ -38,6 +38,7 @@ int Trick::Executive::instrument_job_before( Trick::JobData * instrument_job ) { count += freeze_queue.instrument_before(instrument_job) ; count += unfreeze_queue.instrument_before(instrument_job) ; count += time_tic_changed_queue.instrument_before(instrument_job) ; + count += thread_sync_queue.instrument_before(instrument_job) ; for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) { count += other_schedulers[ii]->instrument_job_before( instrument_job ) ; @@ -84,6 +85,7 @@ int Trick::Executive::instrument_job_after( Trick::JobData * instrument_job) { count += freeze_queue.instrument_after(instrument_job) ; count += unfreeze_queue.instrument_after(instrument_job) ; count += time_tic_changed_queue.instrument_after(instrument_job) ; + count += thread_sync_queue.instrument_after(instrument_job) ; for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) { count += other_schedulers[ii]->instrument_job_after( instrument_job ) ; @@ -112,9 +114,12 @@ int Trick::Executive::instrument_job_remove(std::string in_job) { freeze_queue.instrument_remove(in_job) ; unfreeze_queue.instrument_remove(in_job) ; time_tic_changed_queue.instrument_remove(in_job) ; + thread_sync_queue.instrument_remove(in_job) ; for ( ii = 0 ; ii < threads.size() ; ii++ ) { threads[ii]->job_queue.instrument_remove(in_job) ; + threads[ii]->top_of_frame_queue.instrument_remove(in_job) ; + threads[ii]->end_of_frame_queue.instrument_remove(in_job) ; } for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) { diff --git a/trick_source/sim_services/Executive/Executive_loop_multi_thread.cpp b/trick_source/sim_services/Executive/Executive_loop_multi_thread.cpp index 39ea626c..843d4267 100644 --- a/trick_source/sim_services/Executive/Executive_loop_multi_thread.cpp +++ b/trick_source/sim_services/Executive/Executive_loop_multi_thread.cpp @@ -83,23 +83,6 @@ int Trick::Executive::loop_multi_thread() { while (1) { - /* Give aynchronous jobs to the top of the next time start to finish executing previous frame */ - for (ii = 0; ii < threads.size() ; ii++) { - - Trick::Threads * curr_thread = threads[ii] ; - - if ( curr_thread->process_type == PROCESS_TYPE_ASYNC_CHILD ) { - if ( curr_thread->child_complete == true ) { - if (curr_thread->amf_cycle_tics != 0 ) { - // catch up async next_tic time to a time greater than the time last pass - while ( curr_thread->amf_next_tics < time_tics ) { - curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ; - } - } - } - } - } - /* Call freeze_loop() if commanded by freeze() or a signal was caught. */ if (exec_command == FreezeCmd) { exec_command = NoCmd; @@ -118,7 +101,16 @@ int Trick::Executive::loop_multi_thread() { frame_count++ ; } - /* Call the input_processor_run queue jobs. */ + /* Call thread sync jobs (wait for threads that are scheduled to finish by current time) */ + thread_sync_queue.reset_curr_index() ; + while ( (curr_job = thread_sync_queue.get_next_job()) != NULL ) { + ret = curr_job->call() ; + if ( ret != 0 ) { + exec_terminate_with_return(ret , curr_job->name.c_str() , 0 , "thread_sync job did not return 0") ; + } + } + + /* Call the input_processor_run queue jobs. Run between threads ending and restarting */ input_processor_run_queue.reset_curr_index() ; while ( (curr_job = input_processor_run_queue.find_next_job( time_tics )) != NULL ) { ret = curr_job->call() ; @@ -129,33 +121,14 @@ int Trick::Executive::loop_multi_thread() { // will adjust the next call time for this queue input_processor_run_queue.test_next_job_call_time(curr_job , time_tics) ; - /* System jobs next call time are not set until after they run. + /* System jobs next call time are not set until after they run. Test their next job call time after they have been called */ if ( curr_job->system_job_class ) { main_sched_queue->test_next_job_call_time(curr_job , time_tics) ; } } - /* Go through all of the job queues and mark all jobs that are to run this time step to not complete. */ - for (ii = 0; ii < threads.size() ; ii++) { - - Trick::Threads * curr_thread = threads[ii] ; - - /* For all threads that are waiting to start the next cycle (child_complete == true) - reset job completion flags */ - if ( isThreadReadyToRun(curr_thread, time_tics) ) { - - /* For all jobs in all threads that will run for this time_tic, */ - /* Set job complete flags to false. */ - /* The job complete flags are used for job depends_on checks. */ - curr_thread->job_queue.reset_curr_index(); - while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) { - curr_job->complete = false; - } - } - } - - /* After all jobs on all threads that are going to run are set not complete, start the threads */ + /* Start threads that are ready to run */ for (ii = 1; ii < threads.size() ; ii++) { Trick::Threads * curr_thread = threads[ii] ; diff --git a/trick_source/sim_services/Executive/Executive_thread_sync.cpp b/trick_source/sim_services/Executive/Executive_thread_sync.cpp new file mode 100644 index 00000000..ba122f64 --- /dev/null +++ b/trick_source/sim_services/Executive/Executive_thread_sync.cpp @@ -0,0 +1,58 @@ + +#include + +#include "trick/Executive.hh" +#include "trick/release.h" + +/** +@design +-# Loop through all threads + -# If the thread is asynchronous must finish and the next sync time matches the sim time + -# Wait for the thread to finish + -# Reset the thread queue of jobs + -# clear all job complete flags + -# If the thread is asynchronous and the thread is finished + -# If the thread has a cycle time advance the next sync time beyond the current time + by multiples of the cycle time. + -# Reset the thread queue of jobs + -# clear all job complete flags +*/ + +int Trick::Executive::thread_sync() { + + unsigned int ii ; + + /* Wait for async_must finish to complete at the current time_tics */ + for (ii = 1; ii < threads.size() ; ii++) { + Threads * curr_thread = threads[ii] ; + if ( (curr_thread->process_type == PROCESS_TYPE_AMF_CHILD) && + (curr_thread->amf_next_tics == time_tics )) { + while (curr_thread->child_complete == false ) { + if (rt_nap == true) { + RELEASE(); + } + } + curr_thread->job_queue.reset_curr_index(); + while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) { + curr_job->complete = false; + } + } + else if ( curr_thread->process_type == PROCESS_TYPE_ASYNC_CHILD ) { + if ( curr_thread->child_complete == true ) { + if (curr_thread->amf_cycle_tics != 0 ) { + // catch up async next_tic time to a time greater than the time last pass + while ( curr_thread->amf_next_tics < time_tics ) { + curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ; + } + } + curr_thread->job_queue.reset_curr_index(); + while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) { + curr_job->complete = false; + } + } + } + } + + return(0) ; +} + diff --git a/trick_source/sim_services/FrameLog/FrameLog.cpp b/trick_source/sim_services/FrameLog/FrameLog.cpp index 20518b65..66b7128e 100644 --- a/trick_source/sim_services/FrameLog/FrameLog.cpp +++ b/trick_source/sim_services/FrameLog/FrameLog.cpp @@ -107,6 +107,7 @@ void Trick::FrameLog::add_recording_vars_for_jobs() { (! all_jobs_vector[ii]->job_class_name.compare("derivative")) || (! all_jobs_vector[ii]->job_class_name.compare("dynamic_event")) || (! all_jobs_vector[ii]->job_class_name.compare("post_integration")) || + (! all_jobs_vector[ii]->job_class_name.compare("system_thread_sync")) || (! all_jobs_vector[ii]->job_class_name.compare("top_of_frame")) || (! all_jobs_vector[ii]->job_class_name.compare("end_of_frame"))) ) {