From 03d89ef56a0939b46dca22a65464eb2a8bfe3685 Mon Sep 17 00:00:00 2001 From: Alex Lin Date: Wed, 2 Dec 2015 10:35:25 -0600 Subject: [PATCH] Add additional thread synchronization mechanisms Created a thread trigger container class to hold all of the various thread triggering mechanisms. I wanted to avoid allocating/freeing trigger mechanisms. I added an eventfd and futex trigger types. Updated SIM_threads_simple test sim to use show how to set the thread trigger type. refs #148 Conflicts: trick_source/sim_services/Executive/include/Threads.hh trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp trick_source/sim_services/Executive/src/Threads.cpp --- .../SIM_threads_simple/RUN_test/input.py | 3 + .../sim_services/Executive/ThreadTrigger.cpp | 148 ++++++++++++++++++ .../Executive/include/ThreadTrigger.hh | 99 ++++++++++++ .../sim_services/Executive/include/Threads.hh | 17 +- .../src/Executive_loop_multi_thread.cpp | 8 +- .../src/Executive_set_thread_rt_semaphore.cpp | 17 +- .../sim_services/Executive/src/Threads.cpp | 6 +- .../Executive/src/Threads_child.cpp | 12 +- 8 files changed, 264 insertions(+), 46 deletions(-) create mode 100644 trick_source/sim_services/Executive/ThreadTrigger.cpp create mode 100644 trick_source/sim_services/Executive/include/ThreadTrigger.hh diff --git a/trick_sims/SIM_threads_simple/RUN_test/input.py b/trick_sims/SIM_threads_simple/RUN_test/input.py index d71b701a..f2107290 100644 --- a/trick_sims/SIM_threads_simple/RUN_test/input.py +++ b/trick_sims/SIM_threads_simple/RUN_test/input.py @@ -4,9 +4,12 @@ def main(): trick.real_time_enable() trick.itimer_enable() trick.exec_set_software_frame(0.10) + trick.stop(10) trick.exec_set_thread_process_type(1, trick.PROCESS_TYPE_AMF_CHILD) trick.exec_set_thread_amf_cycle_time(1, 0.5) + thr_con = trick.exec_get_thread(1).trigger_container + thr_con.setThreadTrigger(trick.TT_MUTEX) trick.exec_set_thread_process_type(2, trick.PROCESS_TYPE_AMF_CHILD) trick.exec_set_thread_amf_cycle_time(2, 0.1) diff --git a/trick_source/sim_services/Executive/ThreadTrigger.cpp b/trick_source/sim_services/Executive/ThreadTrigger.cpp new file mode 100644 index 00000000..0ca7fcd3 --- /dev/null +++ b/trick_source/sim_services/Executive/ThreadTrigger.cpp @@ -0,0 +1,148 @@ + +#include "trick/ThreadTrigger.hh" +#include "trick/message_proto.h" + +/* ThreadTriggerBase empty routines */ +Trick::ThreadTriggerBase::ThreadTriggerBase(ThreadTriggerType in_tt_type) : tt_type(in_tt_type) {} +Trick::ThreadTriggerBase::~ThreadTriggerBase() {} +void Trick::ThreadTriggerBase::init() {} + +/* ThreadTriggerMutex */ +Trick::ThreadTriggerMutex::ThreadTriggerMutex() : ThreadTriggerBase(TT_MUTEX) { + pthread_cond_init(&go_cv, NULL); + pthread_mutex_init(&go_mutex, NULL); +} + +void Trick::ThreadTriggerMutex::init() { + pthread_mutex_lock(&go_mutex); +} + +void Trick::ThreadTriggerMutex::fire() { + pthread_mutex_lock(&go_mutex); + pthread_cond_signal(&go_cv); + pthread_mutex_unlock(&go_mutex); +} + +void Trick::ThreadTriggerMutex::wait() { + pthread_cond_wait(&go_cv, &go_mutex); +} + +void Trick::ThreadTriggerMutex::dump(std::ostream & oss) { + oss << " trigger type = mutex" << std::endl ; +} + +/* ThreadTriggerFlag */ +Trick::ThreadTriggerFlag::ThreadTriggerFlag() : ThreadTriggerBase(TT_FLAG) , frame_trigger(false) {} + +void Trick::ThreadTriggerFlag::fire() { + frame_trigger = true ; +} + +void Trick::ThreadTriggerFlag::wait() { + while (frame_trigger == false) ; + frame_trigger = false ; +} + +void Trick::ThreadTriggerFlag::dump(std::ostream & oss) { + oss << " trigger type = flag" << std::endl ; +} + +#if __linux +#include +#include +#include + +/* ThreadTriggerEventFD */ +Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) { + efd = eventfd(0,0); +} + +void Trick::ThreadTriggerEventFD::fire() { + uint64_t value = 1; + write(efd, &value, sizeof(uint64_t)); +} + +void Trick::ThreadTriggerEventFD::wait() { + uint64_t value ; + read(efd, &value, sizeof(uint64_t)); +} + +void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) { + oss << " trigger type = eventfd" << std::endl ; +} + +#include +#include + +/* ThreadTriggerFutex */ +Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX), futex_addr(0) {} + +void Trick::ThreadTriggerFutex::fire() { + syscall(SYS_futex, &futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0); +} + +void Trick::ThreadTriggerFutex::wait() { + futex_addr = 0; + syscall(SYS_futex, &futex_addr, FUTEX_WAIT, 0, NULL, NULL, 0); +} + +void Trick::ThreadTriggerFutex::dump(std::ostream & oss) { + oss << " trigger type = futex" << std::endl ; +} +#else + +/* Empty implementations for OSes other than Linux */ + +Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) , efd(0) {} +void Trick::ThreadTriggerEventFD::fire() {} +void Trick::ThreadTriggerEventFD::wait() {} +void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) { + oss << " trigger type = eventfd (non-functional). How did you get here?" << std::endl ; +} + +Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX) , futex_addr(0) {} +void Trick::ThreadTriggerFutex::fire() {} +void Trick::ThreadTriggerFutex::wait() {} +void Trick::ThreadTriggerFutex::dump(std::ostream & oss) { + oss << " trigger type = futex (non-functional). How did you get here?" << std::endl ; +} + +#endif + +/* ThreadTriggerContainer */ +Trick::ThreadTriggerContainer::ThreadTriggerContainer() : ttBase(&ttMutex) {} + +Trick::ThreadTriggerBase * Trick::ThreadTriggerContainer::getThreadTrigger() { + return ttBase ; +} + +/* Routine to change trigger type */ +void Trick::ThreadTriggerContainer::setThreadTrigger( ThreadTriggerType in_trigger_type ) { + trigger_type = in_trigger_type ; + switch ( trigger_type ) { + case TT_MUTEX : + default: + ttBase = &ttMutex ; + break ; + case TT_FLAG : + ttBase = &ttFlag ; + break ; + case TT_EVENTFD : +#if __linux + ttBase = &ttEventFD ; +#else + message_publish(MSG_ERROR, "EventFD thread trigger type not available, using mutex\n") ; + ttBase = &ttMutex ; +#endif + break ; + case TT_FUTEX : +#if __linux + ttBase = &ttFutex ; +#else + message_publish(MSG_ERROR, "Futex thread trigger type not available, using mutex\n") ; + ttBase = &ttMutex ; +#endif + break ; + } +} + diff --git a/trick_source/sim_services/Executive/include/ThreadTrigger.hh b/trick_source/sim_services/Executive/include/ThreadTrigger.hh new file mode 100644 index 00000000..6ce77639 --- /dev/null +++ b/trick_source/sim_services/Executive/include/ThreadTrigger.hh @@ -0,0 +1,99 @@ +/* + PURPOSE: + (Trick Thread trigger implementation) +*/ + +#ifndef THREADTRIGGER_HH +#define THREADTRIGGER_HH + +#include +#include + +namespace Trick { + + enum ThreadTriggerType { + TT_MUTEX, + TT_FLAG, + TT_EVENTFD, + TT_FUTEX + } ; + + class ThreadTriggerBase { + public: + ThreadTriggerType tt_type ; + ThreadTriggerBase(ThreadTriggerType in_tt_type) ; + virtual ~ThreadTriggerBase() ; + virtual void init() ; + virtual void fire() = 0 ; + virtual void wait() = 0 ; + virtual void dump( std::ostream & oss ) = 0 ; + } ; + + class ThreadTriggerMutex : public Trick::ThreadTriggerBase { + public: + ThreadTriggerMutex() ; + virtual void init() ; + virtual void fire() ; + virtual void wait() ; + virtual void dump( std::ostream & oss ) ; + protected: + /** Condition variable to control start of thread processing */ + pthread_cond_t go_cv; /**< trick_io(**) */ + /** Mutex to control start of thread processing */ + pthread_mutex_t go_mutex; /**< trick_io(**) */ + } ; + + class ThreadTriggerFlag : public Trick::ThreadTriggerBase { + public: + ThreadTriggerFlag() ; + virtual void fire() ; + virtual void wait() ; + virtual void dump( std::ostream & oss ) ; + protected: + /** Trigger from master to start thread processing */ + volatile bool frame_trigger; /**< trick_io(**) */ + } ; + + class ThreadTriggerEventFD : public Trick::ThreadTriggerBase { + public: + ThreadTriggerEventFD() ; + virtual void fire() ; + virtual void wait() ; + virtual void dump( std::ostream & oss ) ; + protected: + /** condition variable to control start of thread using eventfd */ + int efd; /**< trick_io(**) */ + } ; + + class ThreadTriggerFutex : public Trick::ThreadTriggerBase { + public: + ThreadTriggerFutex() ; + virtual void fire() ; + virtual void wait() ; + virtual void dump( std::ostream & oss ) ; + protected: + /** condition variable for futex */ + int futex_addr; /**< trick_io(**) */ + } ; + + /* Container to hold all of the trigger types. This avoids having to + allocate each trigger type when changing from one type to another */ + class ThreadTriggerContainer { + + public: + ThreadTriggerContainer() ; + ThreadTriggerBase * getThreadTrigger() ; + void setThreadTrigger( ThreadTriggerType trigger_type ) ; + + protected: + ThreadTriggerType trigger_type ; + ThreadTriggerMutex ttMutex ; + ThreadTriggerFlag ttFlag ; + ThreadTriggerEventFD ttEventFD ; + ThreadTriggerFutex ttFutex ; + + ThreadTriggerBase * ttBase ; + + } ; +} +#endif diff --git a/trick_source/sim_services/Executive/include/Threads.hh b/trick_source/sim_services/Executive/include/Threads.hh index 9c7f7690..cca53793 100644 --- a/trick_source/sim_services/Executive/include/Threads.hh +++ b/trick_source/sim_services/Executive/include/Threads.hh @@ -15,6 +15,7 @@ #include #include +#include "sim_services/Executive/include/ThreadTrigger.hh" #include "sim_services/ThreadBase/include/ThreadBase.hh" #include "sim_services/SimObject/include/SimObject.hh" #include "sim_services/ScheduledJobQueue/include/ScheduledJobQueue.hh" @@ -85,7 +86,8 @@ namespace Trick { */ int time_tic_changed(long long old_time_tic_value, long long time_tic_value) ; - /** * Sets the wait at shutdown for asynchronous threads flag + /** + * Sets the wait at shutdown for asynchronous threads flag * @param yes_no - do we wait for the thread at shutdown or not? * @return always 0 */ @@ -141,20 +143,11 @@ namespace Trick { /** Thread has completed all jobs for this time step */ volatile bool child_complete; /**< trick_io(**) */ - /** Trigger from master to start thread processing */ - volatile int frame_trigger; /**< trick_units(--) */ - /** True if the thread was started without errors. */ bool running; /**< trick_units(--) */ - /** Use a mutex to signal the start of thread processing */ - bool rt_semaphores; /**< trick_units(--) */ - - /** Condition variable to control start of thread processing */ - pthread_cond_t go_cv; /**< trick_io(**) */ - - /** Mutex to control start of thread processing */ - pthread_mutex_t go_mutex; /**< trick_io(**) */ + /** Trigger to start frame. Paused on by the thread, fired by the master */ + ThreadTriggerContainer trigger_container ; /**< trick_units(--) */ /** Wait for asynchronous jobs to finish at shutdown */ bool shutdown_wait_async; /**< trick_units(--) */ diff --git a/trick_source/sim_services/Executive/src/Executive_loop_multi_thread.cpp b/trick_source/sim_services/Executive/src/Executive_loop_multi_thread.cpp index 808bf6e6..085f35e9 100644 --- a/trick_source/sim_services/Executive/src/Executive_loop_multi_thread.cpp +++ b/trick_source/sim_services/Executive/src/Executive_loop_multi_thread.cpp @@ -168,14 +168,8 @@ int Trick::Executive::loop_multi_thread() { curr_thread->curr_time_tics = time_tics ; curr_thread->child_complete = false ; curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ; + curr_thread->trigger_container.getThreadTrigger()->fire() ; - if (curr_thread->rt_semaphores == true ) { - pthread_mutex_lock(&(curr_thread->go_mutex)); - pthread_cond_signal(&(curr_thread->go_cv)); - pthread_mutex_unlock(&(curr_thread->go_mutex)); - } else { - curr_thread->frame_trigger = true ; - } } } diff --git a/trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp b/trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp index 5d854782..bdb9e68b 100644 --- a/trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp +++ b/trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp @@ -1,19 +1,10 @@ #include "sim_services/Executive/include/Executive.hh" +#include "sim_services/Message/include/message_proto.h" int Trick::Executive::set_thread_rt_semaphores(unsigned int thread_id , bool yes_no) { - - int ret = 0 ; - - /** @par Detailed Design */ - if ( (thread_id +1) > threads.size() ) { - /** @li If the thread_id does not exist, return an error */ - ret = -2 ; - } else { - threads[thread_id]->rt_semaphores = yes_no ; - } - - return(ret) ; - + message_publish(MSG_WARNING, "set_thread_rt_semaphores has been deprecated\n" + "Use exec_get_thread(thread_id).set_trigger_type(Trick::ThreadTriggerType tt_type)") ; + return -1 ; } diff --git a/trick_source/sim_services/Executive/src/Threads.cpp b/trick_source/sim_services/Executive/src/Threads.cpp index b9da21b5..71bfb31f 100644 --- a/trick_source/sim_services/Executive/src/Threads.cpp +++ b/trick_source/sim_services/Executive/src/Threads.cpp @@ -17,13 +17,8 @@ Trick::Threads::Threads(int in_id , bool in_rt_nap) : rt_nap(in_rt_nap) , process_type(PROCESS_TYPE_SCHEDULED) , child_complete(false) , - frame_trigger(0) , running(false) , - rt_semaphores(true) , shutdown_wait_async(false) { - pthread_cond_init(&go_cv, NULL); - pthread_mutex_init(&go_mutex, NULL); - std::stringstream oss ; oss << "Child_" << in_id ; name = oss.str() ; @@ -51,6 +46,7 @@ void Trick::Threads::dump( std::ostream & oss ) { case PROCESS_TYPE_ASYNC_CHILD: oss << "asynchronous" << std::endl ; break ; case PROCESS_TYPE_AMF_CHILD: oss << "asynchronous must finish with amf_cycle = " << amf_cycle << std::endl ; break ; } + trigger_container.getThreadTrigger()->dump(oss) ; oss << " number of scheduled jobs = " << job_queue.size() << std::endl ; Trick::ThreadBase::dump(oss) ; } diff --git a/trick_source/sim_services/Executive/src/Threads_child.cpp b/trick_source/sim_services/Executive/src/Threads_child.cpp index 50b737d2..548e021d 100644 --- a/trick_source/sim_services/Executive/src/Threads_child.cpp +++ b/trick_source/sim_services/Executive/src/Threads_child.cpp @@ -95,7 +95,7 @@ static int call_next_job(Trick::JobData * curr_job, Trick::ScheduledJobQueue & j void * Trick::Threads::thread_body() { /* Lock the go mutex so the master has to wait until this child is ready before staring execution. */ - pthread_mutex_lock(&go_mutex); + trigger_container.getThreadTrigger()->init() ; /* signal the master that the child is ready and running */ child_complete = true; @@ -104,14 +104,8 @@ void * Trick::Threads::thread_body() { try { do { - /* Block child on go mutex or frame trigger until master signals. */ - if (rt_semaphores == true) { - pthread_cond_wait(&go_cv, &go_mutex); - } else { - /* Else the child process frame is being started when a shared memory flag... */ - while (frame_trigger == false) {} ; - frame_trigger = false ; - } + /* Block child on trigger until master signals. */ + trigger_container.getThreadTrigger()->wait() ; if ( enabled ) {