Efficiently handle time-consistent and non-time consistent message modes in web server, plus lots of refactoring. ref #883

This commit is contained in:
Penn, John M 047828115 2019-10-08 14:14:18 -05:00
parent 5e11a40a69
commit bf288636cb
8 changed files with 136 additions and 96 deletions

View File

@ -10,15 +10,16 @@ LIBRARY DEPENDENCIES:
#include "sim_objects/default_trick_sys.sm"
// Uncomment to get a webserver. Also uncomment lines in S_overrise.mk to link in mongoose lib.
// #include "mongoose_httpd/mongoose_httpd.sm"
#ifdef TRICK_HTTP
#include "mongoose_httpd/mongoose_httpd.sm"
#endif
##include "cannon/gravity/include/cannon_numeric.h"
class CannonSimObject : public Trick::SimObject {
public:
CANNON cannon ;
int foo;
CannonSimObject() {
("default_data") cannon_default_data( &cannon ) ;
("initialization") cannon_init( &cannon ) ;

View File

@ -1,6 +1,8 @@
TRICK_CFLAGS += -I../models
TRICK_CXXFLAGS += -I../models
TRICK_SFLAGS += -I../models
# Uncomment these for an embedded webserver.
#TRICK_SFLAGS += -DTRICK_HTTP
#TRICK_SFLAGS += -I../models
#TRICK_LDFLAGS += -L/usr/local/lib
#TRICK_USER_LINK_LIBS += -lmongoose

View File

@ -68,6 +68,9 @@
addVariable("dyn.cannon.pos[1]", 0.0);
addVariable("dyn.cannon.vel[0]", 0.0);
addVariable("dyn.cannon.vel[1]", 0.0);
addVariable("dyn.cannon.time", 0.0);
addVariable("dyn.cannon.timeRate", 0.0);
addVariable("dyn.cannon.impact", 0.0);
addVariable("I.dont.exist", 0.0);
sendMessage("{\"cmd\":\"var_unpause\"}");
};

View File

@ -1,32 +1,5 @@
/*************************************************************************
PURPOSE: (Represent the state of a variable server websocket connection.)
Messages sent from Client to Server
================================
{ "cmd" : "var_add",
"var_name" : <str>
}
{ "cmd" : "var_pause" }
{ "cmd" : "var_unpause" }
{ "cmd" : "var_send" }
{ "cmd" : "var_clear" }
{ "cmd" : "var_exit" }
{ "cmd" : "var_cycle",
"period" : <int>
}
{ "cmd" : "python",
"pycode" : <str>
}
Messages sent from Server to Client
=================================
{ "msg_type" : "error",
"error_text" : <str>
}
{ "msg_type" : "var_list"
"time" : <double>
"values" : []
}
**************************************************************************/
#ifndef WSSESSION_HH
@ -42,13 +15,13 @@ class VariableServerSession : public WebSocketSession {
public:
VariableServerSession(struct mg_connection *nc);
~VariableServerSession();
void stageData(); /* -- base */
void marshallData(); /* -- base */
void sendMessage(); /* -- base */
int handleMessage(std::string); /* -- base */
void setTimeInterval(unsigned int milliseconds);
void addVariable(char* vname);
void stageVariableValues();
void stageValues();
void pause();
void unpause();
void clear();
@ -60,9 +33,13 @@ class VariableServerSession : public WebSocketSession {
int sendErrorMessage(const char* fmt, ... );
REF2* make_error_ref(const char* in_name);
double stageTime;
bool dataStaged;
std::vector<VariableServerVariable*> sessionVariables;
bool cyclicSendEnabled;
long long nextTime;
long long intervalTimeTics;
};
WebSocketSession* makeVariableServerSession( struct mg_connection *nc );
#endif

View File

@ -11,7 +11,13 @@ class WebSocketSession {
public:
WebSocketSession(struct mg_connection *nc):connection(nc){};
virtual ~WebSocketSession() {};
virtual void stageData()=0;
/**
When HTTP_Server::time_homogeneous is set, WebSocketSession::marshallData() is called from the main
sim thread in a "top_of_frame" job, so that all of the data can be staged at
the same sim-time, in other words it's time-homogeneous.
*/
virtual void marshallData()=0;
virtual void sendMessage()=0;
virtual int handleMessage(std::string)=0;

View File

@ -26,14 +26,20 @@ class HTTP_Server {
std::map< std::string, httpMethodHandler> httpGETHandlerMap; /* ** */
pthread_mutex_t httpGETHandlerMapLock; /* ** */
std::map< std::string, WebSocketSessionMaker> WebSocketSessionMakerMap; /* ** */
pthread_mutex_t WebSocketSessionMakerMapLock; /* ** */
std::map<mg_connection*, WebSocketSession*> sessionMap; /* ** */
pthread_mutex_t sessionMapLock; /* ** */
std::map<mg_connection*, WebSocketSession*> webSocketSessionMap; /* ** */
pthread_mutex_t webSocketSessionMapLock; /* ** */
pthread_mutex_t serviceLock; /* ** */
struct mg_serve_http_opts http_server_options; /* ** mongoose*/
struct mg_bind_opts bind_opts; /* ** mongoose*/
struct mg_serve_http_opts http_server_options; /* ** */
struct mg_bind_opts bind_opts; /* ** */
pthread_cond_t serviceConnections; /* ** */
bool service_websocket;
bool time_homogeneous;
bool sessionDataMarshalled;
// Trick Job-functions
int http_default_data();
@ -44,7 +50,7 @@ class HTTP_Server {
void installWebSocketSessionMaker(std::string name, WebSocketSessionMaker maker);
void installHTTPGEThandler(std::string handlerName, httpMethodHandler handler);
// These are internals, and should be considered public. They are not private only
// These are internals, and should not be considered public. They are not private only
// because they need to be callable from the servers event handler.
void sendWebSocketSessionMessages(struct mg_connection *nc);
void handleWebSocketClientMessage(struct mg_connection *nc, std::string msg);
@ -52,5 +58,6 @@ class HTTP_Server {
void deleteWebSocketSession(struct mg_connection *nc);
WebSocketSession* makeWebSocketSession(struct mg_connection *nc, std::string name);
void handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName);
void marshallWebSocketSessionData();
};
#endif

View File

@ -6,6 +6,7 @@ LIBRARY DEPENDENCIES:
)
**************************************************************************/
#include <string>
#include <iostream>
#include <sstream>
#include <iomanip> // for setprecision
#include <algorithm>
@ -15,11 +16,10 @@ LIBRARY DEPENDENCIES:
#include "../include/VariableServerSession.hh"
#include "../include/simpleJSON.hh"
// CONSTRUCTOR
VariableServerSession::VariableServerSession( struct mg_connection *nc ) : WebSocketSession(nc) {
intervalTimeTics = exec_get_time_tic_value(); // Default time interval is one second.
nextTime = LLONG_MAX;
nextTime = 0;
cyclicSendEnabled = false;
}
@ -28,32 +28,45 @@ VariableServerSession::~VariableServerSession() {
clear();
}
// Base class virtual function.
void VariableServerSession::stageData() {
/* Base class virtual function: marshallData
When HTTP_Server::time_homogeneous is set, WebSocketSession::marshallData() is
called from the main sim thread in a "top_of_frame" job, to ensure that all of
the data is staged at the same sim-time, in other words that it's time-homogeneous.
*/
/* VariableServerSession::marshallData() conditionally stages message data when
sim_time has reached the next integer multiple of intervalTimeTics
(The specified period between messages).
*/
void VariableServerSession::marshallData() {
long long simulation_time_tics = exec_get_time_tics();
if ( cyclicSendEnabled && ( simulation_time_tics >= nextTime )) {
stageVariableValues();
stageValues();
nextTime = (simulation_time_tics - (simulation_time_tics % intervalTimeTics) + intervalTimeTics);
}
nextTime = (simulation_time_tics - (simulation_time_tics % intervalTimeTics) + intervalTimeTics);
}
// Base class virtual function.
/* Base class virtual function: sendMessage
if data is staged/marshalled, then compose and send a message containing that data.
*/
void VariableServerSession::sendMessage() {
std::vector<VariableServerVariable*>::iterator it;
std::stringstream ss;
ss << "{ \"msg_type\" : \"values\",\n";
ss << " \"time\" : " << std::setprecision(16) << stageTime << ",\n";
ss << " \"values\" : [\n";
if (dataStaged) {
ss << "{ \"msg_type\" : \"values\",\n";
ss << " \"time\" : " << std::setprecision(16) << stageTime << ",\n";
ss << " \"values\" : [\n";
for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) {
if (it != sessionVariables.begin()) ss << ",\n";
(*it)->writeValue(ss);
}
ss << "]}" << std::endl;
std::string tmp = ss.str();
const char * message = tmp.c_str();
mg_send_websocket_frame(connection, WEBSOCKET_OP_TEXT, message, strlen(message));
for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) {
if (it != sessionVariables.begin()) ss << ",\n";
(*it)->writeValue(ss);
}
ss << "]}" << std::endl;
std::string tmp = ss.str();
const char * message = tmp.c_str();
mg_send_websocket_frame(connection, WEBSOCKET_OP_TEXT, message, strlen(message));
dataStaged = false;
}
}
// Base class virtual function.
@ -91,7 +104,8 @@ int VariableServerSession::handleMessage(std::string client_msg) {
} else if (cmd == "var_unpause") {
unpause();
} else if (cmd == "var_send") {
stageVariableValues();
// var_send responses are not guarenteed to be time-consistent.
stageValues();
sendMessage();
} else if (cmd == "var_clear") {
clear();
@ -111,6 +125,7 @@ int VariableServerSession::handleMessage(std::string client_msg) {
}
void VariableServerSession::setTimeInterval(unsigned int milliseconds) {
// CONSIDER: should we compare this with the realtime frame, and limit accordingly.
intervalTimeTics = exec_get_time_tic_value() * milliseconds / 1000;
}
@ -145,12 +160,13 @@ void VariableServerSession::addVariable(char* vname){
}
}
void VariableServerSession::stageVariableValues() {
void VariableServerSession::stageValues() {
stageTime = (double)exec_get_time_tics() / exec_get_time_tic_value();
std::vector<VariableServerVariable*>::iterator it;
for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) {
(*it)->stageValue();
}
dataStaged = true;
}
void VariableServerSession::pause() { cyclicSendEnabled = false; }
@ -202,3 +218,8 @@ REF2* VariableServerSession::make_error_ref(const char* in_name) {
new_ref->attr->size = sizeof(int) ;
return new_ref;
}
// WebSocketSessionMaker function for a VariableServerSession.
WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) {
return new VariableServerSession(nc);
}

View File

@ -8,7 +8,7 @@ LIBRARY DEPENDENCIES:
#include "../include/http_server.hh"
#include "../include/http_GET_handlers.hh"
#include "../include/VariableServerSession.hh"
#include <string.h>
static const struct mg_str s_get_method = MG_MK_STR("GET");
static const struct mg_str s_put_method = MG_MK_STR("PUT");
static const struct mg_str s_delete_method = MG_MK_STR("DELETE");
@ -91,14 +91,20 @@ static void* connectionAttendant (void* arg) {
while(1) {
pthread_mutex_lock(&S->serviceLock);
// Wait here until the serviceConnections condition is signaled by the top_of_frame job.
pthread_cond_wait(&S->serviceConnections, &S->serviceLock);
while (!S->service_websocket && !S->shutting_down) {
pthread_cond_wait(&S->serviceConnections, &S->serviceLock);
}
if (S->shutting_down) {
pthread_mutex_unlock(&S->serviceLock);
return NULL;
} else {
if (!S->sessionDataMarshalled) {
S->marshallWebSocketSessionData();
}
// mg_mgr_poll returns the number of connections that still need to be serviced.
while(mg_mgr_poll(&S->mgr, 50));
}
S->service_websocket= false;
pthread_mutex_unlock(&S->serviceLock);
}
return NULL;
@ -132,8 +138,8 @@ void HTTP_Server::installHTTPGEThandler(std::string handlerName, httpMethodHandl
pthread_mutex_unlock(&httpGETHandlerMapLock);
}
// Lookup the appropriate httpMethodHandler by name, and execute it for the given connection
// and http_message.
/* Lookup the appropriate httpMethodHandler by name, and execute it for the
given connection and http_message. */
void HTTP_Server::handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName) {
std::map<std::string, httpMethodHandler>::iterator iter;
iter = httpGETHandlerMap.find(handlerName);
@ -145,26 +151,42 @@ void HTTP_Server::handleHTTPGETrequest(struct mg_connection *nc, http_message *h
}
}
// Find the session that goes with the given websocket connection,
// and tell it to send its values to the client (web browser).
void HTTP_Server::sendWebSocketSessionMessages(struct mg_connection *nc) {
/* Tell each of the sessions to marshall any data that they have to send. */
void HTTP_Server::marshallWebSocketSessionData() {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
iter = sessionMap.find(nc);
if (iter != sessionMap.end()) {
pthread_mutex_lock(&webSocketSessionMapLock);
for (iter = webSocketSessionMap.begin(); iter != webSocketSessionMap.end(); iter++ ) {
WebSocketSession* session = iter->second;
session->marshallData();
}
sessionDataMarshalled = true;
pthread_mutex_unlock(&webSocketSessionMapLock);
}
// Find the session that goes with the given websocket connection,
// and tell it to send any messages it may have, to the client (web browser).
void HTTP_Server::sendWebSocketSessionMessages(struct mg_connection *nc) {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
pthread_mutex_lock(&webSocketSessionMapLock);
iter = webSocketSessionMap.find(nc);
if (iter != webSocketSessionMap.end()) {
WebSocketSession* session = iter->second;
session->sendMessage();
}
sessionDataMarshalled = false;
pthread_mutex_unlock(&webSocketSessionMapLock);
}
// Delete the WebSocketSession associated with the given connection-pointer, and
// erase its pointer from the sessionMap.
/* Delete the WebSocketSession associated with the given connection-pointer,
and erase its pointer from the webSocketSessionMap. */
void HTTP_Server::deleteWebSocketSession(struct mg_connection *nc) {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
iter = sessionMap.find(nc);
if (iter != sessionMap.end()) {
iter = webSocketSessionMap.find(nc);
if (iter != webSocketSessionMap.end()) {
WebSocketSession* session = iter->second;
delete session;
sessionMap.erase(iter);
webSocketSessionMap.erase(iter);
}
}
@ -172,8 +194,8 @@ void HTTP_Server::deleteWebSocketSession(struct mg_connection *nc) {
// the given message to it.
void HTTP_Server::handleWebSocketClientMessage(struct mg_connection *nc, std::string msg) {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
iter = sessionMap.find(nc);
if (iter != sessionMap.end()) {
iter = webSocketSessionMap.find(nc);
if (iter != webSocketSessionMap.end()) {
WebSocketSession* session = iter->second;
session->handleMessage(msg);
}
@ -181,15 +203,9 @@ void HTTP_Server::handleWebSocketClientMessage(struct mg_connection *nc, std::st
// Install a WebSocketSession with a connection-pointer, the key by which it can be retrieved.
void HTTP_Server::addWebSocketSession(struct mg_connection *nc, WebSocketSession* session) {
pthread_mutex_lock(&sessionMapLock);
sessionMap.insert( std::pair<mg_connection*, WebSocketSession*>(nc, session) );
pthread_mutex_unlock(&sessionMapLock);
}
// WebSocketSessionMaker function for a VariableServerSession.
static WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) {
return new VariableServerSession(nc);
pthread_mutex_lock(&webSocketSessionMapLock);
webSocketSessionMap.insert( std::pair<mg_connection*, WebSocketSession*>(nc, session) );
pthread_mutex_unlock(&webSocketSessionMapLock);
}
// =========================================================================
@ -199,19 +215,22 @@ static WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) {
// Trick "default_data" job.
int HTTP_Server::http_default_data() {
port = "8888";
//port = "0";
document_root = "www";
time_homogeneous = false;
service_websocket = false;
shutting_down = false;
sessionDataMarshalled = false;
installHTTPGEThandler("vs_connections", &handle_HTTP_GET_vs_connections);
installHTTPGEThandler("alloc_info", &handle_HTTP_GET_alloc_info);
installWebSocketSessionMaker("VariableServer", &makeVariableServerSession);
//installWebSocketSessionMaker("VariableServer", [](struct mg_connection *nc) -> WebSocketSession* { return new VariableServerSession(nc); } );
return 0;
}
// Trick "initialization" job.
int HTTP_Server::http_init() {
http_server_options.document_root = document_root;
http_server_options.enable_directory_listing = "yes";
@ -221,8 +240,15 @@ int HTTP_Server::http_init() {
bind_opts.user_data = this;
listener = mg_bind_opt( &mgr, port, ev_handler, bind_opts);
// Determine the ACTUAL listen port as opposed to the requested port.
// Note that if we specify port = "0" in the mg_bind_opt() call, then
// a port number will be chosen for us, and we have to find out what it actually is.
char buf[32];
mg_conn_addr_to_str( listener, buf, 32, MG_SOCK_STRINGIFY_PORT);
port = strdup(buf);
if (listener != NULL) {
std::cout << "Trick Webserver: Starting, and listening on port " << port << ".\n"
std::cout << "Trick Webserver: Listening on port = " << port << ".\n"
<< "Trick Webserver: Document root = \"" << document_root << "\""
<< std::endl;
} else {
@ -239,16 +265,13 @@ int HTTP_Server::http_init() {
int HTTP_Server::http_top_of_frame() {
if (listener != NULL) {
// Have all of the sessions stage their data. We do this here, in a
// top_of_frame job, so that all of the data is time-homogeneous.
std::map<mg_connection*, WebSocketSession*>::iterator iter;
pthread_mutex_lock(&sessionMapLock);
for (iter = sessionMap.begin(); iter != sessionMap.end(); iter++ ) {
WebSocketSession* session = iter->second;
session->stageData();
if (time_homogeneous) {
/* Have all of the sessions stage their data in a top_of_frame job, so
that it's time-homogeneous. */
marshallWebSocketSessionData();
}
pthread_mutex_unlock(&sessionMapLock);
// Signal the server thread to construct and send the values-message to the client.
service_websocket= true;
pthread_cond_signal( &serviceConnections );
}
return 0;