Add the ability to add new types of WebSocketSessions to make server extensible

This commit is contained in:
Penn, John M 047828115 2019-08-20 18:14:47 -05:00
parent 2998180231
commit 2e08379e42
10 changed files with 154 additions and 126 deletions

View File

@ -107,7 +107,7 @@ function updatePage(start, count) {
updateHeader(allocData);
}
}
xhr.open('GET', `/api/v1/alloc_info?start=${start}&count=${count}`);
xhr.open('GET', `/api/http/alloc_info?start=${start}&count=${count}`);
xhr.send(null);
}
@ -148,7 +148,7 @@ xhr.onreadystatechange = function() {
updateHeader(allocData);
}
}
xhr.open('GET', '/api/v1/alloc_info?start=0&count=10');
xhr.open('GET', '/api/http/alloc_info?start=0&count=10');
xhr.send(null);
</script>

View File

@ -67,7 +67,7 @@
showVSConnections(myObj);
}
}
xhr.open('GET', '/api/v1/vs_connections');
xhr.open('GET', '/api/http/vs_connections');
xhr.send(null);
</script>
</body>

View File

@ -57,7 +57,7 @@
}
var varTable = document.querySelector('table.variables');
var ws = new WebSocket("ws://localhost:8888", "myProtocol");
var ws = new WebSocket("ws://localhost:8888/api/ws/VariableServer", "myProtocol");
// WebSocket Event Handlers
ws.onopen = function(e) {

View File

@ -1,7 +1,32 @@
/*************************************************************************
PURPOSE: (Represent Websocket variable server connection.)
LIBRARY DEPENDENCIES:
( (../src/WSSession.o))
( (../src/VariableServerSession.o))
Messages sent from Client to Server
================================
{ "cmd" : "var_add",
"var_name" : <str>
}
{ "cmd" : "var_pause" }
{ "cmd" : "var_unpause" }
{ "cmd" : "var_send" }
{ "cmd" : "var_clear" }
{ "cmd" : "var_exit" }
{ "cmd" : "var_cycle",
"period" : <int>
}
Messages sent from Server to Client
=================================
{ "msg_type" : "error",
"error_text" : <str>
}
{ "msg_type" : "var_list"
"time" : <double>
"values" : []
}
**************************************************************************/
#ifndef WSSESSION_HH
#define WSSESSION_HH
@ -10,36 +35,32 @@ LIBRARY DEPENDENCIES:
#include <string>
#include <mongoose.h>
#include "WebSocketSession.hh"
#include "WSSessionVariable.hh"
#include "VariableServerVariable.hh"
class VariableServerSession : public WebSocketSession {
public:
VariableServerSession(struct mg_connection *nc);
void stageData(); /* -- base */
void sendMessage(); /* -- base */
int handleMessage(std::string); /* -- base */
void stageData(); /* -- virtual base */
void sendMessage(); /* -- virtual base */
int handleMessage(std::string); /* -- virtual base */
void setTimeInterval(unsigned int milliseconds); /* -- VariableServerSession specific */
void addVariable(char* vname); /* -- VariableServerSession specific */
void stageVariableValues(); /* -- VariableServerSession specific */
void pause(); /* -- VariableServerSession specific */
void unpause(); /* -- VariableServerSession specific */
void clear(); /* -- VariableServerSession specific */
void exit(); /* -- VariableServerSession specific */
void setTimeInterval(unsigned int milliseconds);
void addVariable(char* vname);
void stageVariableValues();
void pause();
void unpause();
void clear();
void exit();
static int bad_ref_int ;
private:
int sendErrorMessage(const char* fmt, ... );
//struct mg_connection* connection; /* -- Base */
REF2* make_error_ref(const char* in_name); /* -- VariableServerSession specific */
double stageTime; /* -- VariableServerSession specific */
std::vector<WSsessionVariable*> sessionVariables; /* -- VariableServerSession specific */
bool cyclicSendEnabled; /* -- VariableServerSession specific */
long long nextTime; /* -- VariableServerSession specific */
long long intervalTimeTics; /* -- VariableServerSession specific */
REF2* make_error_ref(const char* in_name);
double stageTime;
std::vector<VariableServerVariable*> sessionVariables;
bool cyclicSendEnabled;
long long nextTime;
long long intervalTimeTics;
};
#endif

View File

@ -1,10 +1,10 @@
/*************************************************************************
PURPOSE: (Represent Websocket variable server variable.)
LIBRARY DEPENDENCIES:
( (../src/WSSessionVariable.o))
( (../src/VariableServerVariable.o))
**************************************************************************/
#ifndef WSSESSIONVARIABLE_HH
#define WSSESSIONVARIABLE_HH
#ifndef VARIABLE_SERVER_VARIABLE_HH
#define VARIABLE_SERVER_VARIABLE_HH
#include <time.h>
#include <vector>
@ -14,17 +14,17 @@ LIBRARY DEPENDENCIES:
#define MAX_ARRAY_LENGTH 4096
class WSsessionVariable {
class VariableServerVariable {
public:
WSsessionVariable( REF2* variableType);
~WSsessionVariable();
VariableServerVariable( REF2* variableType);
~VariableServerVariable();
const char* getName();
void stageValue();
void writeValue( std::ostream& chkpnt_os );
private:
WSsessionVariable() {}
VariableServerVariable() {}
REF2 *varInfo;
void *address;
int size;

View File

@ -1,7 +1,5 @@
/*************************************************************************
PURPOSE: (Represent Websocket variable server connection.)
LIBRARY DEPENDENCIES:
( (../src/WSSession.o))
PURPOSE: (Represent Websocket connection.)
**************************************************************************/
#ifndef WEB_SOCKET_SESSION_HH
#define WEB_SOCKET_SESSION_HH

View File

@ -1,21 +1,19 @@
/*************************************************************************
PURPOSE: (Represent the state and initial conditions of an http server)
PURPOSE: (Represent the state and initial conditions of an http server.)
LIBRARY DEPENDENCIES:
( (../src/http_server.cpp))
**************************************************************************/
#ifndef HTTP_SERVER_H
#define HTTP_SERVER_H
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <mongoose.h>
#include <pthread.h>
#include <string>
#include <map>
#include <mongoose.h>
#include <pthread.h>
#include "../include/WebSocketSession.hh"
typedef void (*httpMethodHandler)(struct mg_connection *, struct http_message *);
typedef WebSocketSession* (*WebSocketSessionMaker)(struct mg_connection *nc);
class HTTP_Server {
public:
@ -26,8 +24,10 @@ class HTTP_Server {
pthread_t server_thread; /* ** */
bool shutting_down;
std::map< std::string, httpMethodHandler> httpMethodHandlerMap; /* ** */
pthread_mutex_t APIMapLock; /* ** */
std::map< std::string, httpMethodHandler> httpGETHandlerMap; /* ** */
pthread_mutex_t httpGETHandlerMapLock; /* ** */
std::map< std::string, WebSocketSessionMaker> WebSocketSessionMakerMap; /* ** */
pthread_mutex_t WebSocketSessionMakerMapLock; /* ** */
std::map<mg_connection*, WebSocketSession*> sessionMap; /* ** */
pthread_mutex_t sessionMapLock; /* ** */
struct mg_serve_http_opts http_server_options; /* ** mongoose*/
@ -40,11 +40,16 @@ class HTTP_Server {
int http_top_of_frame();
int http_shutdown();
void sendSessionMessages(struct mg_connection *nc);
void handleClientMessage(struct mg_connection *nc, std::string msg);
void addSession(struct mg_connection *nc, WebSocketSession* session);
void deleteSession(struct mg_connection *nc);
void install_API_GET_handler(std::string APIname, httpMethodHandler handler);
void handle_API_GET_request(struct mg_connection *nc, http_message *hm, std::string handlerName);
void installWebSocketSessionMaker(std::string name, WebSocketSessionMaker creater);
void installHTTPGEThandler(std::string APIname, httpMethodHandler handler);
// These are internals and should be considered public. They are not private only
// because they need to be callable from the servers event handler.
void sendWebSocketSessionMessages(struct mg_connection *nc);
void handleWebSocketClientMessage(struct mg_connection *nc, std::string msg);
void addWebSocketSession(struct mg_connection *nc, WebSocketSession* session);
void deleteWebSocketSession(struct mg_connection *nc);
WebSocketSession* makeWebSocketSession(struct mg_connection *nc, std::string name);
void handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName);
};
#endif

View File

@ -1,7 +1,9 @@
/************************************************************************
PURPOSE: (Represent the state and initial conditions of an http server)
LIBRARY DEPENDENCIES:
((simpleJSON.o))
((simpleJSON.o)
(VariableServerVariable.o)
)
**************************************************************************/
#include <sstream>
#include <iomanip> // for setprecision
@ -81,14 +83,14 @@ void VariableServerSession::addVariable(char* vname){
if ( new_ref != NULL ) {
// This REF2 object will "belong" to the VariableServerSessionVariable, so it has
// the right and responsibility to free() it in its destructor.
WSsessionVariable *sessionVariable = new WSsessionVariable( new_ref ) ;
VariableServerVariable *sessionVariable = new VariableServerVariable( new_ref ) ;
sessionVariables.push_back( sessionVariable ) ;
}
}
void VariableServerSession::stageVariableValues() {
stageTime = (double)exec_get_time_tics() / exec_get_time_tic_value();
std::vector<WSsessionVariable*>::iterator it;
std::vector<VariableServerVariable*>::iterator it;
for (it = sessionVariables.begin(); it != sessionVariables.end(); it++ ) {
(*it)->stageValue();
}
@ -103,7 +105,7 @@ void VariableServerSession::stageData() {
}
void VariableServerSession::sendMessage() {
std::vector<WSsessionVariable*>::iterator it;
std::vector<VariableServerVariable*>::iterator it;
std::stringstream ss;
ss << "{ \"msg_type\" : \"values\",\n";
@ -125,7 +127,7 @@ void VariableServerSession::pause() { cyclicSendEnabled = false; }
void VariableServerSession::unpause() { cyclicSendEnabled = true; }
void VariableServerSession::clear() {
std::vector<WSsessionVariable*>::iterator it;
std::vector<VariableServerVariable*>::iterator it;
it = sessionVariables.begin();
while (it != sessionVariables.end()) {
delete *it;

View File

@ -1,9 +1,9 @@
#include "trick/memorymanager_c_intf.h" // for get_size.
#include "../include/WSSessionVariable.hh"
#include "../include/VariableServerVariable.hh"
#include <math.h> // for fpclassify
#include <iomanip> // for setprecision
WSsessionVariable::WSsessionVariable(REF2 * ref ) {
VariableServerVariable::VariableServerVariable(REF2 * ref ) {
varInfo = ref;
address = varInfo->address;
size = varInfo->attr->size ;
@ -41,12 +41,12 @@ WSsessionVariable::WSsessionVariable(REF2 * ref ) {
stageBuffer = calloc(size, 1) ;
}
WSsessionVariable::~WSsessionVariable() {
VariableServerVariable::~VariableServerVariable() {
if (varInfo != NULL) free( varInfo );
}
const char* WSsessionVariable::getName() {
const char* VariableServerVariable::getName() {
return varInfo->reference;
}
@ -66,7 +66,7 @@ static void write_quoted_str( std::ostream& os, const char* s) {
os << "\"" ;
}
void WSsessionVariable::stageValue() {
void VariableServerVariable::stageValue() {
// Copy <size> bytes from <address> to staging_point.
if ( varInfo->attr->type == TRICK_STRING ) {
@ -82,7 +82,7 @@ void WSsessionVariable::stageValue() {
}
}
void WSsessionVariable::writeValue( std::ostream& outs ) {
void VariableServerVariable::writeValue( std::ostream& outs ) {
switch(varInfo->attr->type) {
case TRICK_UNSIGNED_CHARACTER:

View File

@ -1,73 +1,51 @@
/************************************************************************
PURPOSE: (Represent the state and initial conditions of an http server)
LIBRARY DEPENDENCIES:
(
(VariableServerSession.o)
(WSSessionVariable.o)
(http_GET_handlers.o)
)
((VariableServerSession.o)
(http_GET_handlers.o))
**************************************************************************/
/*
Messages sent from Client to Server
================================
{ "cmd" : "var_add",
"var_name" : <str>
}
{ "cmd" : "var_pause" }
{ "cmd" : "var_unpause" }
{ "cmd" : "var_send" }
{ "cmd" : "var_clear" }
{ "cmd" : "var_exit" }
{ "cmd" : "var_cycle",
"period" : <int>
}
Messages sent from Server to Client
=================================
{ "msg_type" : "error",
"error_text" : <str>
}
{ "msg_type" : "var_list"
"time" : <double>
"values" : []
}
*/
#include <iostream>
#include "../include/http_server.hh"
#include "../include/VariableServerSession.hh"
#include "../include/http_GET_handlers.hh"
#include "../include/VariableServerSession.hh"
static const struct mg_str s_get_method = MG_MK_STR("GET");
static const struct mg_str s_put_method = MG_MK_STR("PUT");
static const struct mg_str s_delete_method = MG_MK_STR("DELETE");
static const struct mg_str api_prefix = MG_MK_STR("/api/v1/");
static const struct mg_str http_api_prefix = MG_MK_STR("/api/http/");
static const struct mg_str ws_api_prefix = MG_MK_STR("/api/ws/");
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
http_message *hm = (struct http_message *)ev_data;
HTTP_Server* hs = (HTTP_Server *)nc->user_data;
HTTP_Server* httpServer = (HTTP_Server *)nc->user_data;
switch(ev) {
case MG_EV_WEBSOCKET_HANDSHAKE_DONE: {
case MG_EV_WEBSOCKET_HANDSHAKE_DONE: { // Process new websocket connection.
std::string uri(hm->uri.p, hm->uri.len);
std::cout << "WEBSOCKET_REQUEST: URI = \"" << uri << "\"" << std::endl;
if (mg_str_starts_with(hm->uri, ws_api_prefix)) {
std::string wsType (hm->uri.p + ws_api_prefix.len, hm->uri.len - ws_api_prefix.len);
WebSocketSession* session = httpServer->makeWebSocketSession(nc, wsType);
if (session != NULL) {
httpServer->addWebSocketSession(nc, session);
std::cout << "WEBSOCKET[" << (void*)nc << "] OPENED. URI=\"" << uri << "\"." << std::endl;
// Create a session object to store information about this web-socket connection.
WebSocketSession* session = new VariableServerSession(nc);
hs->addSession(nc, session);
} else {
nc->flags |= MG_F_SEND_AND_CLOSE;
std::cout << "ERROR: No such web socket interface: \"" << uri << "\"." << std::endl;
}
}
} break;
case MG_EV_WEBSOCKET_FRAME: { // Process websocket messages from the client (web browser).
struct websocket_message *wm = (struct websocket_message *) ev_data;
std::string msg ((char*)wm->data, wm->size);
std::cout << "WEBSOCKET[" << (void*)nc << "] RECIEVED: " << msg << std::endl;
if (nc->flags & MG_F_IS_WEBSOCKET) {
hs->handleClientMessage(nc, msg);
httpServer->handleWebSocketClientMessage(nc, msg);
}
} break;
case MG_EV_CLOSE: {
case MG_EV_CLOSE: { // Process closed websocket connection.
if (nc->flags & MG_F_IS_WEBSOCKET) {
hs->deleteSession(nc);
httpServer->deleteWebSocketSession(nc);
std::cout << "WEBSOCKET[" << (void*)nc << "] CLOSED." << std::endl;
}
} break;
@ -76,16 +54,16 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
// called periodically by the threaded function connectionAttendant() [below].
// Send websocket messages to the client (web browser).
if (nc->flags & MG_F_IS_WEBSOCKET) {
hs->sendSessionMessages(nc);
httpServer->sendWebSocketSessionMessages(nc);
}
} break;
case MG_EV_HTTP_REQUEST: {
case MG_EV_HTTP_REQUEST: { // Process HTTP requests.
std::string uri(hm->uri.p, hm->uri.len);
std::cout << "HTTP_REQUEST: URI = \"" << uri << "\"" << std::endl;
if (mg_str_starts_with(hm->uri, api_prefix)) {
if (mg_str_starts_with(hm->uri, http_api_prefix)) {
if (mg_strcmp(hm->method, s_get_method)==0) {
std::string handlerName (hm->uri.p + api_prefix.len, hm->uri.len - api_prefix.len);
hs->handle_API_GET_request(nc, hm, handlerName);
std::string handlerName (hm->uri.p + http_api_prefix.len, hm->uri.len - http_api_prefix.len);
httpServer->handleHTTPGETrequest(nc, hm, handlerName);
} else if (mg_strcmp(hm->method, s_put_method)==0) {
mg_http_send_error(nc, 405, "PUT method not allowed.");
} else if (mg_strcmp(hm->method, s_delete_method)==0) {
@ -93,7 +71,7 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
}
} else {
// Serve the files in the document-root directory, as specified by the URI.
mg_serve_http(nc, (struct http_message *) ev_data, hs->http_server_options);
mg_serve_http(nc, (struct http_message *) ev_data, httpServer->http_server_options);
}
} break;
default: {
@ -121,16 +99,34 @@ static void* connectionAttendant (void* arg) {
return NULL;
}
void HTTP_Server::install_API_GET_handler(std::string APIname, httpMethodHandler handler) {
pthread_mutex_lock(&APIMapLock);
httpMethodHandlerMap.insert(std::pair<std::string, httpMethodHandler>(APIname, handler));
pthread_mutex_unlock(&APIMapLock);
void HTTP_Server::installWebSocketSessionMaker(std::string name, WebSocketSessionMaker maker) {
pthread_mutex_lock(&WebSocketSessionMakerMapLock);
WebSocketSessionMakerMap.insert(std::pair<std::string, WebSocketSessionMaker>(name, maker));
pthread_mutex_unlock(&WebSocketSessionMakerMapLock);
}
void HTTP_Server::handle_API_GET_request(struct mg_connection *nc, http_message *hm, std::string handlerName) {
WebSocketSession* HTTP_Server::makeWebSocketSession(struct mg_connection *nc, std::string name) {
std::map<std::string, WebSocketSessionMaker>::iterator iter;
iter = WebSocketSessionMakerMap.find(name);
if (iter != WebSocketSessionMakerMap.end()) {
WebSocketSessionMaker maker = iter->second;
return maker(nc);
} else {
return NULL;
mg_http_send_error(nc, 404, "No such API.");
}
}
void HTTP_Server::installHTTPGEThandler(std::string APIname, httpMethodHandler handler) {
pthread_mutex_lock(&httpGETHandlerMapLock);
httpGETHandlerMap.insert(std::pair<std::string, httpMethodHandler>(APIname, handler));
pthread_mutex_unlock(&httpGETHandlerMapLock);
}
void HTTP_Server::handleHTTPGETrequest(struct mg_connection *nc, http_message *hm, std::string handlerName) {
std::map<std::string, httpMethodHandler>::iterator iter;
iter = httpMethodHandlerMap.find(handlerName);
if (iter != httpMethodHandlerMap.end()) {
iter = httpGETHandlerMap.find(handlerName);
if (iter != httpGETHandlerMap.end()) {
httpMethodHandler handler = iter->second;
handler(nc, hm);
} else {
@ -138,7 +134,7 @@ void HTTP_Server::handle_API_GET_request(struct mg_connection *nc, http_message
}
}
void HTTP_Server::sendSessionMessages(struct mg_connection *nc) {
void HTTP_Server::sendWebSocketSessionMessages(struct mg_connection *nc) {
// Find the session that goes with the given websocket connection,
// and tell it to send its values to the client (web browser).
std::map<mg_connection*, WebSocketSession*>::iterator iter;
@ -149,7 +145,7 @@ void HTTP_Server::sendSessionMessages(struct mg_connection *nc) {
}
}
void HTTP_Server::deleteSession(struct mg_connection *nc) {
void HTTP_Server::deleteWebSocketSession(struct mg_connection *nc) {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
iter = sessionMap.find(nc);
if (iter != sessionMap.end()) {
@ -158,7 +154,7 @@ void HTTP_Server::deleteSession(struct mg_connection *nc) {
sessionMap.erase(iter);
}
}
void HTTP_Server::handleClientMessage(struct mg_connection *nc, std::string msg) {
void HTTP_Server::handleWebSocketClientMessage(struct mg_connection *nc, std::string msg) {
std::map<mg_connection*, WebSocketSession*>::iterator iter;
iter = sessionMap.find(nc);
if (iter != sessionMap.end()) {
@ -167,7 +163,7 @@ void HTTP_Server::handleClientMessage(struct mg_connection *nc, std::string msg)
}
}
void HTTP_Server::addSession(struct mg_connection *nc, WebSocketSession* session) {
void HTTP_Server::addWebSocketSession(struct mg_connection *nc, WebSocketSession* session) {
pthread_mutex_lock(&sessionMapLock);
sessionMap.insert( std::pair<mg_connection*, WebSocketSession*>(nc, session) );
pthread_mutex_unlock(&sessionMapLock);
@ -184,13 +180,19 @@ int HTTP_Server::http_default_data() {
return 0;
}
static WebSocketSession* makeVariableServerSession( struct mg_connection *nc ) {
return new VariableServerSession(nc);
}
int HTTP_Server::http_init() {
http_server_options.document_root = document_root;
http_server_options.enable_directory_listing = "yes";
install_API_GET_handler("vs_connections", &handle_HTTP_GET_vs_connections);
install_API_GET_handler("alloc_info", &handle_HTTP_GET_alloc_info);
installHTTPGEThandler("vs_connections", &handle_HTTP_GET_vs_connections);
installHTTPGEThandler("alloc_info", &handle_HTTP_GET_alloc_info);
installWebSocketSessionMaker("VariableServer", &makeVariableServerSession);
//installWebSocketSessionMaker("VariableServer", [](struct mg_connection *nc) -> WebSocketSession* { return new VariableServerSession(nc); } );
mg_mgr_init( &mgr, NULL );