From bf288636cbeb430b334e75b310765750465d8ef2 Mon Sep 17 00:00:00 2001 From: "Penn, John M 047828115" Date: Tue, 8 Oct 2019 14:14:18 -0500 Subject: [PATCH] Efficiently handle time-consistent and non-time consistent message modes in web server, plus lots of refactoring. ref #883 --- trick_sims/Cannon/SIM_cannon_numeric/S_define | 7 +- .../Cannon/SIM_cannon_numeric/S_overrides.mk | 4 +- .../SIM_cannon_numeric/www/apps/wsexp.html | 3 + .../include/VariableServerSession.hh | 35 ++----- .../include/WebSocketSession.hh | 8 +- .../mongoose_httpd/include/http_server.hh | 17 +++- .../src/VariableServerSession.cpp | 61 ++++++++---- .../models/mongoose_httpd/src/http_server.cpp | 97 ++++++++++++------- 8 files changed, 136 insertions(+), 96 deletions(-) diff --git a/trick_sims/Cannon/SIM_cannon_numeric/S_define b/trick_sims/Cannon/SIM_cannon_numeric/S_define index d7852fa7..3ba5e762 100644 --- a/trick_sims/Cannon/SIM_cannon_numeric/S_define +++ b/trick_sims/Cannon/SIM_cannon_numeric/S_define @@ -10,15 +10,16 @@ LIBRARY DEPENDENCIES: #include "sim_objects/default_trick_sys.sm" -// Uncomment to get a webserver. Also uncomment lines in S_overrise.mk to link in mongoose lib. -// #include "mongoose_httpd/mongoose_httpd.sm" +#ifdef TRICK_HTTP +#include "mongoose_httpd/mongoose_httpd.sm" +#endif ##include "cannon/gravity/include/cannon_numeric.h" class CannonSimObject : public Trick::SimObject { public: CANNON cannon ; - + int foo; CannonSimObject() { ("default_data") cannon_default_data( &cannon ) ; ("initialization") cannon_init( &cannon ) ; diff --git a/trick_sims/Cannon/SIM_cannon_numeric/S_overrides.mk b/trick_sims/Cannon/SIM_cannon_numeric/S_overrides.mk index 20930921..7a61ba06 100644 --- a/trick_sims/Cannon/SIM_cannon_numeric/S_overrides.mk +++ b/trick_sims/Cannon/SIM_cannon_numeric/S_overrides.mk @@ -1,6 +1,8 @@ TRICK_CFLAGS += -I../models TRICK_CXXFLAGS += -I../models -TRICK_SFLAGS += -I../models +# Uncomment these for an embedded webserver. +#TRICK_SFLAGS += -DTRICK_HTTP +#TRICK_SFLAGS += -I../models #TRICK_LDFLAGS += -L/usr/local/lib #TRICK_USER_LINK_LIBS += -lmongoose diff --git a/trick_sims/Cannon/SIM_cannon_numeric/www/apps/wsexp.html b/trick_sims/Cannon/SIM_cannon_numeric/www/apps/wsexp.html index 2807553a..c866a9ef 100644 --- a/trick_sims/Cannon/SIM_cannon_numeric/www/apps/wsexp.html +++ b/trick_sims/Cannon/SIM_cannon_numeric/www/apps/wsexp.html @@ -68,6 +68,9 @@ addVariable("dyn.cannon.pos[1]", 0.0); addVariable("dyn.cannon.vel[0]", 0.0); addVariable("dyn.cannon.vel[1]", 0.0); + addVariable("dyn.cannon.time", 0.0); + addVariable("dyn.cannon.timeRate", 0.0); + addVariable("dyn.cannon.impact", 0.0); addVariable("I.dont.exist", 0.0); sendMessage("{\"cmd\":\"var_unpause\"}"); }; diff --git a/trick_sims/Cannon/models/mongoose_httpd/include/VariableServerSession.hh b/trick_sims/Cannon/models/mongoose_httpd/include/VariableServerSession.hh index 1da366d6..e38f353c 100644 --- a/trick_sims/Cannon/models/mongoose_httpd/include/VariableServerSession.hh +++ b/trick_sims/Cannon/models/mongoose_httpd/include/VariableServerSession.hh @@ -1,32 +1,5 @@ /************************************************************************* PURPOSE: (Represent the state of a variable server websocket connection.) - - Messages sent from Client to Server - ================================ - { "cmd" : "var_add", - "var_name" : - } - { "cmd" : "var_pause" } - { "cmd" : "var_unpause" } - { "cmd" : "var_send" } - { "cmd" : "var_clear" } - { "cmd" : "var_exit" } - { "cmd" : "var_cycle", - "period" : - } - { "cmd" : "python", - "pycode" : - } - - Messages sent from Server to Client - ================================= - { "msg_type" : "error", - "error_text" : - } - { "msg_type" : "var_list" - "time" : - "values" : [] - } **************************************************************************/ #ifndef WSSESSION_HH @@ -42,13 +15,13 @@ class VariableServerSession : public WebSocketSession { public: VariableServerSession(struct mg_connection *nc); ~VariableServerSession(); - void stageData(); /* -- base */ + void marshallData(); /* -- base */ void sendMessage(); /* -- base */ int handleMessage(std::string); /* -- base */ void setTimeInterval(unsigned int milliseconds); void addVariable(char* vname); - void stageVariableValues(); + void stageValues(); void pause(); void unpause(); void clear(); @@ -60,9 +33,13 @@ class VariableServerSession : public WebSocketSession { int sendErrorMessage(const char* fmt, ... ); REF2* make_error_ref(const char* in_name); double stageTime; + bool dataStaged; + std::vector sessionVariables; bool cyclicSendEnabled; long long nextTime; long long intervalTimeTics; }; + +WebSocketSession* makeVariableServerSession( struct mg_connection *nc ); #endif diff --git a/trick_sims/Cannon/models/mongoose_httpd/include/WebSocketSession.hh b/trick_sims/Cannon/models/mongoose_httpd/include/WebSocketSession.hh index cd9392f4..0080ed48 100644 --- a/trick_sims/Cannon/models/mongoose_httpd/include/WebSocketSession.hh +++ b/trick_sims/Cannon/models/mongoose_httpd/include/WebSocketSession.hh @@ -11,7 +11,13 @@ class WebSocketSession { public: WebSocketSession(struct mg_connection *nc):connection(nc){}; virtual ~WebSocketSession() {}; - virtual void stageData()=0; + + /** + When HTTP_Server::time_homogeneous is set, WebSocketSession::marshallData() is called from the main + sim thread in a "top_of_frame" job, so that all of the data can be staged at + the same sim-time, in other words it's time-homogeneous. + */ + virtual void marshallData()=0; virtual void sendMessage()=0; virtual int handleMessage(std::string)=0; diff --git a/trick_sims/Cannon/models/mongoose_httpd/include/http_server.hh b/trick_sims/Cannon/models/mongoose_httpd/include/http_server.hh index 40ae22a9..c75908a2 100644 --- a/trick_sims/Cannon/models/mongoose_httpd/include/http_server.hh +++ b/trick_sims/Cannon/models/mongoose_httpd/include/http_server.hh @@ -26,14 +26,20 @@ class HTTP_Server { std::map< std::string, httpMethodHandler> httpGETHandlerMap; /* ** */ pthread_mutex_t httpGETHandlerMapLock; /* ** */ + std::map< std::string, WebSocketSessionMaker> WebSocketSessionMakerMap; /* ** */ pthread_mutex_t WebSocketSessionMakerMapLock; /* ** */ - std::map sessionMap; /* ** */ - pthread_mutex_t sessionMapLock; /* ** */ + + std::map webSocketSessionMap; /* ** */ + pthread_mutex_t webSocketSessionMapLock; /* ** */ + pthread_mutex_t serviceLock; /* ** */ - struct mg_serve_http_opts http_server_options; /* ** mongoose*/ - struct mg_bind_opts bind_opts; /* ** mongoose*/ + struct mg_serve_http_opts http_server_options; /* ** */ + struct mg_bind_opts bind_opts; /* ** */ pthread_cond_t serviceConnections; /* ** */ + bool service_websocket; + bool time_homogeneous; + bool sessionDataMarshalled; // Trick Job-functions int http_default_data(); @@ -44,7 +50,7 @@ class HTTP_Server { void installWebSocketSessionMaker(std::string name, WebSocketSessionMaker maker); void installHTTPGEThandler(std::string handlerName, httpMethodHandler handler); - // These are internals, and should be considered public. They are not private only + // These are internals, and should not be considered public. They are not private only // because they need to be callable from the servers event handler. void sendWebSocketSessionMessages(struct mg_connection *nc); void handleWebSocketClientMessage(struct mg_connection *nc, std::string msg); @@ -52,5 +58,6 @@ class HTTP_Server { void deleteWebSocketSession(struct mg_connection *nc); WebSocketSession* makeWebSocketSession(struct mg_connection *nc, std::string name); void handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName); + void marshallWebSocketSessionData(); }; #endif diff --git a/trick_sims/Cannon/models/mongoose_httpd/src/VariableServerSession.cpp b/trick_sims/Cannon/models/mongoose_httpd/src/VariableServerSession.cpp index 8db8c226..ab39a765 100644 --- a/trick_sims/Cannon/models/mongoose_httpd/src/VariableServerSession.cpp +++ b/trick_sims/Cannon/models/mongoose_httpd/src/VariableServerSession.cpp @@ -6,6 +6,7 @@ LIBRARY DEPENDENCIES: ) **************************************************************************/ #include +#include #include #include // for setprecision #include @@ -15,11 +16,10 @@ LIBRARY DEPENDENCIES: #include "../include/VariableServerSession.hh" #include "../include/simpleJSON.hh" - // CONSTRUCTOR VariableServerSession::VariableServerSession( struct mg_connection *nc ) : WebSocketSession(nc) { intervalTimeTics = exec_get_time_tic_value(); // Default time interval is one second. - nextTime = LLONG_MAX; + nextTime = 0; cyclicSendEnabled = false; } @@ -28,32 +28,45 @@ VariableServerSession::~VariableServerSession() { clear(); } -// Base class virtual function. -void VariableServerSession::stageData() { +/* Base class virtual function: marshallData + When HTTP_Server::time_homogeneous is set, WebSocketSession::marshallData() is + called from the main sim thread in a "top_of_frame" job, to ensure that all of + the data is staged at the same sim-time, in other words that it's time-homogeneous. +*/ +/* VariableServerSession::marshallData() conditionally stages message data when + sim_time has reached the next integer multiple of intervalTimeTics + (The specified period between messages). +*/ +void VariableServerSession::marshallData() { long long simulation_time_tics = exec_get_time_tics(); if ( cyclicSendEnabled && ( simulation_time_tics >= nextTime )) { - stageVariableValues(); + stageValues(); + nextTime = (simulation_time_tics - (simulation_time_tics % intervalTimeTics) + intervalTimeTics); } - nextTime = (simulation_time_tics - (simulation_time_tics % intervalTimeTics) + intervalTimeTics); } -// Base class virtual function. +/* Base class virtual function: sendMessage + if data is staged/marshalled, then compose and send a message containing that data. + */ void VariableServerSession::sendMessage() { std::vector::iterator it; std::stringstream ss; - ss << "{ \"msg_type\" : \"values\",\n"; - ss << " \"time\" : " << std::setprecision(16) << stageTime << ",\n"; - ss << " \"values\" : [\n"; + if (dataStaged) { + ss << "{ \"msg_type\" : \"values\",\n"; + ss << " \"time\" : " << std::setprecision(16) << stageTime << ",\n"; + ss << " \"values\" : [\n"; - for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) { - if (it != sessionVariables.begin()) ss << ",\n"; - (*it)->writeValue(ss); - } - ss << "]}" << std::endl; - std::string tmp = ss.str(); - const char * message = tmp.c_str(); - mg_send_websocket_frame(connection, WEBSOCKET_OP_TEXT, message, strlen(message)); + for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) { + if (it != sessionVariables.begin()) ss << ",\n"; + (*it)->writeValue(ss); + } + ss << "]}" << std::endl; + std::string tmp = ss.str(); + const char * message = tmp.c_str(); + mg_send_websocket_frame(connection, WEBSOCKET_OP_TEXT, message, strlen(message)); + dataStaged = false; + } } // Base class virtual function. @@ -91,7 +104,8 @@ int VariableServerSession::handleMessage(std::string client_msg) { } else if (cmd == "var_unpause") { unpause(); } else if (cmd == "var_send") { - stageVariableValues(); + // var_send responses are not guarenteed to be time-consistent. + stageValues(); sendMessage(); } else if (cmd == "var_clear") { clear(); @@ -111,6 +125,7 @@ int VariableServerSession::handleMessage(std::string client_msg) { } void VariableServerSession::setTimeInterval(unsigned int milliseconds) { + // CONSIDER: should we compare this with the realtime frame, and limit accordingly. intervalTimeTics = exec_get_time_tic_value() * milliseconds / 1000; } @@ -145,12 +160,13 @@ void VariableServerSession::addVariable(char* vname){ } } -void VariableServerSession::stageVariableValues() { +void VariableServerSession::stageValues() { stageTime = (double)exec_get_time_tics() / exec_get_time_tic_value(); std::vector::iterator it; for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) { (*it)->stageValue(); } + dataStaged = true; } void VariableServerSession::pause() { cyclicSendEnabled = false; } @@ -202,3 +218,8 @@ REF2* VariableServerSession::make_error_ref(const char* in_name) { new_ref->attr->size = sizeof(int) ; return new_ref; } + +// WebSocketSessionMaker function for a VariableServerSession. +WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) { + return new VariableServerSession(nc); +} diff --git a/trick_sims/Cannon/models/mongoose_httpd/src/http_server.cpp b/trick_sims/Cannon/models/mongoose_httpd/src/http_server.cpp index 4839d448..926f0d23 100644 --- a/trick_sims/Cannon/models/mongoose_httpd/src/http_server.cpp +++ b/trick_sims/Cannon/models/mongoose_httpd/src/http_server.cpp @@ -8,7 +8,7 @@ LIBRARY DEPENDENCIES: #include "../include/http_server.hh" #include "../include/http_GET_handlers.hh" #include "../include/VariableServerSession.hh" - +#include static const struct mg_str s_get_method = MG_MK_STR("GET"); static const struct mg_str s_put_method = MG_MK_STR("PUT"); static const struct mg_str s_delete_method = MG_MK_STR("DELETE"); @@ -91,14 +91,20 @@ static void* connectionAttendant (void* arg) { while(1) { pthread_mutex_lock(&S->serviceLock); // Wait here until the serviceConnections condition is signaled by the top_of_frame job. - pthread_cond_wait(&S->serviceConnections, &S->serviceLock); - + while (!S->service_websocket && !S->shutting_down) { + pthread_cond_wait(&S->serviceConnections, &S->serviceLock); + } if (S->shutting_down) { pthread_mutex_unlock(&S->serviceLock); return NULL; } else { + if (!S->sessionDataMarshalled) { + S->marshallWebSocketSessionData(); + } + // mg_mgr_poll returns the number of connections that still need to be serviced. while(mg_mgr_poll(&S->mgr, 50)); } + S->service_websocket= false; pthread_mutex_unlock(&S->serviceLock); } return NULL; @@ -132,8 +138,8 @@ void HTTP_Server::installHTTPGEThandler(std::string handlerName, httpMethodHandl pthread_mutex_unlock(&httpGETHandlerMapLock); } -// Lookup the appropriate httpMethodHandler by name, and execute it for the given connection -// and http_message. +/* Lookup the appropriate httpMethodHandler by name, and execute it for the + given connection and http_message. */ void HTTP_Server::handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName) { std::map::iterator iter; iter = httpGETHandlerMap.find(handlerName); @@ -145,26 +151,42 @@ void HTTP_Server::handleHTTPGETrequest(struct mg_connection *nc, http_message *h } } -// Find the session that goes with the given websocket connection, -// and tell it to send its values to the client (web browser). -void HTTP_Server::sendWebSocketSessionMessages(struct mg_connection *nc) { +/* Tell each of the sessions to marshall any data that they have to send. */ +void HTTP_Server::marshallWebSocketSessionData() { std::map::iterator iter; - iter = sessionMap.find(nc); - if (iter != sessionMap.end()) { + pthread_mutex_lock(&webSocketSessionMapLock); + for (iter = webSocketSessionMap.begin(); iter != webSocketSessionMap.end(); iter++ ) { + WebSocketSession* session = iter->second; + session->marshallData(); + } + sessionDataMarshalled = true; + pthread_mutex_unlock(&webSocketSessionMapLock); +} + +// Find the session that goes with the given websocket connection, +// and tell it to send any messages it may have, to the client (web browser). +void HTTP_Server::sendWebSocketSessionMessages(struct mg_connection *nc) { + + std::map::iterator iter; + pthread_mutex_lock(&webSocketSessionMapLock); + iter = webSocketSessionMap.find(nc); + if (iter != webSocketSessionMap.end()) { WebSocketSession* session = iter->second; session->sendMessage(); } + sessionDataMarshalled = false; + pthread_mutex_unlock(&webSocketSessionMapLock); } -// Delete the WebSocketSession associated with the given connection-pointer, and -// erase its pointer from the sessionMap. +/* Delete the WebSocketSession associated with the given connection-pointer, + and erase its pointer from the webSocketSessionMap. */ void HTTP_Server::deleteWebSocketSession(struct mg_connection *nc) { std::map::iterator iter; - iter = sessionMap.find(nc); - if (iter != sessionMap.end()) { + iter = webSocketSessionMap.find(nc); + if (iter != webSocketSessionMap.end()) { WebSocketSession* session = iter->second; delete session; - sessionMap.erase(iter); + webSocketSessionMap.erase(iter); } } @@ -172,8 +194,8 @@ void HTTP_Server::deleteWebSocketSession(struct mg_connection *nc) { // the given message to it. void HTTP_Server::handleWebSocketClientMessage(struct mg_connection *nc, std::string msg) { std::map::iterator iter; - iter = sessionMap.find(nc); - if (iter != sessionMap.end()) { + iter = webSocketSessionMap.find(nc); + if (iter != webSocketSessionMap.end()) { WebSocketSession* session = iter->second; session->handleMessage(msg); } @@ -181,15 +203,9 @@ void HTTP_Server::handleWebSocketClientMessage(struct mg_connection *nc, std::st // Install a WebSocketSession with a connection-pointer, the key by which it can be retrieved. void HTTP_Server::addWebSocketSession(struct mg_connection *nc, WebSocketSession* session) { - pthread_mutex_lock(&sessionMapLock); - sessionMap.insert( std::pair(nc, session) ); - - pthread_mutex_unlock(&sessionMapLock); -} - -// WebSocketSessionMaker function for a VariableServerSession. -static WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) { - return new VariableServerSession(nc); + pthread_mutex_lock(&webSocketSessionMapLock); + webSocketSessionMap.insert( std::pair(nc, session) ); + pthread_mutex_unlock(&webSocketSessionMapLock); } // ========================================================================= @@ -199,19 +215,22 @@ static WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) { // Trick "default_data" job. int HTTP_Server::http_default_data() { port = "8888"; + //port = "0"; document_root = "www"; + time_homogeneous = false; + service_websocket = false; shutting_down = false; + sessionDataMarshalled = false; installHTTPGEThandler("vs_connections", &handle_HTTP_GET_vs_connections); installHTTPGEThandler("alloc_info", &handle_HTTP_GET_alloc_info); installWebSocketSessionMaker("VariableServer", &makeVariableServerSession); - //installWebSocketSessionMaker("VariableServer", [](struct mg_connection *nc) -> WebSocketSession* { return new VariableServerSession(nc); } ); + return 0; } // Trick "initialization" job. int HTTP_Server::http_init() { - http_server_options.document_root = document_root; http_server_options.enable_directory_listing = "yes"; @@ -221,8 +240,15 @@ int HTTP_Server::http_init() { bind_opts.user_data = this; listener = mg_bind_opt( &mgr, port, ev_handler, bind_opts); + // Determine the ACTUAL listen port as opposed to the requested port. + // Note that if we specify port = "0" in the mg_bind_opt() call, then + // a port number will be chosen for us, and we have to find out what it actually is. + char buf[32]; + mg_conn_addr_to_str( listener, buf, 32, MG_SOCK_STRINGIFY_PORT); + port = strdup(buf); + if (listener != NULL) { - std::cout << "Trick Webserver: Starting, and listening on port " << port << ".\n" + std::cout << "Trick Webserver: Listening on port = " << port << ".\n" << "Trick Webserver: Document root = \"" << document_root << "\"" << std::endl; } else { @@ -239,16 +265,13 @@ int HTTP_Server::http_init() { int HTTP_Server::http_top_of_frame() { if (listener != NULL) { - // Have all of the sessions stage their data. We do this here, in a - // top_of_frame job, so that all of the data is time-homogeneous. - std::map::iterator iter; - pthread_mutex_lock(&sessionMapLock); - for (iter = sessionMap.begin(); iter != sessionMap.end(); iter++ ) { - WebSocketSession* session = iter->second; - session->stageData(); + if (time_homogeneous) { + /* Have all of the sessions stage their data in a top_of_frame job, so + that it's time-homogeneous. */ + marshallWebSocketSessionData(); } - pthread_mutex_unlock(&sessionMapLock); // Signal the server thread to construct and send the values-message to the client. + service_websocket= true; pthread_cond_signal( &serviceConnections ); } return 0;