Add additional thread synchronization mechanisms

Created a thread trigger container class to hold all of the various thread triggering
mechanisms.  I wanted to avoid allocating/freeing trigger mechanisms.  I added
an eventfd and futex trigger types.  Updated SIM_threads_simple test sim to use
show how to set the thread trigger type.

refs #148
This commit is contained in:
Alex Lin 2015-12-02 10:35:25 -06:00
parent bdc24072e5
commit 1afac18399
8 changed files with 264 additions and 47 deletions

View File

@ -0,0 +1,99 @@
/*
PURPOSE:
(Trick Thread trigger implementation)
*/
#ifndef THREADTRIGGER_HH
#define THREADTRIGGER_HH
#include <iostream>
#include <pthread.h>
namespace Trick {
enum ThreadTriggerType {
TT_MUTEX,
TT_FLAG,
TT_EVENTFD,
TT_FUTEX
} ;
class ThreadTriggerBase {
public:
ThreadTriggerType tt_type ;
ThreadTriggerBase(ThreadTriggerType in_tt_type) ;
virtual ~ThreadTriggerBase() ;
virtual void init() ;
virtual void fire() = 0 ;
virtual void wait() = 0 ;
virtual void dump( std::ostream & oss ) = 0 ;
} ;
class ThreadTriggerMutex : public Trick::ThreadTriggerBase {
public:
ThreadTriggerMutex() ;
virtual void init() ;
virtual void fire() ;
virtual void wait() ;
virtual void dump( std::ostream & oss ) ;
protected:
/** Condition variable to control start of thread processing */
pthread_cond_t go_cv; /**< trick_io(**) */
/** Mutex to control start of thread processing */
pthread_mutex_t go_mutex; /**< trick_io(**) */
} ;
class ThreadTriggerFlag : public Trick::ThreadTriggerBase {
public:
ThreadTriggerFlag() ;
virtual void fire() ;
virtual void wait() ;
virtual void dump( std::ostream & oss ) ;
protected:
/** Trigger from master to start thread processing */
volatile bool frame_trigger; /**< trick_io(**) */
} ;
class ThreadTriggerEventFD : public Trick::ThreadTriggerBase {
public:
ThreadTriggerEventFD() ;
virtual void fire() ;
virtual void wait() ;
virtual void dump( std::ostream & oss ) ;
protected:
/** condition variable to control start of thread using eventfd */
int efd; /**< trick_io(**) */
} ;
class ThreadTriggerFutex : public Trick::ThreadTriggerBase {
public:
ThreadTriggerFutex() ;
virtual void fire() ;
virtual void wait() ;
virtual void dump( std::ostream & oss ) ;
protected:
/** condition variable for futex */
int futex_addr; /**< trick_io(**) */
} ;
/* Container to hold all of the trigger types. This avoids having to
allocate each trigger type when changing from one type to another */
class ThreadTriggerContainer {
public:
ThreadTriggerContainer() ;
ThreadTriggerBase * getThreadTrigger() ;
void setThreadTrigger( ThreadTriggerType trigger_type ) ;
protected:
ThreadTriggerType trigger_type ;
ThreadTriggerMutex ttMutex ;
ThreadTriggerFlag ttFlag ;
ThreadTriggerEventFD ttEventFD ;
ThreadTriggerFutex ttFutex ;
ThreadTriggerBase * ttBase ;
} ;
}
#endif

View File

@ -16,6 +16,7 @@
#include <iostream>
#include "trick/ThreadBase.hh"
#include "trick/ThreadTrigger.hh"
#include "trick/SimObject.hh"
#include "trick/ScheduledJobQueue.hh"
@ -85,7 +86,8 @@ namespace Trick {
*/
int time_tic_changed(long long old_time_tic_value, long long time_tic_value) ;
/** * Sets the wait at shutdown for asynchronous threads flag
/**
* Sets the wait at shutdown for asynchronous threads flag
* @param yes_no - do we wait for the thread at shutdown or not?
* @return always 0
*/
@ -141,20 +143,11 @@ namespace Trick {
/** Thread has completed all jobs for this time step */
volatile bool child_complete; /**< trick_io(**) */
/** Trigger from master to start thread processing */
volatile int frame_trigger; /**< trick_units(--) */
/** True if the thread was started without errors. */
bool running; /**< trick_units(--) */
/** Use a mutex to signal the start of thread processing */
bool rt_semaphores; /**< trick_units(--) */
/** Condition variable to control start of thread processing */
pthread_cond_t go_cv; /**< trick_io(**) */
/** Mutex to control start of thread processing */
pthread_mutex_t go_mutex; /**< trick_io(**) */
/** Trigger to start frame. Paused on by the thread, fired by the master */
ThreadTriggerContainer trigger_container ; /**< trick_units(--) */
/** Wait for asynchronous jobs to finish at shutdown */
bool shutdown_wait_async; /**< trick_units(--) */

View File

@ -4,9 +4,12 @@ def main():
trick.real_time_enable()
trick.itimer_enable()
trick.exec_set_software_frame(0.10)
trick.stop(10)
trick.exec_set_thread_process_type(1, trick.PROCESS_TYPE_AMF_CHILD)
trick.exec_set_thread_amf_cycle_time(1, 0.5)
thr_con = trick.exec_get_thread(1).trigger_container
thr_con.setThreadTrigger(trick.TT_MUTEX)
trick.exec_set_thread_process_type(2, trick.PROCESS_TYPE_AMF_CHILD)
trick.exec_set_thread_amf_cycle_time(2, 0.1)

View File

@ -167,14 +167,8 @@ int Trick::Executive::loop_multi_thread() {
curr_thread->curr_time_tics = time_tics ;
curr_thread->child_complete = false ;
curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ;
curr_thread->trigger_container.getThreadTrigger()->fire() ;
if (curr_thread->rt_semaphores == true ) {
pthread_mutex_lock(&(curr_thread->go_mutex));
pthread_cond_signal(&(curr_thread->go_cv));
pthread_mutex_unlock(&(curr_thread->go_mutex));
} else {
curr_thread->frame_trigger = true ;
}
}
}

View File

@ -1,19 +1,10 @@
#include "trick/Executive.hh"
#include "trick/message_proto.h"
int Trick::Executive::set_thread_rt_semaphores(unsigned int thread_id , bool yes_no) {
int ret = 0 ;
/** @par Detailed Design */
if ( (thread_id +1) > threads.size() ) {
/** @li If the thread_id does not exist, return an error */
ret = -2 ;
} else {
threads[thread_id]->rt_semaphores = yes_no ;
}
return(ret) ;
message_publish(MSG_WARNING, "set_thread_rt_semaphores has been deprecated\n"
"Use exec_get_thread(thread_id).set_trigger_type(Trick::ThreadTriggerType tt_type)") ;
return -1 ;
}

View File

@ -0,0 +1,148 @@
#include "trick/ThreadTrigger.hh"
#include "trick/message_proto.h"
/* ThreadTriggerBase empty routines */
Trick::ThreadTriggerBase::ThreadTriggerBase(ThreadTriggerType in_tt_type) : tt_type(in_tt_type) {}
Trick::ThreadTriggerBase::~ThreadTriggerBase() {}
void Trick::ThreadTriggerBase::init() {}
/* ThreadTriggerMutex */
Trick::ThreadTriggerMutex::ThreadTriggerMutex() : ThreadTriggerBase(TT_MUTEX) {
pthread_cond_init(&go_cv, NULL);
pthread_mutex_init(&go_mutex, NULL);
}
void Trick::ThreadTriggerMutex::init() {
pthread_mutex_lock(&go_mutex);
}
void Trick::ThreadTriggerMutex::fire() {
pthread_mutex_lock(&go_mutex);
pthread_cond_signal(&go_cv);
pthread_mutex_unlock(&go_mutex);
}
void Trick::ThreadTriggerMutex::wait() {
pthread_cond_wait(&go_cv, &go_mutex);
}
void Trick::ThreadTriggerMutex::dump(std::ostream & oss) {
oss << " trigger type = mutex" << std::endl ;
}
/* ThreadTriggerFlag */
Trick::ThreadTriggerFlag::ThreadTriggerFlag() : ThreadTriggerBase(TT_FLAG) , frame_trigger(false) {}
void Trick::ThreadTriggerFlag::fire() {
frame_trigger = true ;
}
void Trick::ThreadTriggerFlag::wait() {
while (frame_trigger == false) ;
frame_trigger = false ;
}
void Trick::ThreadTriggerFlag::dump(std::ostream & oss) {
oss << " trigger type = flag" << std::endl ;
}
#if __linux
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>
/* ThreadTriggerEventFD */
Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) {
efd = eventfd(0,0);
}
void Trick::ThreadTriggerEventFD::fire() {
uint64_t value = 1;
write(efd, &value, sizeof(uint64_t));
}
void Trick::ThreadTriggerEventFD::wait() {
uint64_t value ;
read(efd, &value, sizeof(uint64_t));
}
void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) {
oss << " trigger type = eventfd" << std::endl ;
}
#include <linux/futex.h>
#include <syscall.h>
/* ThreadTriggerFutex */
Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX), futex_addr(0) {}
void Trick::ThreadTriggerFutex::fire() {
syscall(SYS_futex, &futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0);
}
void Trick::ThreadTriggerFutex::wait() {
futex_addr = 0;
syscall(SYS_futex, &futex_addr, FUTEX_WAIT, 0, NULL, NULL, 0);
}
void Trick::ThreadTriggerFutex::dump(std::ostream & oss) {
oss << " trigger type = futex" << std::endl ;
}
#else
/* Empty implementations for OSes other than Linux */
Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) , efd(0) {}
void Trick::ThreadTriggerEventFD::fire() {}
void Trick::ThreadTriggerEventFD::wait() {}
void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) {
oss << " trigger type = eventfd (non-functional). How did you get here?" << std::endl ;
}
Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX) , futex_addr(0) {}
void Trick::ThreadTriggerFutex::fire() {}
void Trick::ThreadTriggerFutex::wait() {}
void Trick::ThreadTriggerFutex::dump(std::ostream & oss) {
oss << " trigger type = futex (non-functional). How did you get here?" << std::endl ;
}
#endif
/* ThreadTriggerContainer */
Trick::ThreadTriggerContainer::ThreadTriggerContainer() : ttBase(&ttMutex) {}
Trick::ThreadTriggerBase * Trick::ThreadTriggerContainer::getThreadTrigger() {
return ttBase ;
}
/* Routine to change trigger type */
void Trick::ThreadTriggerContainer::setThreadTrigger( ThreadTriggerType in_trigger_type ) {
trigger_type = in_trigger_type ;
switch ( trigger_type ) {
case TT_MUTEX :
default:
ttBase = &ttMutex ;
break ;
case TT_FLAG :
ttBase = &ttFlag ;
break ;
case TT_EVENTFD :
#if __linux
ttBase = &ttEventFD ;
#else
message_publish(MSG_ERROR, "EventFD thread trigger type not available, using mutex\n") ;
ttBase = &ttMutex ;
#endif
break ;
case TT_FUTEX :
#if __linux
ttBase = &ttFutex ;
#else
message_publish(MSG_ERROR, "Futex thread trigger type not available, using mutex\n") ;
ttBase = &ttMutex ;
#endif
break ;
}
}

View File

@ -4,7 +4,6 @@
#include <stdio.h>
#include "trick/Threads.hh"
#include "trick/message_proto.h"
Trick::Threads::Threads(int in_id , bool in_rt_nap) :
enabled(true) ,
@ -17,13 +16,8 @@ Trick::Threads::Threads(int in_id , bool in_rt_nap) :
rt_nap(in_rt_nap) ,
process_type(PROCESS_TYPE_SCHEDULED) ,
child_complete(false) ,
frame_trigger(0) ,
running(false) ,
rt_semaphores(true) ,
shutdown_wait_async(false) {
pthread_cond_init(&go_cv, NULL);
pthread_mutex_init(&go_mutex, NULL);
std::stringstream oss ;
oss << "Child_" << in_id ;
name = oss.str() ;
@ -51,6 +45,7 @@ void Trick::Threads::dump( std::ostream & oss ) {
case PROCESS_TYPE_ASYNC_CHILD: oss << "asynchronous" << std::endl ; break ;
case PROCESS_TYPE_AMF_CHILD: oss << "asynchronous must finish with amf_cycle = " << amf_cycle << std::endl ; break ;
}
trigger_container.getThreadTrigger()->dump(oss) ;
oss << " number of scheduled jobs = " << job_queue.size() << std::endl ;
Trick::ThreadBase::dump(oss) ;
}

View File

@ -95,7 +95,7 @@ static int call_next_job(Trick::JobData * curr_job, Trick::ScheduledJobQueue & j
void * Trick::Threads::thread_body() {
/* Lock the go mutex so the master has to wait until this child is ready before staring execution. */
pthread_mutex_lock(&go_mutex);
trigger_container.getThreadTrigger()->init() ;
/* signal the master that the child is ready and running */
child_complete = true;
@ -104,14 +104,8 @@ void * Trick::Threads::thread_body() {
try {
do {
/* Block child on go mutex or frame trigger until master signals. */
if (rt_semaphores == true) {
pthread_cond_wait(&go_cv, &go_mutex);
} else {
/* Else the child process frame is being started when a shared memory flag... */
while (frame_trigger == false) {} ;
frame_trigger = false ;
}
/* Block child on trigger until master signals. */
trigger_container.getThreadTrigger()->wait() ;
if ( enabled ) {