mirror of
https://github.com/OpenMTC/OpenMTC.git
synced 2025-01-04 12:14:19 +00:00
22 KiB
22 KiB
ipe-sensors-actuators demo app
The ipe-sensors-actuators demo application generates data from (virtual) sensors and sends it to the CSE. It also receives and processes commands meant for (virtual) actuators attached to the hardware this IPE demo app is running on. The demo application is extended incrementally from the basic app frame to the complete ipe-sensors-actuators demo application.
Step 1: onem2m-ipe-sensors-actuators-01.py
- this file is equivalent to onem2m-ipe-sensors-final.py
- onem2m-ipe-sensors-final.py will be completed from this point for actuator support
- added variable self.actuators and self._command_containers
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
    """Demo IPE publishing random readings of virtual sensors.

    On registration a top-level ``devices`` container is created and a
    periodic job is started that produces random sensor values.  The
    container structure of a sensor is created lazily, the first time a
    value for that sensor shows up.

    In this step the actuators are only declared; ``_command_containers``
    is initialised but stays empty.
    """

    # NOTE(review): presumably asks the framework to remove the app
    # registration on shutdown - confirm against the XAE base class.
    remove_registration = True

    # sensors to create
    sensors = [
        'Temp-1',
        'Temp-2',
        'Humi-1',
        'Humi-2',
    ]

    # available actuators
    actuators = [
        'Switch-AirCon',  # AirConditioning
        'Switch-Window',  # Window
    ]

    # settings for random sensor data generation:
    # a value is only produced when random() exceeds ``threshold``;
    # values are int(random() * <range> + <offset>)
    threshold = 0.2
    temp_range = 25
    temp_offset = 10
    humi_range = 50
    humi_offset = 30

    def _on_register(self):
        """Set up bookkeeping, the ``devices`` container and the data job."""
        # per-sensor lookup tables, filled by create_sensor_structure()
        self._recognized_sensors = {}
        self._recognized_measurement_containers = {}
        # reserved for the actuator support added in later steps
        self._command_containers = {}

        # base structure: one 'devices' container (0 = unlimited instances)
        devices_label = 'devices'
        self._devices_container = self.create_container(
            None,
            Container(resourceName=devices_label),
            labels=[devices_label],
            max_nr_of_instances=0,
        )

        # periodically trigger new data generation
        # (interval argument 1 - presumably seconds, confirm against XAE)
        self.run_forever(1, self.get_random_data)

        self.logger.debug('registered')

    def get_random_data(self):
        """Maybe generate one random value for one randomly picked sensor."""
        # skip this tick unless the random draw exceeds the threshold
        if not (random() > self.threshold):
            return

        # random() is < 1, so the computed index is always valid
        sensor = self.sensors[int(random() * len(self.sensors))]

        # pick the value window matching the sensor type
        if sensor.startswith('Temp'):
            span, base = self.temp_range, self.temp_offset
        else:
            span, base = self.humi_range, self.humi_offset

        self.handle_sensor_data(sensor, int(random() * span + base))

    def handle_sensor_data(self, sensor, value):
        """Push ``value`` for ``sensor``, creating its containers on first use."""
        already_known = sensor in self._recognized_sensors
        if not already_known:
            self.create_sensor_structure(sensor)
        self.push_sensor_data(sensor, value)

    def create_sensor_structure(self, sensor):
        """Create the sensor container and its ``measurements`` child.

        Both created containers are remembered under the sensor name in
        ``_recognized_sensors`` and ``_recognized_measurement_containers``.
        """
        print('initializing sensor: %s' % sensor)

        # container representing the sensor itself, below 'devices'
        sensor_container = self.create_container(
            self._devices_container.path,
            Container(resourceName=sensor),
            labels=['sensor'],
            max_nr_of_instances=0,
        )
        self._recognized_sensors[sensor] = sensor_container

        # container holding the measurements, labelled with the sensor type
        kind = 'temperature' if sensor.startswith('Temp') else 'humidity'
        measurements = self.create_container(
            sensor_container.path,
            Container(resourceName='measurements'),
            labels=['measurements', kind],
            max_nr_of_instances=3,
        )
        self._recognized_measurement_containers[sensor] = measurements

    def push_sensor_data(self, sensor, value):
        """Wrap ``value`` with type/unit metadata, print it and push it."""
        if sensor.startswith('Temp'):
            kind, unit = 'temperature', 'degreeC'
        else:
            kind, unit = 'humidity', 'percentage'
        data = {'value': value, 'type': kind, 'unit': unit}

        # print the new data set
        print('%s: %s' % (sensor, data))

        # push the data set to the measurements container of the sensor
        target = self._recognized_measurement_containers[sensor]
        self.push_content(target, data)
if __name__ == '__main__':
    # the runner is only needed when this file is executed as a script
    from openmtc_app.runner import AppRunner as Runner

    app = TestIPE()

    # address handed to the runner
    # NOTE(review): presumably the endpoint of the CSE - confirm
    host = 'http://localhost:8000'

    Runner(app).run(host)
Step 2: onem2m-ipe-sensors-actuators-02.py
- adds the creation of a container for each actuator of this ipe inside the devices_container
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
    """Demo IPE publishing random sensor readings and exposing actuators.

    On registration a top-level ``devices`` container is created, one
    container per declared actuator is created inside it, and a periodic
    job is started that produces random sensor values.  The container
    structure of a sensor is created lazily, the first time a value for
    that sensor shows up.

    ``_command_containers`` is initialised but not filled in this step.
    """

    # NOTE(review): presumably asks the framework to remove the app
    # registration on shutdown - confirm against the XAE base class.
    remove_registration = True

    # sensors to create
    sensors = [
        'Temp-1',
        'Temp-2',
        'Humi-1',
        'Humi-2',
    ]

    # available actuators
    actuators = [
        'Switch-AirCon',  # AirConditioning
        'Switch-Window',  # Window
    ]

    # settings for random sensor data generation:
    # a value is only produced when random() exceeds ``threshold``;
    # values are int(random() * <range> + <offset>)
    threshold = 0.2
    temp_range = 25
    temp_offset = 10
    humi_range = 50
    humi_offset = 30

    def _on_register(self):
        """Set up bookkeeping, base/actuator containers and the data job."""
        # per-sensor lookup tables, filled by create_sensor_structure()
        self._recognized_sensors = {}
        self._recognized_measurement_containers = {}
        # reserved for the command containers added in the next step
        self._command_containers = {}

        # base structure: one 'devices' container (0 = unlimited instances)
        devices_label = 'devices'
        self._devices_container = self.create_container(
            None,
            Container(resourceName=devices_label),
            labels=[devices_label],
            max_nr_of_instances=0,
        )

        # one container per actuator, directly below 'devices'
        for actuator in self.actuators:
            self.create_container(
                self._devices_container.path,      # parent resource/path
                Container(resourceName=actuator),  # the Container resource
                max_nr_of_instances=0,             # 0 = unlimited
                labels=['actuator'],               # (optional) labels
            )

        # periodically trigger new data generation
        # (interval argument 1 - presumably seconds, confirm against XAE)
        self.run_forever(1, self.get_random_data)

        self.logger.debug('registered')

    def get_random_data(self):
        """Maybe generate one random value for one randomly picked sensor."""
        # skip this tick unless the random draw exceeds the threshold
        if not (random() > self.threshold):
            return

        # random() is < 1, so the computed index is always valid
        sensor = self.sensors[int(random() * len(self.sensors))]

        # pick the value window matching the sensor type
        if sensor.startswith('Temp'):
            span, base = self.temp_range, self.temp_offset
        else:
            span, base = self.humi_range, self.humi_offset

        self.handle_sensor_data(sensor, int(random() * span + base))

    def handle_sensor_data(self, sensor, value):
        """Push ``value`` for ``sensor``, creating its containers on first use."""
        already_known = sensor in self._recognized_sensors
        if not already_known:
            self.create_sensor_structure(sensor)
        self.push_sensor_data(sensor, value)

    def create_sensor_structure(self, sensor):
        """Create the sensor container and its ``measurements`` child.

        Both created containers are remembered under the sensor name in
        ``_recognized_sensors`` and ``_recognized_measurement_containers``.
        """
        print('initializing sensor: %s' % sensor)

        # container representing the sensor itself, below 'devices'
        sensor_container = self.create_container(
            self._devices_container.path,
            Container(resourceName=sensor),
            labels=['sensor'],
            max_nr_of_instances=0,
        )
        self._recognized_sensors[sensor] = sensor_container

        # container holding the measurements, labelled with the sensor type
        kind = 'temperature' if sensor.startswith('Temp') else 'humidity'
        measurements = self.create_container(
            sensor_container.path,
            Container(resourceName='measurements'),
            labels=['measurements', kind],
            max_nr_of_instances=3,
        )
        self._recognized_measurement_containers[sensor] = measurements

    def push_sensor_data(self, sensor, value):
        """Wrap ``value`` with type/unit metadata, print it and push it."""
        if sensor.startswith('Temp'):
            kind, unit = 'temperature', 'degreeC'
        else:
            kind, unit = 'humidity', 'percentage'
        data = {'value': value, 'type': kind, 'unit': unit}

        # print the new data set
        print('%s: %s' % (sensor, data))

        # push the data set to the measurements container of the sensor
        target = self._recognized_measurement_containers[sensor]
        self.push_content(target, data)
if __name__ == '__main__':
    # the runner is only needed when this file is executed as a script
    from openmtc_app.runner import AppRunner as Runner

    app = TestIPE()

    # address handed to the runner
    # NOTE(review): presumably the endpoint of the CSE - confirm
    host = 'http://localhost:8000'

    Runner(app).run(host)
Step 3: onem2m-ipe-sensors-actuators-03.py
- adds the creation of a commands_container inside each actuator_container
- adds this new commands_container to the self._command_containers variable
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
    """Demo IPE publishing random sensor readings and exposing actuators.

    On registration a top-level ``devices`` container is created.  For each
    declared actuator a container with a ``commands`` child container is
    created inside it, and the ``commands`` container is remembered in
    ``_command_containers``.  Finally a periodic job is started that
    produces random sensor values.  The container structure of a sensor is
    created lazily, the first time a value for that sensor shows up.
    """

    # NOTE(review): presumably asks the framework to remove the app
    # registration on shutdown - confirm against the XAE base class.
    remove_registration = True

    # sensors to create
    sensors = [
        'Temp-1',
        'Temp-2',
        'Humi-1',
        'Humi-2',
    ]

    # available actuators
    actuators = [
        'Switch-AirCon',  # AirConditioning
        'Switch-Window',  # Window
    ]

    # settings for random sensor data generation:
    # a value is only produced when random() exceeds ``threshold``;
    # values are int(random() * <range> + <offset>)
    threshold = 0.2
    temp_range = 25
    temp_offset = 10
    humi_range = 50
    humi_offset = 30

    def _on_register(self):
        """Set up bookkeeping, base/actuator containers and the data job."""
        # per-sensor lookup tables, filled by create_sensor_structure()
        self._recognized_sensors = {}
        self._recognized_measurement_containers = {}
        # actuator name -> its 'commands' container
        self._command_containers = {}

        # base structure: one 'devices' container (0 = unlimited instances)
        label = 'devices'
        self._devices_container = self.create_container(
            None,
            Container(resourceName=label),
            labels=[label],
            max_nr_of_instances=0,
        )

        # create container for each actuator
        for actuator in self.actuators:
            # Use the resource returned by create_container(), exactly as
            # the sensor code does: its path is needed as the parent of the
            # commands container, and relying on the locally built
            # Container being updated in place would be fragile.
            actuator_container = self.create_container(
                self._devices_container.path,      # parent resource/path
                Container(resourceName=actuator),  # the Container resource
                max_nr_of_instances=0,             # 0 = unlimited
                labels=['actuator'],               # (optional) labels
            )

            # create container for the commands of the actuator
            commands_container = self.create_container(
                actuator_container.path,
                Container(resourceName='commands'),
                max_nr_of_instances=3,
                labels=['commands'],
            )

            # remember the commands container of the current actuator
            self._command_containers[actuator] = commands_container

        # periodically trigger new data generation
        # (interval argument 1 - presumably seconds, confirm against XAE)
        self.run_forever(1, self.get_random_data)

        self.logger.debug('registered')

    def get_random_data(self):
        """Maybe generate one random value for one randomly picked sensor."""
        # skip this tick unless the random draw exceeds the threshold
        if not (random() > self.threshold):
            return

        # random() is < 1, so the computed index is always valid
        sensor = self.sensors[int(random() * len(self.sensors))]

        # pick the value window matching the sensor type
        if sensor.startswith('Temp'):
            value_range, value_offset = self.temp_range, self.temp_offset
        else:
            value_range, value_offset = self.humi_range, self.humi_offset

        value = int(random() * value_range + value_offset)
        self.handle_sensor_data(sensor, value)

    def handle_sensor_data(self, sensor, value):
        """Push ``value`` for ``sensor``, creating its containers on first use."""
        if sensor not in self._recognized_sensors:
            self.create_sensor_structure(sensor)
        self.push_sensor_data(sensor, value)

    def create_sensor_structure(self, sensor):
        """Create the sensor container and its ``measurements`` child.

        Both created containers are remembered under the sensor name in
        ``_recognized_sensors`` and ``_recognized_measurement_containers``.
        """
        print('initializing sensor: %s' % sensor)

        # container representing the sensor itself, below 'devices'
        device_container = self.create_container(
            self._devices_container.path,
            Container(resourceName=sensor),
            labels=['sensor'],
            max_nr_of_instances=0,
        )
        self._recognized_sensors[sensor] = device_container

        # container holding the measurements, labelled with the sensor type
        kind = 'temperature' if sensor.startswith('Temp') else 'humidity'
        measurements_container = self.create_container(
            device_container.path,
            Container(resourceName='measurements'),
            labels=['measurements', kind],
            max_nr_of_instances=3,
        )
        self._recognized_measurement_containers[sensor] = measurements_container

    def push_sensor_data(self, sensor, value):
        """Wrap ``value`` with type/unit metadata, print it and push it."""
        if sensor.startswith('Temp'):
            kind, unit = 'temperature', 'degreeC'
        else:
            kind, unit = 'humidity', 'percentage'
        data = {'value': value, 'type': kind, 'unit': unit}

        # print the new data set
        print('%s: %s' % (sensor, data))

        # push the data set to the measurements container of the sensor
        self.push_content(self._recognized_measurement_containers[sensor], data)
if __name__ == '__main__':
    # the runner is only needed when this file is executed as a script
    from openmtc_app.runner import AppRunner as Runner

    app = TestIPE()

    # address handed to the runner
    # NOTE(review): presumably the endpoint of the CSE - confirm
    host = 'http://localhost:8000'

    Runner(app).run(host)
Step 4: onem2m-ipe-sensors-actuators-04.py
- adds the subscription of each commands_container to the handler handle_command()
- adds the handler function handle_command()
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
    """Demo IPE publishing random sensor readings and receiving commands.

    On registration a top-level ``devices`` container is created.  For each
    declared actuator a container with a ``commands`` child container is
    created inside it; the ``commands`` container is remembered in
    ``_command_containers`` and subscribed to ``handle_command``.  Finally
    a periodic job is started that produces random sensor values.  The
    container structure of a sensor is created lazily, the first time a
    value for that sensor shows up.
    """

    # NOTE(review): presumably asks the framework to remove the app
    # registration on shutdown - confirm against the XAE base class.
    remove_registration = True

    # sensors to create
    sensors = [
        'Temp-1',
        'Temp-2',
        'Humi-1',
        'Humi-2',
    ]

    # available actuators
    actuators = [
        'Switch-AirCon',  # AirConditioning
        'Switch-Window',  # Window
    ]

    # settings for random sensor data generation:
    # a value is only produced when random() exceeds ``threshold``;
    # values are int(random() * <range> + <offset>)
    threshold = 0.2
    temp_range = 25
    temp_offset = 10
    humi_range = 50
    humi_offset = 30

    def _on_register(self):
        """Set up bookkeeping, containers, subscriptions and the data job."""
        # per-sensor lookup tables, filled by create_sensor_structure()
        self._recognized_sensors = {}
        self._recognized_measurement_containers = {}
        # actuator name -> its 'commands' container
        self._command_containers = {}

        # base structure: one 'devices' container (0 = unlimited instances)
        label = 'devices'
        self._devices_container = self.create_container(
            None,
            Container(resourceName=label),
            labels=[label],
            max_nr_of_instances=0,
        )

        # create container for each actuator
        for actuator in self.actuators:
            # Use the resource returned by create_container(), exactly as
            # the sensor code does: its path is needed as the parent of the
            # commands container, and relying on the locally built
            # Container being updated in place would be fragile.
            actuator_container = self.create_container(
                self._devices_container.path,      # parent resource/path
                Container(resourceName=actuator),  # the Container resource
                max_nr_of_instances=0,             # 0 = unlimited
                labels=['actuator'],               # (optional) labels
            )

            # create container for the commands of the actuator
            commands_container = self.create_container(
                actuator_container.path,
                Container(resourceName='commands'),
                max_nr_of_instances=3,
                labels=['commands'],
            )

            # remember the commands container of the current actuator
            self._command_containers[actuator] = commands_container

            # have handle_command() notified about this commands container
            self.add_container_subscription(
                commands_container.path,  # the Container or its path
                self.handle_command,      # notification handling function
            )

        # periodically trigger new data generation
        # (interval argument 1 - presumably seconds, confirm against XAE)
        self.run_forever(1, self.get_random_data)

        self.logger.debug('registered')

    def handle_command(self, container, value):
        """Notification handler: print the notified container and value."""
        print('handle_command...')
        print('container: %s' % container)
        print('value: %s' % value)
        print('')

    def get_random_data(self):
        """Maybe generate one random value for one randomly picked sensor."""
        # skip this tick unless the random draw exceeds the threshold
        if not (random() > self.threshold):
            return

        # random() is < 1, so the computed index is always valid
        sensor = self.sensors[int(random() * len(self.sensors))]

        # pick the value window matching the sensor type
        if sensor.startswith('Temp'):
            value_range, value_offset = self.temp_range, self.temp_offset
        else:
            value_range, value_offset = self.humi_range, self.humi_offset

        value = int(random() * value_range + value_offset)
        self.handle_sensor_data(sensor, value)

    def handle_sensor_data(self, sensor, value):
        """Push ``value`` for ``sensor``, creating its containers on first use."""
        if sensor not in self._recognized_sensors:
            self.create_sensor_structure(sensor)
        self.push_sensor_data(sensor, value)

    def create_sensor_structure(self, sensor):
        """Create the sensor container and its ``measurements`` child.

        Both created containers are remembered under the sensor name in
        ``_recognized_sensors`` and ``_recognized_measurement_containers``.
        """
        print('initializing sensor: %s' % sensor)

        # container representing the sensor itself, below 'devices'
        device_container = self.create_container(
            self._devices_container.path,
            Container(resourceName=sensor),
            labels=['sensor'],
            max_nr_of_instances=0,
        )
        self._recognized_sensors[sensor] = device_container

        # container holding the measurements, labelled with the sensor type
        kind = 'temperature' if sensor.startswith('Temp') else 'humidity'
        measurements_container = self.create_container(
            device_container.path,
            Container(resourceName='measurements'),
            labels=['measurements', kind],
            max_nr_of_instances=3,
        )
        self._recognized_measurement_containers[sensor] = measurements_container

    def push_sensor_data(self, sensor, value):
        """Wrap ``value`` with type/unit metadata, print it and push it."""
        if sensor.startswith('Temp'):
            kind, unit = 'temperature', 'degreeC'
        else:
            kind, unit = 'humidity', 'percentage'
        data = {'value': value, 'type': kind, 'unit': unit}

        # print the new data set
        print('%s: %s' % (sensor, data))

        # push the data set to the measurements container of the sensor
        self.push_content(self._recognized_measurement_containers[sensor], data)
if __name__ == '__main__':
    # the runner is only needed when this file is executed as a script
    from openmtc_app.runner import AppRunner as Runner

    app = TestIPE(
        # adds poas in order to receive notifications
        poas=['http://localhost:21346'],
    )

    # address handed to the runner
    # NOTE(review): presumably the endpoint of the CSE - confirm
    host = 'http://localhost:8000'

    Runner(app).run(host)