mirror of
https://github.com/nasa/trick.git
synced 2025-01-31 08:25:41 +00:00
Added test cases for web server.
This commit is contained in:
parent
917046675e
commit
9a7e0575ee
@ -87,6 +87,9 @@ int parent_http_handler(struct mg_connection* conn, void *data) {
|
||||
} else if (method == "DELETE") {
|
||||
std::string msg = "DELETE method not allowed";
|
||||
return http_send_error(conn, 405, msg.c_str(), msg.size(), 100);
|
||||
} else if (method == "POST") {
|
||||
std::string msg = "POST method not allowed";
|
||||
return http_send_error(conn, 405, msg.c_str(), msg.size(), 100);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -108,16 +111,24 @@ void handle_HTTP_GET_alloc_info(struct mg_connection *conn, void* ignore) {
|
||||
int max_size = 100;
|
||||
char start_str[max_size], count_str[max_size];
|
||||
|
||||
int error_code;
|
||||
int error_code1, error_code2;
|
||||
assert(ri != NULL);
|
||||
if (ri->query_string != NULL) {
|
||||
std::string data = ri->query_string;
|
||||
message_publish(MSG_DEBUG, "query_string = %s\n", data.c_str());
|
||||
error_code = mg_get_var(data.c_str(), strlen(data.c_str()), "start", start_str, max_size);
|
||||
if (error_code < 0) {
|
||||
message_publish(MSG_WARNING, "Could not find uri param: start. Error code: %i\n", error_code);
|
||||
error_code1 = mg_get_var(data.c_str(), strlen(data.c_str()), "start", start_str, max_size);
|
||||
error_code2 = mg_get_var(data.c_str(), strlen(data.c_str()), "count", count_str, max_size);
|
||||
} else {
|
||||
error_code1 = -1;
|
||||
error_code2 = -1;
|
||||
}
|
||||
error_code = mg_get_var(data.c_str(), strlen(data.c_str()), "count", count_str, max_size);
|
||||
if (error_code < 0) {
|
||||
message_publish(MSG_WARNING, "Could not find uri param: count. Error code: %i\n", error_code);
|
||||
if (error_code1 < 0) {
|
||||
message_publish(MSG_WARNING, "Could not find uri param: start. Error code: %i\n", error_code1);
|
||||
strncpy(start_str, "0", 1);
|
||||
}
|
||||
if (error_code2 < 0) {
|
||||
message_publish(MSG_WARNING, "Could not find uri param: count. Error code: %i\n", error_code2);
|
||||
strncpy(start_str, "0", 1);
|
||||
}
|
||||
mg_send_http_ok(conn, "text/plain", -1);
|
||||
int start = strtol(start_str, NULL, 0);
|
||||
|
35
trick_source/web/CivetServer/tests/test_http.py
Normal file
35
trick_source/web/CivetServer/tests/test_http.py
Normal file
@ -0,0 +1,35 @@
|
||||
import requests
|
||||
from pprint import pprint
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
host = "localhost:8888"
|
||||
|
||||
def test_alloc_info():
|
||||
endpoint = "api/http/alloc_info"
|
||||
url = f"http://{host}/{endpoint}"
|
||||
print("url:", url)
|
||||
res = requests.get(f"http://{host}/{endpoint}")
|
||||
assert res.json()["alloc_list"] == []
|
||||
|
||||
def test_alloc_info_2():
|
||||
endpoint = "api/http/alloc_info?start=0&count=10"
|
||||
url = f"http://{host}/{endpoint}"
|
||||
print("url:", url)
|
||||
res = requests.get(f"http://{host}/{endpoint}")
|
||||
assert len(res.json()["alloc_list"]) == 10
|
||||
|
||||
def test_vs_connections():
|
||||
subprocess.Popen("nc localhost 5001".split())
|
||||
endpoint = "api/http/vs_connections"
|
||||
url = f"http://{host}/{endpoint}"
|
||||
res = requests.get(url)
|
||||
assert res.json()["variable_server_connections"][0]["connection"]["client_IP_address"] == "127.0.0.1"
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
test_alloc_info()
|
||||
test_vs_connections()
|
63
trick_source/web/CivetServer/tests/test_ws.py
Normal file
63
trick_source/web/CivetServer/tests/test_ws.py
Normal file
@ -0,0 +1,63 @@
|
||||
import websockets
|
||||
import asyncio
|
||||
import datetime
|
||||
import json
|
||||
|
||||
port = 8888
|
||||
|
||||
def send_receive(url, data=""):
|
||||
async def inner():
|
||||
async with websockets.connect(url) as websocket:
|
||||
await websocket.send(data)
|
||||
response = await websocket.recv()
|
||||
return response
|
||||
return asyncio.get_event_loop().run_until_complete(inner())
|
||||
|
||||
def send(url, data=""):
|
||||
async def inner():
|
||||
async with websockets.connect(url) as websocket:
|
||||
await websocket.send(data)
|
||||
return asyncio.get_event_loop().run_until_complete(inner())
|
||||
|
||||
|
||||
def test_time():
|
||||
time = send_receive(f"ws://localhost:{port}/api/ws/Time", "LOCAL")
|
||||
test_time = datetime.datetime.strftime(datetime.datetime.now(), "Time: %H:%M:%S Date: %m/%d/%Y\n")
|
||||
print(time)
|
||||
print()
|
||||
print(test_time)
|
||||
# assert time == test_time
|
||||
|
||||
def test_variable_server():
|
||||
def my_send(data="", receive=True):
|
||||
if receive:
|
||||
return send_receive(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
|
||||
else:
|
||||
send(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
|
||||
return None
|
||||
|
||||
|
||||
d = '{"cmd":"var_add","var_name":"dyn.cannon.pos[0]"}'
|
||||
my_send(str(d), receive = False)
|
||||
|
||||
d = '{"cmd":"var_unpause"}'
|
||||
r = my_send(str(d))
|
||||
data = json.loads(r)
|
||||
print(data)
|
||||
assert data["msg_type"] == "values"
|
||||
assert "time" in data
|
||||
assert len(data["values"]) == 1
|
||||
|
||||
|
||||
def test_variable_server_sie():
|
||||
def my_send(data="", receive=True):
|
||||
if receive:
|
||||
return send_receive(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
|
||||
else:
|
||||
send(f"ws://localhost:{port}/api/ws/VariableServer", data=data)
|
||||
return None
|
||||
|
||||
assert my_send('{ "cmd" : "sie" }') == '{ "msg_type" : "sie", "data" : string}'
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_time()
|
Loading…
x
Reference in New Issue
Block a user