Unintended performance issue with integ_loop at end of scheduled job classes #243

Created a new executive job that waits for threads to finish and readies them
for their next frame of execution.  Created a new job class system_thread_sync
after the top of frame jobs and before the input processor is run to sync the
threads.  Along the way cleaned up instrumentation jobs on the threads to fix #290.
This commit is contained in:
Alex Lin 2016-08-24 10:55:27 -05:00
parent f9f1847385
commit 0072e7d6f0
10 changed files with 109 additions and 59 deletions

View File

@ -178,6 +178,9 @@ namespace Trick {
/** Queue to hold end of frame jobs.\n */
Trick::ScheduledJobQueue end_of_frame_queue ; /**< trick_io(**) */
/** Queue to hold thread sync.\n */
Trick::ScheduledJobQueue thread_sync_queue ; /**< trick_io(**) */
/** Queue to hold shutdown jobs.\n */
Trick::ScheduledJobQueue shutdown_queue ; /**< trick_io(**) */
@ -915,6 +918,12 @@ namespace Trick {
*/
virtual int advance_sim_time() ;
/**
* Job to synchronize AMF and ASYNC threads to the master.
* @return always 0
*/
virtual int thread_sync() ;
/**
* @brief freeze_init job that initialized the freeze_scheduled loop
* @return always 0

View File

@ -160,6 +160,7 @@ class SysSimObject : public Trick::SimObject {
// required job to advance sim time
{TRK} ("system_advance_sim_time") sched.advance_sim_time() ;
{TRK} ("system_thread_sync") sched.thread_sync() ;
}
private:

View File

@ -2,18 +2,23 @@ import os
def main():
trick.real_time_enable()
trick.itimer_enable()
#trick.itimer_enable()
trick.exec_set_software_frame(0.10)
trick.stop(10)
trick.stop(4)
trick.exec_set_thread_process_type(1, trick.PROCESS_TYPE_AMF_CHILD)
trick.exec_set_thread_amf_cycle_time(1, 0.5)
thr_con = trick.exec_get_thread(1).trigger_container
thr_con.setThreadTrigger(trick.TT_MUTEX)
#thr_con = trick.exec_get_thread(1).trigger_container
#thr_con.setThreadTrigger(trick.TT_MUTEX)
trick.exec_set_thread_process_type(2, trick.PROCESS_TYPE_AMF_CHILD)
trick.exec_set_thread_amf_cycle_time(2, 0.1)
trick.echo_jobs_on()
trick.frame_log_on()
trick.add_read(2.0,"trick.echo_jobs_off()")
if __name__ == "__main__":
main()

View File

@ -8,10 +8,17 @@ class testSimObject : public Trick::SimObject {
return 0 ;
} ;
/* This job takes longer than 0.1 seconds to run */
int slow_print_time (int thread) {
message_publish(1, "thread %d: time = %8.2f\n", thread, exec_get_sim_time()) ;
usleep(100000) ;
return 0 ;
} ;
testSimObject() {
(10.0, "scheduled") print_time(0) ;
C1 (5.0, "scheduled") print_time(1) ;
C2 (1.0, "scheduled") print_time(2) ;
(1.0, "scheduled") print_time(0) ;
C1 (0.5, "scheduled") print_time(1) ;
C2 (0.1, "scheduled") slow_print_time(2) ;
}
} ;

View File

@ -97,6 +97,9 @@ Trick::Executive::Executive() {
class_map["input_processor_run"] = num_classes ;
class_to_queue[num_classes++] = &input_processor_run_queue ;
class_map["system_thread_sync"] = num_classes ;
class_to_queue[num_classes++] = &thread_sync_queue ;
class_map["top_of_frame"] = num_classes ;
class_to_queue[num_classes++] = &top_of_frame_queue ;
@ -124,6 +127,7 @@ Trick::Executive::Executive() {
class_map["exec_time_tic_changed"] = num_classes ;
class_to_queue[num_classes++] = &time_tic_changed_queue ;
// Initialize all of default signal handlers
init_signal_handlers() ;
}

View File

@ -67,19 +67,6 @@ int Trick::Executive::advance_sim_time() {
time_tics = terminate_time ;
}
/* Wait for async_must finish (previous pass) to complete at the current time_tics */
for (ii = 1; ii < threads.size() ; ii++) {
Threads * curr_thread = threads[ii] ;
if ( (curr_thread->process_type == PROCESS_TYPE_AMF_CHILD) &&
(curr_thread->amf_next_tics == time_tics )) {
while (curr_thread->child_complete == false ) {
if (rt_nap == true) {
RELEASE();
}
}
}
}
// set the default next job call time to the next software frame ;
input_processor_run_queue.set_next_job_call_time(time_tics + software_frame_tics) ;
threads[0]->job_queue.set_next_job_call_time(time_tics + software_frame_tics) ;

View File

@ -38,6 +38,7 @@ int Trick::Executive::instrument_job_before( Trick::JobData * instrument_job ) {
count += freeze_queue.instrument_before(instrument_job) ;
count += unfreeze_queue.instrument_before(instrument_job) ;
count += time_tic_changed_queue.instrument_before(instrument_job) ;
count += thread_sync_queue.instrument_before(instrument_job) ;
for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) {
count += other_schedulers[ii]->instrument_job_before( instrument_job ) ;
@ -84,6 +85,7 @@ int Trick::Executive::instrument_job_after( Trick::JobData * instrument_job) {
count += freeze_queue.instrument_after(instrument_job) ;
count += unfreeze_queue.instrument_after(instrument_job) ;
count += time_tic_changed_queue.instrument_after(instrument_job) ;
count += thread_sync_queue.instrument_after(instrument_job) ;
for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) {
count += other_schedulers[ii]->instrument_job_after( instrument_job ) ;
@ -112,9 +114,12 @@ int Trick::Executive::instrument_job_remove(std::string in_job) {
freeze_queue.instrument_remove(in_job) ;
unfreeze_queue.instrument_remove(in_job) ;
time_tic_changed_queue.instrument_remove(in_job) ;
thread_sync_queue.instrument_remove(in_job) ;
for ( ii = 0 ; ii < threads.size() ; ii++ ) {
threads[ii]->job_queue.instrument_remove(in_job) ;
threads[ii]->top_of_frame_queue.instrument_remove(in_job) ;
threads[ii]->end_of_frame_queue.instrument_remove(in_job) ;
}
for ( ii = 0 ; ii < other_schedulers.size() ; ii++ ) {

View File

@ -83,23 +83,6 @@ int Trick::Executive::loop_multi_thread() {
while (1) {
/* Give aynchronous jobs to the top of the next time start to finish executing previous frame */
for (ii = 0; ii < threads.size() ; ii++) {
Trick::Threads * curr_thread = threads[ii] ;
if ( curr_thread->process_type == PROCESS_TYPE_ASYNC_CHILD ) {
if ( curr_thread->child_complete == true ) {
if (curr_thread->amf_cycle_tics != 0 ) {
// catch up async next_tic time to a time greater than the time last pass
while ( curr_thread->amf_next_tics < time_tics ) {
curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ;
}
}
}
}
}
/* Call freeze_loop() if commanded by freeze() or a <CTRL-C> signal was caught. */
if (exec_command == FreezeCmd) {
exec_command = NoCmd;
@ -118,7 +101,16 @@ int Trick::Executive::loop_multi_thread() {
frame_count++ ;
}
/* Call the input_processor_run queue jobs. */
/* Call thread sync jobs (wait for threads that are scheduled to finish by current time) */
thread_sync_queue.reset_curr_index() ;
while ( (curr_job = thread_sync_queue.get_next_job()) != NULL ) {
ret = curr_job->call() ;
if ( ret != 0 ) {
exec_terminate_with_return(ret , curr_job->name.c_str() , 0 , "thread_sync job did not return 0") ;
}
}
/* Call the input_processor_run queue jobs. Run between threads ending and restarting */
input_processor_run_queue.reset_curr_index() ;
while ( (curr_job = input_processor_run_queue.find_next_job( time_tics )) != NULL ) {
ret = curr_job->call() ;
@ -129,33 +121,14 @@ int Trick::Executive::loop_multi_thread() {
// will adjust the next call time for this queue
input_processor_run_queue.test_next_job_call_time(curr_job , time_tics) ;
/* System jobs next call time are not set until after they run.
/* System jobs next call time are not set until after they run.
Test their next job call time after they have been called */
if ( curr_job->system_job_class ) {
main_sched_queue->test_next_job_call_time(curr_job , time_tics) ;
}
}
/* Go through all of the job queues and mark all jobs that are to run this time step to not complete. */
for (ii = 0; ii < threads.size() ; ii++) {
Trick::Threads * curr_thread = threads[ii] ;
/* For all threads that are waiting to start the next cycle (child_complete == true)
reset job completion flags */
if ( isThreadReadyToRun(curr_thread, time_tics) ) {
/* For all jobs in all threads that will run for this time_tic, */
/* Set job complete flags to false. */
/* The job complete flags are used for job depends_on checks. */
curr_thread->job_queue.reset_curr_index();
while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) {
curr_job->complete = false;
}
}
}
/* After all jobs on all threads that are going to run are set not complete, start the threads */
/* Start threads that are ready to run */
for (ii = 1; ii < threads.size() ; ii++) {
Trick::Threads * curr_thread = threads[ii] ;

View File

@ -0,0 +1,58 @@
#include <iostream>
#include "trick/Executive.hh"
#include "trick/release.h"
/**
@design
-# Loop through all threads
-# If the thread is asynchronous must finish and the next sync time matches the sim time
-# Wait for the thread to finish
-# Reset the thread queue of jobs
-# clear all job complete flags
-# If the thread is asynchronous and the thread is finished
-# If the thread has a cycle time advance the next sync time beyond the current time
by multiples of the cycle time.
-# Reset the thread queue of jobs
-# clear all job complete flags
*/
int Trick::Executive::thread_sync() {
unsigned int ii ;
/* Wait for async_must finish to complete at the current time_tics */
for (ii = 1; ii < threads.size() ; ii++) {
Threads * curr_thread = threads[ii] ;
if ( (curr_thread->process_type == PROCESS_TYPE_AMF_CHILD) &&
(curr_thread->amf_next_tics == time_tics )) {
while (curr_thread->child_complete == false ) {
if (rt_nap == true) {
RELEASE();
}
}
curr_thread->job_queue.reset_curr_index();
while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) {
curr_job->complete = false;
}
}
else if ( curr_thread->process_type == PROCESS_TYPE_ASYNC_CHILD ) {
if ( curr_thread->child_complete == true ) {
if (curr_thread->amf_cycle_tics != 0 ) {
// catch up async next_tic time to a time greater than the time last pass
while ( curr_thread->amf_next_tics < time_tics ) {
curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ;
}
}
curr_thread->job_queue.reset_curr_index();
while ( (curr_job = curr_thread->job_queue.find_job(time_tics)) != NULL ) {
curr_job->complete = false;
}
}
}
}
return(0) ;
}

View File

@ -107,6 +107,7 @@ void Trick::FrameLog::add_recording_vars_for_jobs() {
(! all_jobs_vector[ii]->job_class_name.compare("derivative")) ||
(! all_jobs_vector[ii]->job_class_name.compare("dynamic_event")) ||
(! all_jobs_vector[ii]->job_class_name.compare("post_integration")) ||
(! all_jobs_vector[ii]->job_class_name.compare("system_thread_sync")) ||
(! all_jobs_vector[ii]->job_class_name.compare("top_of_frame")) ||
(! all_jobs_vector[ii]->job_class_name.compare("end_of_frame")))
) {