Create Message client that writes to screen on a separate thread #553

Created a new message class that will copy incoming messages to a
buffer area.  The buffer area is pre allocated during simulation
initialization.  On a separate thread the buffer area is written
to std::cout.
This commit is contained in:
Alex Lin 2018-01-25 15:59:19 -06:00
parent 403621c139
commit 8f76605f3b
10 changed files with 202 additions and 8 deletions

View File

@ -59,7 +59,7 @@ namespace Trick {
@brief Initializes this subscriber. @brief Initializes this subscriber.
@return always 0 @return always 0
*/ */
int init() ; virtual int init() ;
protected: protected:
/** The output file stream. \n */ /** The output file stream. \n */

View File

@ -45,6 +45,11 @@ namespace Trick {
*/ */
virtual ~MessageSubscriber() {} ; virtual ~MessageSubscriber() {} ;
/**
@brief Initializes the subscriber
*/
virtual int init() { return 0 ; } ;
/** /**
@brief Get a message and send to output. This gets called every time when the message publisher @brief Get a message and send to output. This gets called every time when the message publisher
that this subscriber subscribes to publishes a message. Actual output done in the derived class. that this subscriber subscribes to publishes a message. Actual output done in the derived class.
@ -54,6 +59,11 @@ namespace Trick {
*/ */
virtual void update( unsigned int level , std::string header, std::string message ) = 0 ; virtual void update( unsigned int level , std::string header, std::string message ) = 0 ;
/**
@brief Shutdown the subscriber
*/
virtual int shutdown() { return 0 ; } ;
} ; } ;
} }

View File

@ -86,7 +86,7 @@ namespace Trick {
@brief Initializes this subscriber. @brief Initializes this subscriber.
@return always 0 @return always 0
*/ */
int init() ; virtual int init() ;
/** /**
@brief Restarts this subscriber. @brief Restarts this subscriber.
@ -98,7 +98,7 @@ namespace Trick {
@brief Shuts down this subscriber. @brief Shuts down this subscriber.
@return always 0 @return always 0
*/ */
int shutdown() ; virtual int shutdown() ;
/** The port number for message socket connection. Copied out from listen_thread.\n */ /** The port number for message socket connection. Copied out from listen_thread.\n */
int port ; /**< trick_units(--) */ int port ; /**< trick_units(--) */

View File

@ -0,0 +1,78 @@
/*
PURPOSE:
(Print messages to std::cout on a seperate thread)
*/
#ifndef MESSAGETHREADEDCOUT_HH
#define MESSAGETHREADEDCOUT_HH
#include <iostream>
#include "trick/ThreadBase.hh"
#include "trick/MessageSubscriber.hh"
namespace Trick {
/**
* This MessageThreadedCout is a class that inherits from MessageSubscriber.
* It defines a type of MessageSubscriber with its received message sending to the standard output stream.
*/
class MessageThreadedCout : public MessageSubscriber , public Trick::ThreadBase {
public:
/**
@brief Maximum number of items to hold in ring buffer
*/
unsigned int max_buffer_items ; // trick_units(--)
/**
@brief Maximum size of string to hold
*/
unsigned int max_buffer_size ; // trick_units(--)
std::string color_code ;
/**
@brief The constructor.
*/
MessageThreadedCout() ;
virtual ~MessageThreadedCout() {} ;
// From MessageSubscriber
virtual int init() ;
virtual void update( unsigned int level , std::string header , std::string message ) ;
virtual int shutdown() ;
// From Trick::ThreadBase
virtual void * thread_body() ;
virtual void dump( std::ostream & oss = std::cout ) ;
// Specific Classes
void write_pending_messages() ;
protected:
// After sim shutdown print immediately.
bool print_immediate ;
struct StringNode {
std::string buffer ;
size_t max_len ;
StringNode * next ;
StringNode(size_t str_len) : max_len(str_len) { buffer.reserve(max_len) ; } ;
void copy( std::string &header, std::string & color_code, std::string & message ) {
buffer.assign(header, 0, max_len) ;
buffer.append(color_code, 0, max_len - buffer.length()) ;
buffer.append(message, 0, max_len - buffer.length()) ;
buffer.append("\033[00m", 0, max_len - buffer.length()) ;
} ;
} ;
StringNode * copy_ptr ; /* trick_io(**) pointer to next buffer to copy data */
StringNode * write_ptr ; /* trick_io(**) pointer to next buffer to write to screen */
pthread_mutex_t write_mutex ; /* trick_io(**) mutex for writing */
} ;
}
#endif

View File

@ -25,6 +25,7 @@
#include "trick/MSSharedMem.hh" #include "trick/MSSharedMem.hh"
#include "trick/MemoryManager.hh" #include "trick/MemoryManager.hh"
#include "trick/MessageCout.hh" #include "trick/MessageCout.hh"
#include "trick/MessageThreadedCout.hh"
#include "trick/MessageFile.hh" #include "trick/MessageFile.hh"
#include "trick/MessageLCout.hh" #include "trick/MessageLCout.hh"
#include "trick/MessagePublisher.hh" #include "trick/MessagePublisher.hh"

View File

@ -1,8 +1,7 @@
/* /*
PURPOSE: PURPOSE:
(This header file defines the "RELEASE()" macro for supported platforms. (This header file defines the "RELEASE()" macro for supported platforms.
The release function should yield the processor from the processes that The release function should yield the processor from the processes that makes the call. This allows the UNIX schedular to arbitrate process
makes the call. This allows the UNIX schedular to arbitrate process
priorities during spinloops based on the status of processes waiting priorities during spinloops based on the status of processes waiting
to run) to run)
PROGRAMMER: PROGRAMMER:

View File

@ -47,6 +47,7 @@ a replacement SimObject will create an uncompilable sim.
##include "trick/MessagePublisher.hh" ##include "trick/MessagePublisher.hh"
##include "trick/MessageSubscriber.hh" ##include "trick/MessageSubscriber.hh"
##include "trick/MessageCout.hh" ##include "trick/MessageCout.hh"
##include "trick/MessageThreadedCout.hh"
##include "trick/MessageLCout.hh" ##include "trick/MessageLCout.hh"
##include "trick/MessageFile.hh" ##include "trick/MessageFile.hh"
##include "trick/MessageTCDevice.hh" ##include "trick/MessageTCDevice.hh"
@ -266,6 +267,7 @@ class MessageSimObject : public Trick::SimObject {
public: public:
Trick::MessagePublisher mpublisher ; Trick::MessagePublisher mpublisher ;
Trick::MessageCout mcout ; Trick::MessageCout mcout ;
Trick::MessageThreadedCout mtcout ;
Trick::MessageFile mfile ; Trick::MessageFile mfile ;
Trick::MessageTCDevice mdevice ; Trick::MessageTCDevice mdevice ;
Trick::PlaybackFile pfile ; Trick::PlaybackFile pfile ;
@ -273,11 +275,13 @@ class MessageSimObject : public Trick::SimObject {
MessageSimObject() { MessageSimObject() {
{TRK} ("default_data") mpublisher.subscribe(&mcout) ; {TRK} ("default_data") mpublisher.subscribe(&mcout) ;
//{TRK} ("default_data") mpublisher.subscribe(&mtcout) ;
{TRK} ("default_data") mpublisher.subscribe(&mfile) ; {TRK} ("default_data") mpublisher.subscribe(&mfile) ;
{TRK} ("default_data") mpublisher.subscribe(&mdevice) ; {TRK} ("default_data") mpublisher.subscribe(&mdevice) ;
{TRK} ("default_data") mpublisher.subscribe(&pfile) ; {TRK} ("default_data") mpublisher.subscribe(&pfile) ;
{TRK} ("default_data") mdevice.default_data() ; {TRK} ("default_data") mdevice.default_data() ;
{TRK} P1 ("initialization") mpublisher.init() ; {TRK} P1 ("initialization") mpublisher.init() ;
//{TRK} P1 ("initialization") mtcout.init() ;
{TRK} P1 ("initialization") mfile.init() ; {TRK} P1 ("initialization") mfile.init() ;
{TRK} P1 ("initialization") pfile.init() ; {TRK} P1 ("initialization") pfile.init() ;
{TRK} P1 ("initialization") mdevice.init() ; {TRK} P1 ("initialization") mdevice.init() ;
@ -287,6 +291,7 @@ class MessageSimObject : public Trick::SimObject {
#ifndef TRICK_NO_DMTCP #ifndef TRICK_NO_DMTCP
{TRK} P1 ("dmtcp_restart") mdevice.restart() ; {TRK} P1 ("dmtcp_restart") mdevice.restart() ;
#endif #endif
{TRK} ("shutdown") mtcout.shutdown() ;
{TRK} ("shutdown") mdevice.shutdown() ; {TRK} ("shutdown") mdevice.shutdown() ;
} }

View File

@ -71,9 +71,7 @@ int Trick::MessagePublisher::publish(int level , std::string message) {
// multithreaded sims from interleaving header and message elements. // multithreaded sims from interleaving header and message elements.
std::ostringstream oss; std::ostringstream oss;
oss << header << message ; oss << header << message ;
std::cout << oss.str() << std::flush ; std::cout << oss.str() << std::flush ; } return(0) ;
}
return(0) ;
} }

View File

@ -0,0 +1,102 @@
#include "trick/MessageThreadedCout.hh"
#include "trick/message_type.h"
#include "trick/release.h"
#include <sstream>
Trick::MessageThreadedCout::MessageThreadedCout() :
max_buffer_items(400),
max_buffer_size(4000) ,
print_immediate(false) ,
copy_ptr(NULL),
write_ptr(NULL) {
/** By default, this subscriber is enabled when it is created. */
Trick::MessageSubscriber::name = "threadedcout" ;
Trick::ThreadBase::name = "threadedcout" ;
color_code.reserve(6) ;
StringNode * temp = new StringNode(max_buffer_size) ;
write_ptr = copy_ptr = temp ;
for ( unsigned int ii = 1 ; ii < max_buffer_items ; ii++ ) {
temp = new StringNode(max_buffer_size) ;
copy_ptr->next = temp ;
copy_ptr = temp ;
}
copy_ptr->next = write_ptr ;
copy_ptr = write_ptr ;
pthread_mutex_init(&write_mutex, NULL);
}
int Trick::MessageThreadedCout::init() {
create_thread() ;
return 0 ;
}
void Trick::MessageThreadedCout::update( unsigned int level , std::string header , std::string message ) {
/** @li Prints the received message to the standard output stream. */
if (enabled && level < 100 ) {
switch (level) {
case MSG_NORMAL :
color_code = "\033[00m" ; // normal
break ;
case MSG_INFO :
color_code = "\033[32m" ; // green
break ;
case MSG_WARNING :
color_code = "\033[33m" ; // yellow
break ;
case MSG_ERROR :
color_code = "\033[31m" ; // red
break ;
case MSG_DEBUG :
color_code = "\033[36m" ; // cyan
break ;
default :
color_code = "\033[00m" ; // normal
break ;
}
if ( copy_ptr->next != write_ptr ) {
copy_ptr->copy(header, color_code, message) ;
copy_ptr = copy_ptr->next ;
// After shutdown the sim sends a termination message. print it now.
if ( print_immediate ) {
write_pending_messages() ;
}
}
}
}
void * Trick::MessageThreadedCout::thread_body() {
while(1) {
write_pending_messages() ;
RELEASE() ;
}
}
void Trick::MessageThreadedCout::write_pending_messages() {
pthread_mutex_lock(&write_mutex) ;
while ( write_ptr != copy_ptr ) {
std::cout << write_ptr->buffer << std::flush ;
write_ptr = write_ptr->next ;
}
pthread_mutex_unlock(&write_mutex) ;
}
int Trick::MessageThreadedCout::shutdown() {
write_pending_messages() ;
// After shutdown print any late incoming messages immediately.
print_immediate = true ;
//TODO: delete memory
return 0 ;
}
void Trick::MessageThreadedCout::dump( std::ostream & oss ) {
Trick::ThreadBase::dump(oss) ;
}

View File

@ -115,6 +115,7 @@
#include "trick/AttributesMap.hh" #include "trick/AttributesMap.hh"
#include "trick/sie_c_intf.h" #include "trick/sie_c_intf.h"
#include "trick/MessageCout.hh" #include "trick/MessageCout.hh"
#include "trick/MessageThreadedCout.hh"
#include "trick/MessageFile.hh" #include "trick/MessageFile.hh"
#include "trick/MessageLCout.hh" #include "trick/MessageLCout.hh"
#include "trick/MessagePublisher.hh" #include "trick/MessagePublisher.hh"