Debug concurrency problem exposed on macOS, use cond_wait instead of mutex

Co-authored-by: Caleb Herpin caleb.herpin@gmail.com
This commit is contained in:
Scott Fennell 2021-08-02 15:00:50 -05:00
parent f0b2857f00
commit 36013eefdf
4 changed files with 35 additions and 19 deletions

View File

@ -42,6 +42,8 @@ class MyCivetServer {
pthread_t server_thread; /* ** */ pthread_t server_thread; /* ** */
bool sessionDataMarshalled; /* ** */ bool sessionDataMarshalled; /* ** */
pthread_mutex_t lock_loop; /* ** */ pthread_mutex_t lock_loop; /* ** */
pthread_cond_t cond_loop;
bool service_connections = true;
bool shutting_down; /* ** */ bool shutting_down; /* ** */
std::map<std::string, WebSocketSessionMaker> WebSocketSessionMakerMap; /* ** */ std::map<std::string, WebSocketSessionMaker> WebSocketSessionMakerMap; /* ** */
@ -80,4 +82,4 @@ struct Data {
}; };
#endif #endif
#endif #endif

View File

@ -34,23 +34,20 @@ class TestWebserverWs:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_time(self, time_path): async def test_time(self, time_path):
if "macos" in platform.platform().lower(): if params.get_test_time():
logging.warning("Time endpoint is currently not working on MacOS. Skipping this test.") async with websockets.connect(time_path, ssl=TestWebserverWs.ssl_context) as websocket:
await websocket.send("LOCAL")
count = 0
while count < 2:
message = await websocket.recv()
test_format = "Time: %H:%M Date: %m/%d/%Y\n" #Not checking seconds.
time = datetime.datetime.strftime(datetime.datetime.strptime(message, "Time: %H:%M:%S Date: %m/%d/%Y\n"), test_format)
test_time = datetime.datetime.now().strftime(test_format)
print("Checking:", time, "=", test_time)
assert time == test_time
count += 1
else: else:
if params.get_test_time(): raise RuntimeError("Parameter test_time is disabled.")
async with websockets.connect(time_path, ssl=TestWebserverWs.ssl_context) as websocket:
await websocket.send("LOCAL")
count = 0
while count < 2:
message = await websocket.recv()
test_format = "Time: %H:%M Date: %m/%d/%Y\n" #Not checking seconds.
time = datetime.datetime.strftime(datetime.datetime.strptime(message, "Time: %H:%M:%S Date: %m/%d/%Y\n"), test_format)
test_time = datetime.datetime.now().strftime(test_format)
print("Checking:", time, "=", test_time)
assert time == test_time
count += 1
else:
raise RuntimeError("Parameter test_time is disabled.")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_variable_server_vars(self, variable_server_path): async def test_variable_server_vars(self, variable_server_path):

View File

@ -28,12 +28,14 @@ PURPOSE: (Represent the state and initial conditions for my server)
void MyCivetServer::deleteWebSocketSession(struct mg_connection * nc) { void MyCivetServer::deleteWebSocketSession(struct mg_connection * nc) {
std::map<mg_connection*, WebSocketSession*>::iterator iter; std::map<mg_connection*, WebSocketSession*>::iterator iter;
pthread_mutex_lock(&WebSocketSessionMapLock);
iter = webSocketSessionMap.find(nc); iter = webSocketSessionMap.find(nc);
if (iter != webSocketSessionMap.end()) { if (iter != webSocketSessionMap.end()) {
WebSocketSession* session = iter->second; WebSocketSession* session = iter->second;
delete session; delete session;
webSocketSessionMap.erase(iter); webSocketSessionMap.erase(iter);
} }
pthread_mutex_unlock(&WebSocketSessionMapLock);
} }
static const char * style_css = static const char * style_css =
@ -205,8 +207,13 @@ void* main_loop(void* S) {
MyCivetServer* server = (MyCivetServer*) S; MyCivetServer* server = (MyCivetServer*) S;
bool messageSent; bool messageSent;
while(1) { while(1) {
pthread_mutex_lock(&server->lock_loop); pthread_mutex_lock(&server->lock_loop);
while (!server->service_connections)
pthread_cond_wait(&server->cond_loop, &server->lock_loop);
//pthread_mutex_lock(&server->lock_loop);
//pthread_cond_wait(&server->cond_loop, &server->lock_loop);
if (server->shutting_down) { if (server->shutting_down) {
return NULL; return NULL;
} }
@ -219,13 +226,17 @@ void* main_loop(void* S) {
pthread_mutex_lock(&server->WebSocketSessionMapLock); pthread_mutex_lock(&server->WebSocketSessionMapLock);
for (iter = server->webSocketSessionMap.begin(); iter != server->webSocketSessionMap.end(); iter++ ) { for (iter = server->webSocketSessionMap.begin(); iter != server->webSocketSessionMap.end(); iter++ ) {
WebSocketSession* session = iter->second; WebSocketSession* session = iter->second;
message_publish(MSG_DEBUG, "Sending message...\n");
session->sendMessage(); session->sendMessage();
message_publish(MSG_DEBUG, "Message sent.\n");
messageSent = true; messageSent = true;
} }
if (messageSent) { //If any message was sent we say the data is now not marshalled. if (messageSent) { //If any message was sent we say the data is now not marshalled.
server->sessionDataMarshalled = false; server->sessionDataMarshalled = false;
} }
pthread_mutex_unlock(&server->WebSocketSessionMapLock); pthread_mutex_unlock(&server->WebSocketSessionMapLock);
server->service_connections = false;
pthread_mutex_unlock(&server->lock_loop);
} }
} }
@ -315,13 +326,18 @@ int MyCivetServer::http_top_of_frame() {
if (time_homogeneous) { if (time_homogeneous) {
marshallWebSocketSessionData(); marshallWebSocketSessionData();
} }
message_publish(MSG_DEBUG, "Top of frame.\n");
unlockConnections(); unlockConnections();
} }
return 0; return 0;
} }
void MyCivetServer::unlockConnections() { void MyCivetServer::unlockConnections() {
pthread_mutex_unlock(&lock_loop); //pthread_mutex_unlock(&lock_loop);
pthread_mutex_lock(&lock_loop);
service_connections = true;
pthread_cond_signal(&cond_loop);
pthread_mutex_unlock(&lock_loop);
// std::map<mg_connection*, WebSocketSession*>::iterator iter; // std::map<mg_connection*, WebSocketSession*>::iterator iter;
// pthread_mutex_lock(&WebSocketSessionMapLock); // pthread_mutex_lock(&WebSocketSessionMapLock);
// for (iter = webSocketSessionMap.begin(); iter != webSocketSessionMap.end(); iter++ ) { // for (iter = webSocketSessionMap.begin(); iter != webSocketSessionMap.end(); iter++ ) {

View File

@ -92,6 +92,7 @@ int parent_http_handler(struct mg_connection* conn, void *data) {
return http_send_error(conn, 405, msg.c_str(), msg.size(), 100); return http_send_error(conn, 405, msg.c_str(), msg.size(), 100);
} }
} }
// TODO add return value
} }
void handle_HTTP_GET_vs_connections(struct mg_connection* conn, void *cbdata) { void handle_HTTP_GET_vs_connections(struct mg_connection* conn, void *cbdata) {