mirror of
https://github.com/OpenMTC/OpenMTC.git
synced 2025-01-24 04:57:59 +00:00
16 KiB
16 KiB
IPE-sensors demo app
The ipe-sensors demo applications generates data from (virtual) sensors and sends them to the CSE. The demo application is extended incrementally from the basic app frame to the complete IPE-sensors demo application.
Step 1: onem2m-ipe-sensors-01.py
- added base structure
from openmtc_app.onem2m import XAE
class TestIPE(XAE):
remove_registration = True
def _on_register(self):
# log message
self.logger.debug('registered')
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 2: onem2m-ipe-sensors-02.py
- adds creation of a container for devices
- introduces function for random sensor data generation
- introduces endless loop
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
def _on_register(self):
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# create some random data for a random sensor
self.get_random_data()
# log message
self.logger.debug('registered')
# start endless loop
self.run_forever()
def get_random_data(self):
pass
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 3: onem2m-ipe-sensors-03.py
- adds random
- spawns run_forever with get_random_data function every one second
- prints some random value operations
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
def _on_register(self):
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# trigger periodically new data generation
self.run_forever(1, self.get_random_data)
# log message
self.logger.debug('registered')
def get_random_data(self):
print('---------')
random_value = random()
print(random_value)
print(random_value * 10)
print(int(random_value * 10))
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 4: onem2m-ipe-sensors-04.py
- introducing list of sensors to create
- introducing settings for random sensor data generation
- adding code for random time intervals
- adding code for random sensor selection
- adding code for random sensor data generation
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
# sensors to create
sensors = [
'Temp-1',
'Temp-2',
'Humi-1',
'Humi-2'
]
# settings for random sensor data generation
threshold = 0.5
value_range = 25
value_offset = 10
def _on_register(self):
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# trigger periodically new data generation
self.run_forever(1, self.get_random_data)
# log message
self.logger.debug('registered')
def get_random_data(self):
print('')
# for random time intervals
if random() > self.threshold:
print('got some data')
# select a random sensor
print('available sensors: %s' % self.sensors)
print('number of available sensors: %s' % len(self.sensors))
print('some random sensor: %s' % self.sensors[int(random() * len(self.sensors))])
# generate random sensor data
print('random sensor data: %s' % int(random() * self.value_range + self.value_offset))
else:
print('no data')
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 5: onem2m-ipe-sensors-05.py
- adds different range and offset for temperature and humidity value generation
- introducing self._recognized_sensors variable
- completing get_random_data() function
- introducing handle_sensor_data() function
- introducing create_sensor_structure() function
- introducing push_sensor_data() function
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
# sensors to create
sensors = [
'Temp-1',
'Temp-2',
'Humi-1',
'Humi-2'
]
# settings for random sensor data generation
threshold = 0.2
temp_range = 25
temp_offset = 10
humi_range = 50
humi_offset = 30
def _on_register(self):
# init variables
self._recognized_sensors = {}
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# trigger periodically new data generation
self.run_forever(1, self.get_random_data)
# log message
self.logger.debug('registered')
def get_random_data(self):
# at random time intervals
if random() > self.threshold:
# select a random sensor
sensor = self.sensors[int(random() * len(self.sensors))]
# set parameters depending on sensor type
if sensor.startswith('Temp'):
value_range = self.temp_range
value_offset = self.temp_offset
else:
value_range = self.humi_range
value_offset = self.humi_offset
# generate random sensor data
value = int(random() * value_range + value_offset)
self.handle_sensor_data(sensor, value)
def handle_sensor_data(self, sensor, value):
# initialize sensor structure if never done before
if sensor not in self._recognized_sensors:
self.create_sensor_structure(sensor)
self.push_sensor_data(sensor, value)
def create_sensor_structure(self, sensor):
print('I need to create a structure for the sensor %s.' % sensor)
self._recognized_sensors[sensor] = 'something useful'
def push_sensor_data(self, sensor, value):
print('I would push the content %i of %s to the gateway.' % (value, sensor))
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 6: onem2m-ipe-sensors-06.py
- added create sensor container to function create_sensor_structure()
- add sensor to _recognized_sensors
- build data set with value and metadata
- printing out the new data set
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
# sensors to create
sensors = [
'Temp-1',
'Temp-2',
'Humi-1',
'Humi-2'
]
# settings for random sensor data generation
threshold = 0.2
temp_range = 25
temp_offset = 10
humi_range = 50
humi_offset = 30
def _on_register(self):
# init variables
self._recognized_sensors = {}
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# trigger periodically new data generation
self.run_forever(1, self.get_random_data)
# log message
self.logger.debug('registered')
def get_random_data(self):
# at random time intervals
if random() > self.threshold:
# select a random sensor
sensor = self.sensors[int(random() * len(self.sensors))]
# set parameters depending on sensor type
if sensor.startswith('Temp'):
value_range = self.temp_range
value_offset = self.temp_offset
else:
value_range = self.humi_range
value_offset = self.humi_offset
# generate random sensor data
value = int(random() * value_range + value_offset)
self.handle_sensor_data(sensor, value)
def handle_sensor_data(self, sensor, value):
# initialize sensor structure if never done before
if sensor not in self._recognized_sensors:
self.create_sensor_structure(sensor)
self.push_sensor_data(sensor, value)
def create_sensor_structure(self, sensor):
print('initializing sensor: %s' % sensor)
# create sensor container
device_container = Container(resourceName=sensor)
device_container = self.create_container(self._devices_container.path,
device_container,
labels=['sensor'],
max_nr_of_instances=0)
# add sensor to _recognized_sensors
self._recognized_sensors[sensor] = device_container
def push_sensor_data(self, sensor, value):
# build data set with value and metadata
if sensor.startswith('Temp'):
data = {
'value': value,
'type': 'temperature',
'unit': 'degreeC'
}
else:
data = {
'value': value,
'type': 'humidity',
'unit': 'percentage'
}
# print the new data set
print('%s: %s' % (sensor, data))
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)
Step 7: onem2m-ipe-sensors-07.py
- introduced self._measurement_containers variable
- added creation of measurements container in function create_sensor_structure()
- added push of data to measurements_container of the sensor in function push_sensor_data()
from random import random
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
class TestIPE(XAE):
remove_registration = True
# sensors to create
sensors = [
'Temp-1',
'Temp-2',
'Humi-1',
'Humi-2'
]
# settings for random sensor data generation
threshold = 0.2
temp_range = 25
temp_offset = 10
humi_range = 50
humi_offset = 30
def _on_register(self):
# init variables
self._recognized_sensors = {}
self._recognized_measurement_containers = {}
# init base structure
label = 'devices'
container = Container(resourceName=label)
self._devices_container = self.create_container(None,
container,
labels=[label],
max_nr_of_instances=0)
# trigger periodically new data generation
self.run_forever(1, self.get_random_data)
# log message
self.logger.debug('registered')
def get_random_data(self):
# at random time intervals
if random() > self.threshold:
# select a random sensor
sensor = self.sensors[int(random() * len(self.sensors))]
# set parameters depending on sensor type
if sensor.startswith('Temp'):
value_range = self.temp_range
value_offset = self.temp_offset
else:
value_range = self.humi_range
value_offset = self.humi_offset
# generate random sensor data
value = int(random() * value_range + value_offset)
self.handle_sensor_data(sensor, value)
def handle_sensor_data(self, sensor, value):
# initialize sensor structure if never done before
if sensor not in self._recognized_sensors:
self.create_sensor_structure(sensor)
self.push_sensor_data(sensor, value)
def create_sensor_structure(self, sensor):
print('initializing sensor: %s' % sensor)
# create sensor container
device_container = Container(resourceName=sensor)
device_container = self.create_container(self._devices_container.path,
device_container,
labels=['sensor'],
max_nr_of_instances=0)
# add sensor to _recognized_sensors
self._recognized_sensors[sensor] = device_container
# create measurements container
labels = ['measurements']
if sensor.startswith('Temp'):
labels.append('temperature')
else:
labels.append('humidity')
measurements_container = Container(resourceName='measurements')
measurements_container = self.create_container(device_container.path,
measurements_container,
labels=labels,
max_nr_of_instances=3)
# add measurements_container from sensor to _recognized_measurement_containers
self._recognized_measurement_containers[sensor] = measurements_container
def push_sensor_data(self, sensor, value):
# build data set with value and metadata
if sensor.startswith('Temp'):
data = {
'value': value,
'type': 'temperature',
'unit': 'degreeC'
}
else:
data = {
'value': value,
'type': 'humidity',
'unit': 'percentage'
}
# print the new data set
print ('%s: %s' % (sensor, data))
# finally, push the data set to measurements_container of the sensor
self.push_content(self._recognized_measurement_containers[sensor], data)
if __name__ == '__main__':
from openmtc_app.runner import AppRunner as Runner
host = 'http://localhost:8000'
app = TestIPE()
Runner(app).run(host)