Fixed some tests

This commit is contained in:
Caleb Herpin 2021-07-21 17:08:52 -05:00
parent 9a7e0575ee
commit 50ccb70c4f
4 changed files with 126 additions and 64 deletions

View File

@ -128,7 +128,7 @@ void handle_HTTP_GET_alloc_info(struct mg_connection *conn, void* ignore) {
} }
if (error_code2 < 0) { if (error_code2 < 0) {
message_publish(MSG_WARNING, "Could not find uri param: count. Error code: %i\n", error_code2); message_publish(MSG_WARNING, "Could not find uri param: count. Error code: %i\n", error_code2);
strncpy(start_str, "0", 1); strncpy(count_str, "10", 2); //By default we show 10.
} }
mg_send_http_ok(conn, "text/plain", -1); mg_send_http_ok(conn, "text/plain", -1);
int start = strtol(start_str, NULL, 0); int start = strtol(start_str, NULL, 0);

View File

@ -0,0 +1,39 @@
class Params:
#Change the following to change the default parameters
def __init__(self) -> None:
self.__port = 8888
self.__var_server_port = 5001
self.__host = "localhost"
self.__enable_ssl = False
self.__test_time = True
# self.__ssl_cert_path = "server.pem"
# self.__ssl_cert_path = "/home/cherpin/git/trick_fork/trick_sims/Cannon/SIM_cannon_numeric/server.pem"
self.__ssl_cert_path = "/home/cherpin/.ssl/server.pem"
def get_ssl_cert_path(self):
return self.__ssl_cert_path
def get_port(self):
return self.__port
def get_host(self):
return self.__host
def get_ssl_enable(self):
return self.__enable_ssl
def get_var_server_port(self):
return self.__var_server_port
def get_test_time(self):
return self.__test_time
def get_url(self, endpoint):
server_port = self.get_port()
server_host = self.get_host()
ssl_enable = self.get_ssl_enable()
base_url = f"http{ 's' if ssl_enable else '' }://{server_host}:{server_port}"
return f"{base_url}/{endpoint}"
def get_ws_url(self, endpoint):
return f"ws{ 's' if self.get_ssl_enable() else '' }://{self.get_host()}:{self.get_port()}/{endpoint}"

View File

@ -1,30 +1,35 @@
from _pytest.mark import param
import requests import requests
from pprint import pprint from pprint import pprint
import logging import logging
import os import os
import subprocess import subprocess
host = "localhost:8888" from requests.api import get
from parameters import Params
params = Params()
def test_alloc_info(): def test_alloc_info():
endpoint = "api/http/alloc_info" url = params.get_url("api/http/alloc_info")
url = f"http://{host}/{endpoint}" res = requests.get(url, verify=False)
print("url:", url) data = res.json()
res = requests.get(f"http://{host}/{endpoint}") assert len(data["alloc_list"]) == 10
assert res.json()["alloc_list"] == [] assert data["chunk_size"] == 10
assert data["chunk_start"] == 0
assert data["alloc_total"] == 49
def test_alloc_info_2(): def test_alloc_info_2():
endpoint = "api/http/alloc_info?start=0&count=10" endpoint = "api/http/alloc_info?start=0&count=10"
url = f"http://{host}/{endpoint}" url = params.get_url(endpoint)
print("url:", url) res = requests.get(url, verify=False)
res = requests.get(f"http://{host}/{endpoint}")
assert len(res.json()["alloc_list"]) == 10 assert len(res.json()["alloc_list"]) == 10
def test_vs_connections(): def test_vs_connections():
subprocess.Popen("nc localhost 5001".split()) subprocess.Popen(f"nc localhost {params.get_var_server_port()}".split())
endpoint = "api/http/vs_connections" endpoint = "api/http/vs_connections"
url = f"http://{host}/{endpoint}" url = params.get_url(endpoint)
res = requests.get(url) res = requests.get(url, verify=False)
assert res.json()["variable_server_connections"][0]["connection"]["client_IP_address"] == "127.0.0.1" assert res.json()["variable_server_connections"][0]["connection"]["client_IP_address"] == "127.0.0.1"

View File

@ -1,63 +1,81 @@
import logging
import json
import pytest
import websockets import websockets
import asyncio import asyncio
from time import sleep
import datetime import datetime
import json
port = 8888 import pathlib
import ssl
def send_receive(url, data=""):
async def inner():
async with websockets.connect(url) as websocket:
await websocket.send(data)
response = await websocket.recv()
return response
return asyncio.get_event_loop().run_until_complete(inner())
def send(url, data=""):
async def inner():
async with websockets.connect(url) as websocket:
await websocket.send(data)
return asyncio.get_event_loop().run_until_complete(inner())
def test_time():
time = send_receive(f"ws://localhost:{port}/api/ws/Time", "LOCAL")
test_time = datetime.datetime.strftime(datetime.datetime.now(), "Time: %H:%M:%S Date: %m/%d/%Y\n")
print(time)
print()
print(test_time)
# assert time == test_time
def test_variable_server():
def my_send(data="", receive=True):
if receive:
return send_receive(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
else:
send(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
return None
d = '{"cmd":"var_add","var_name":"dyn.cannon.pos[0]"}'
my_send(str(d), receive = False)
d = '{"cmd":"var_unpause"}' from parameters import Params
r = my_send(str(d)) params = Params()
data = json.loads(r)
print(data)
assert data["msg_type"] == "values"
assert "time" in data
assert len(data["values"]) == 1
if params.get_ssl_enable():
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# localhost_pem = pathlib.Path(__file__).with_name(params.get_ssl_cert_path())
localhost_pem = params.get_ssl_cert_path()
ssl_context.load_verify_locations(localhost_pem)
else:
ssl_context = None
def test_variable_server_sie(): @pytest.fixture(autouse=True)
def my_send(data="", receive=True): def variable_server_path():
if receive: return params.get_ws_url("api/ws/VariableServer")
return send_receive(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
else:
send(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
return None
assert my_send('{ "cmd" : "sie" }') == '{ "msg_type" : "sie", "data" : string}' @pytest.fixture(autouse=True)
def time_path():
return params.get_ws_url("api/ws/Time")
@pytest.mark.asyncio
async def test_time(time_path):
if params.get_test_time():
async with websockets.connect(time_path, ssl=ssl_context) as websocket:
await websocket.send("LOCAL")
count = 0
while count < 10:
message = await websocket.recv()
test_format = "Time: %H:%M Date: %m/%d/%Y\n" #Not checking seconds.
time = datetime.datetime.strftime(datetime.datetime.strptime(message, "Time: %H:%M:%S Date: %m/%d/%Y\n"), test_format)
test_time = datetime.datetime.now().strftime(test_format)
print("Checking:", time, "=", test_time)
assert time == test_time
count += 1
else:
raise RuntimeError("Parameter test_time is disabled.")
@pytest.mark.asyncio
async def test_variable_server_vars(variable_server_path):
msg1 = '{"cmd":"var_add","var_name":"dyn.cannon.pos[0]"}'
msg2 = '{ "cmd" : "var_send" }'
async with websockets.connect(variable_server_path, ssl=ssl_context) as websocket:
await websocket.send(msg1)
await websocket.send(msg2)
message = await websocket.recv()
vars = json.loads(message)
assert vars["msg_type"] == "values"
assert "time" in vars
assert len(vars["values"]) == 1
@pytest.mark.asyncio
async def test_variable_server_sie(variable_server_path):
async with websockets.connect(variable_server_path, ssl=ssl_context) as websocket:
await websocket.send('{ "cmd" : "sie" }')
response = await websocket.recv()
assert response == '{ "msg_type": "sie", "data": '
@pytest.mark.asyncio
async def test_variable_server_units(variable_server_path):
msg1 = '{"cmd":"var_add","var_name":"dyn.cannon.pos[0]"}'
msg2 = '{ "cmd" : "units", "var_name" : "dyn.cannon.pos[0]" }'
async with websockets.connect(variable_server_path, ssl=ssl_context) as websocket:
await websocket.send(msg1)
await websocket.send(msg2)
response = await websocket.recv()
assert response == '{ "msg_type": "units", "var_name": "dyn.cannon.pos[0]", "data": "m"}'
if __name__ == "__main__": if __name__ == "__main__":
test_time() asyncio.get_event_loop().run_until_complete(test_variable_server_send())