mirror of
https://github.com/nasa/trick.git
synced 2024-12-29 17:38:54 +00:00
Add additional thread synchronization mechanisms
Created a thread trigger container class to hold all of the various thread triggering mechanisms. I wanted to avoid allocating/freeing trigger mechanisms. I added an eventfd and futex trigger types. Updated SIM_threads_simple test sim to use show how to set the thread trigger type. refs #148 Conflicts: trick_source/sim_services/Executive/include/Threads.hh trick_source/sim_services/Executive/src/Executive_set_thread_rt_semaphore.cpp trick_source/sim_services/Executive/src/Threads.cpp
This commit is contained in:
parent
5011fdd1eb
commit
03d89ef56a
@ -4,9 +4,12 @@ def main():
|
||||
trick.real_time_enable()
|
||||
trick.itimer_enable()
|
||||
trick.exec_set_software_frame(0.10)
|
||||
trick.stop(10)
|
||||
|
||||
trick.exec_set_thread_process_type(1, trick.PROCESS_TYPE_AMF_CHILD)
|
||||
trick.exec_set_thread_amf_cycle_time(1, 0.5)
|
||||
thr_con = trick.exec_get_thread(1).trigger_container
|
||||
thr_con.setThreadTrigger(trick.TT_MUTEX)
|
||||
|
||||
trick.exec_set_thread_process_type(2, trick.PROCESS_TYPE_AMF_CHILD)
|
||||
trick.exec_set_thread_amf_cycle_time(2, 0.1)
|
||||
|
148
trick_source/sim_services/Executive/ThreadTrigger.cpp
Normal file
148
trick_source/sim_services/Executive/ThreadTrigger.cpp
Normal file
@ -0,0 +1,148 @@
|
||||
|
||||
#include "trick/ThreadTrigger.hh"
|
||||
#include "trick/message_proto.h"
|
||||
|
||||
/* ThreadTriggerBase empty routines */
|
||||
Trick::ThreadTriggerBase::ThreadTriggerBase(ThreadTriggerType in_tt_type) : tt_type(in_tt_type) {}
|
||||
Trick::ThreadTriggerBase::~ThreadTriggerBase() {}
|
||||
void Trick::ThreadTriggerBase::init() {}
|
||||
|
||||
/* ThreadTriggerMutex */
|
||||
Trick::ThreadTriggerMutex::ThreadTriggerMutex() : ThreadTriggerBase(TT_MUTEX) {
|
||||
pthread_cond_init(&go_cv, NULL);
|
||||
pthread_mutex_init(&go_mutex, NULL);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerMutex::init() {
|
||||
pthread_mutex_lock(&go_mutex);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerMutex::fire() {
|
||||
pthread_mutex_lock(&go_mutex);
|
||||
pthread_cond_signal(&go_cv);
|
||||
pthread_mutex_unlock(&go_mutex);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerMutex::wait() {
|
||||
pthread_cond_wait(&go_cv, &go_mutex);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerMutex::dump(std::ostream & oss) {
|
||||
oss << " trigger type = mutex" << std::endl ;
|
||||
}
|
||||
|
||||
/* ThreadTriggerFlag */
|
||||
Trick::ThreadTriggerFlag::ThreadTriggerFlag() : ThreadTriggerBase(TT_FLAG) , frame_trigger(false) {}
|
||||
|
||||
void Trick::ThreadTriggerFlag::fire() {
|
||||
frame_trigger = true ;
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerFlag::wait() {
|
||||
while (frame_trigger == false) ;
|
||||
frame_trigger = false ;
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerFlag::dump(std::ostream & oss) {
|
||||
oss << " trigger type = flag" << std::endl ;
|
||||
}
|
||||
|
||||
#if __linux
|
||||
#include <sys/eventfd.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* ThreadTriggerEventFD */
|
||||
Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) {
|
||||
efd = eventfd(0,0);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerEventFD::fire() {
|
||||
uint64_t value = 1;
|
||||
write(efd, &value, sizeof(uint64_t));
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerEventFD::wait() {
|
||||
uint64_t value ;
|
||||
read(efd, &value, sizeof(uint64_t));
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) {
|
||||
oss << " trigger type = eventfd" << std::endl ;
|
||||
}
|
||||
|
||||
#include <linux/futex.h>
|
||||
#include <syscall.h>
|
||||
|
||||
/* ThreadTriggerFutex */
|
||||
Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX), futex_addr(0) {}
|
||||
|
||||
void Trick::ThreadTriggerFutex::fire() {
|
||||
syscall(SYS_futex, &futex_addr, FUTEX_WAKE, 1, NULL, NULL, 0);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerFutex::wait() {
|
||||
futex_addr = 0;
|
||||
syscall(SYS_futex, &futex_addr, FUTEX_WAIT, 0, NULL, NULL, 0);
|
||||
}
|
||||
|
||||
void Trick::ThreadTriggerFutex::dump(std::ostream & oss) {
|
||||
oss << " trigger type = futex" << std::endl ;
|
||||
}
|
||||
#else
|
||||
|
||||
/* Empty implementations for OSes other than Linux */
|
||||
|
||||
Trick::ThreadTriggerEventFD::ThreadTriggerEventFD() : ThreadTriggerBase(TT_EVENTFD) , efd(0) {}
|
||||
void Trick::ThreadTriggerEventFD::fire() {}
|
||||
void Trick::ThreadTriggerEventFD::wait() {}
|
||||
void Trick::ThreadTriggerEventFD::dump(std::ostream & oss) {
|
||||
oss << " trigger type = eventfd (non-functional). How did you get here?" << std::endl ;
|
||||
}
|
||||
|
||||
Trick::ThreadTriggerFutex::ThreadTriggerFutex() : ThreadTriggerBase(TT_FUTEX) , futex_addr(0) {}
|
||||
void Trick::ThreadTriggerFutex::fire() {}
|
||||
void Trick::ThreadTriggerFutex::wait() {}
|
||||
void Trick::ThreadTriggerFutex::dump(std::ostream & oss) {
|
||||
oss << " trigger type = futex (non-functional). How did you get here?" << std::endl ;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* ThreadTriggerContainer */
|
||||
Trick::ThreadTriggerContainer::ThreadTriggerContainer() : ttBase(&ttMutex) {}
|
||||
|
||||
Trick::ThreadTriggerBase * Trick::ThreadTriggerContainer::getThreadTrigger() {
|
||||
return ttBase ;
|
||||
}
|
||||
|
||||
/* Routine to change trigger type */
|
||||
void Trick::ThreadTriggerContainer::setThreadTrigger( ThreadTriggerType in_trigger_type ) {
|
||||
trigger_type = in_trigger_type ;
|
||||
switch ( trigger_type ) {
|
||||
case TT_MUTEX :
|
||||
default:
|
||||
ttBase = &ttMutex ;
|
||||
break ;
|
||||
case TT_FLAG :
|
||||
ttBase = &ttFlag ;
|
||||
break ;
|
||||
case TT_EVENTFD :
|
||||
#if __linux
|
||||
ttBase = &ttEventFD ;
|
||||
#else
|
||||
message_publish(MSG_ERROR, "EventFD thread trigger type not available, using mutex\n") ;
|
||||
ttBase = &ttMutex ;
|
||||
#endif
|
||||
break ;
|
||||
case TT_FUTEX :
|
||||
#if __linux
|
||||
ttBase = &ttFutex ;
|
||||
#else
|
||||
message_publish(MSG_ERROR, "Futex thread trigger type not available, using mutex\n") ;
|
||||
ttBase = &ttMutex ;
|
||||
#endif
|
||||
break ;
|
||||
}
|
||||
}
|
||||
|
99
trick_source/sim_services/Executive/include/ThreadTrigger.hh
Normal file
99
trick_source/sim_services/Executive/include/ThreadTrigger.hh
Normal file
@ -0,0 +1,99 @@
|
||||
/*
|
||||
PURPOSE:
|
||||
(Trick Thread trigger implementation)
|
||||
*/
|
||||
|
||||
#ifndef THREADTRIGGER_HH
|
||||
#define THREADTRIGGER_HH
|
||||
|
||||
#include <iostream>
|
||||
#include <pthread.h>
|
||||
|
||||
namespace Trick {
|
||||
|
||||
enum ThreadTriggerType {
|
||||
TT_MUTEX,
|
||||
TT_FLAG,
|
||||
TT_EVENTFD,
|
||||
TT_FUTEX
|
||||
} ;
|
||||
|
||||
class ThreadTriggerBase {
|
||||
public:
|
||||
ThreadTriggerType tt_type ;
|
||||
ThreadTriggerBase(ThreadTriggerType in_tt_type) ;
|
||||
virtual ~ThreadTriggerBase() ;
|
||||
virtual void init() ;
|
||||
virtual void fire() = 0 ;
|
||||
virtual void wait() = 0 ;
|
||||
virtual void dump( std::ostream & oss ) = 0 ;
|
||||
} ;
|
||||
|
||||
class ThreadTriggerMutex : public Trick::ThreadTriggerBase {
|
||||
public:
|
||||
ThreadTriggerMutex() ;
|
||||
virtual void init() ;
|
||||
virtual void fire() ;
|
||||
virtual void wait() ;
|
||||
virtual void dump( std::ostream & oss ) ;
|
||||
protected:
|
||||
/** Condition variable to control start of thread processing */
|
||||
pthread_cond_t go_cv; /**< trick_io(**) */
|
||||
/** Mutex to control start of thread processing */
|
||||
pthread_mutex_t go_mutex; /**< trick_io(**) */
|
||||
} ;
|
||||
|
||||
class ThreadTriggerFlag : public Trick::ThreadTriggerBase {
|
||||
public:
|
||||
ThreadTriggerFlag() ;
|
||||
virtual void fire() ;
|
||||
virtual void wait() ;
|
||||
virtual void dump( std::ostream & oss ) ;
|
||||
protected:
|
||||
/** Trigger from master to start thread processing */
|
||||
volatile bool frame_trigger; /**< trick_io(**) */
|
||||
} ;
|
||||
|
||||
class ThreadTriggerEventFD : public Trick::ThreadTriggerBase {
|
||||
public:
|
||||
ThreadTriggerEventFD() ;
|
||||
virtual void fire() ;
|
||||
virtual void wait() ;
|
||||
virtual void dump( std::ostream & oss ) ;
|
||||
protected:
|
||||
/** condition variable to control start of thread using eventfd */
|
||||
int efd; /**< trick_io(**) */
|
||||
} ;
|
||||
|
||||
class ThreadTriggerFutex : public Trick::ThreadTriggerBase {
|
||||
public:
|
||||
ThreadTriggerFutex() ;
|
||||
virtual void fire() ;
|
||||
virtual void wait() ;
|
||||
virtual void dump( std::ostream & oss ) ;
|
||||
protected:
|
||||
/** condition variable for futex */
|
||||
int futex_addr; /**< trick_io(**) */
|
||||
} ;
|
||||
|
||||
/* Container to hold all of the trigger types. This avoids having to
|
||||
allocate each trigger type when changing from one type to another */
|
||||
class ThreadTriggerContainer {
|
||||
|
||||
public:
|
||||
ThreadTriggerContainer() ;
|
||||
ThreadTriggerBase * getThreadTrigger() ;
|
||||
void setThreadTrigger( ThreadTriggerType trigger_type ) ;
|
||||
|
||||
protected:
|
||||
ThreadTriggerType trigger_type ;
|
||||
ThreadTriggerMutex ttMutex ;
|
||||
ThreadTriggerFlag ttFlag ;
|
||||
ThreadTriggerEventFD ttEventFD ;
|
||||
ThreadTriggerFutex ttFutex ;
|
||||
|
||||
ThreadTriggerBase * ttBase ;
|
||||
|
||||
} ;
|
||||
}
|
||||
#endif
|
@ -15,6 +15,7 @@
|
||||
#include <semaphore.h>
|
||||
#include <iostream>
|
||||
|
||||
#include "sim_services/Executive/include/ThreadTrigger.hh"
|
||||
#include "sim_services/ThreadBase/include/ThreadBase.hh"
|
||||
#include "sim_services/SimObject/include/SimObject.hh"
|
||||
#include "sim_services/ScheduledJobQueue/include/ScheduledJobQueue.hh"
|
||||
@ -85,7 +86,8 @@ namespace Trick {
|
||||
*/
|
||||
int time_tic_changed(long long old_time_tic_value, long long time_tic_value) ;
|
||||
|
||||
/** * Sets the wait at shutdown for asynchronous threads flag
|
||||
/**
|
||||
* Sets the wait at shutdown for asynchronous threads flag
|
||||
* @param yes_no - do we wait for the thread at shutdown or not?
|
||||
* @return always 0
|
||||
*/
|
||||
@ -141,20 +143,11 @@ namespace Trick {
|
||||
/** Thread has completed all jobs for this time step */
|
||||
volatile bool child_complete; /**< trick_io(**) */
|
||||
|
||||
/** Trigger from master to start thread processing */
|
||||
volatile int frame_trigger; /**< trick_units(--) */
|
||||
|
||||
/** True if the thread was started without errors. */
|
||||
bool running; /**< trick_units(--) */
|
||||
|
||||
/** Use a mutex to signal the start of thread processing */
|
||||
bool rt_semaphores; /**< trick_units(--) */
|
||||
|
||||
/** Condition variable to control start of thread processing */
|
||||
pthread_cond_t go_cv; /**< trick_io(**) */
|
||||
|
||||
/** Mutex to control start of thread processing */
|
||||
pthread_mutex_t go_mutex; /**< trick_io(**) */
|
||||
/** Trigger to start frame. Paused on by the thread, fired by the master */
|
||||
ThreadTriggerContainer trigger_container ; /**< trick_units(--) */
|
||||
|
||||
/** Wait for asynchronous jobs to finish at shutdown */
|
||||
bool shutdown_wait_async; /**< trick_units(--) */
|
||||
|
@ -168,14 +168,8 @@ int Trick::Executive::loop_multi_thread() {
|
||||
curr_thread->curr_time_tics = time_tics ;
|
||||
curr_thread->child_complete = false ;
|
||||
curr_thread->amf_next_tics += curr_thread->amf_cycle_tics ;
|
||||
curr_thread->trigger_container.getThreadTrigger()->fire() ;
|
||||
|
||||
if (curr_thread->rt_semaphores == true ) {
|
||||
pthread_mutex_lock(&(curr_thread->go_mutex));
|
||||
pthread_cond_signal(&(curr_thread->go_cv));
|
||||
pthread_mutex_unlock(&(curr_thread->go_mutex));
|
||||
} else {
|
||||
curr_thread->frame_trigger = true ;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,19 +1,10 @@
|
||||
|
||||
#include "sim_services/Executive/include/Executive.hh"
|
||||
#include "sim_services/Message/include/message_proto.h"
|
||||
|
||||
int Trick::Executive::set_thread_rt_semaphores(unsigned int thread_id , bool yes_no) {
|
||||
|
||||
int ret = 0 ;
|
||||
|
||||
/** @par Detailed Design */
|
||||
if ( (thread_id +1) > threads.size() ) {
|
||||
/** @li If the thread_id does not exist, return an error */
|
||||
ret = -2 ;
|
||||
} else {
|
||||
threads[thread_id]->rt_semaphores = yes_no ;
|
||||
}
|
||||
|
||||
return(ret) ;
|
||||
|
||||
message_publish(MSG_WARNING, "set_thread_rt_semaphores has been deprecated\n"
|
||||
"Use exec_get_thread(thread_id).set_trigger_type(Trick::ThreadTriggerType tt_type)") ;
|
||||
return -1 ;
|
||||
}
|
||||
|
||||
|
@ -17,13 +17,8 @@ Trick::Threads::Threads(int in_id , bool in_rt_nap) :
|
||||
rt_nap(in_rt_nap) ,
|
||||
process_type(PROCESS_TYPE_SCHEDULED) ,
|
||||
child_complete(false) ,
|
||||
frame_trigger(0) ,
|
||||
running(false) ,
|
||||
rt_semaphores(true) ,
|
||||
shutdown_wait_async(false) {
|
||||
pthread_cond_init(&go_cv, NULL);
|
||||
pthread_mutex_init(&go_mutex, NULL);
|
||||
|
||||
std::stringstream oss ;
|
||||
oss << "Child_" << in_id ;
|
||||
name = oss.str() ;
|
||||
@ -51,6 +46,7 @@ void Trick::Threads::dump( std::ostream & oss ) {
|
||||
case PROCESS_TYPE_ASYNC_CHILD: oss << "asynchronous" << std::endl ; break ;
|
||||
case PROCESS_TYPE_AMF_CHILD: oss << "asynchronous must finish with amf_cycle = " << amf_cycle << std::endl ; break ;
|
||||
}
|
||||
trigger_container.getThreadTrigger()->dump(oss) ;
|
||||
oss << " number of scheduled jobs = " << job_queue.size() << std::endl ;
|
||||
Trick::ThreadBase::dump(oss) ;
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ static int call_next_job(Trick::JobData * curr_job, Trick::ScheduledJobQueue & j
|
||||
void * Trick::Threads::thread_body() {
|
||||
|
||||
/* Lock the go mutex so the master has to wait until this child is ready before staring execution. */
|
||||
pthread_mutex_lock(&go_mutex);
|
||||
trigger_container.getThreadTrigger()->init() ;
|
||||
|
||||
/* signal the master that the child is ready and running */
|
||||
child_complete = true;
|
||||
@ -104,14 +104,8 @@ void * Trick::Threads::thread_body() {
|
||||
try {
|
||||
do {
|
||||
|
||||
/* Block child on go mutex or frame trigger until master signals. */
|
||||
if (rt_semaphores == true) {
|
||||
pthread_cond_wait(&go_cv, &go_mutex);
|
||||
} else {
|
||||
/* Else the child process frame is being started when a shared memory flag... */
|
||||
while (frame_trigger == false) {} ;
|
||||
frame_trigger = false ;
|
||||
}
|
||||
/* Block child on trigger until master signals. */
|
||||
trigger_container.getThreadTrigger()->wait() ;
|
||||
|
||||
if ( enabled ) {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user