adds ResourceManagementXAE to SDK that works with openmtc labels

This commit is contained in:
Ronald Steinke 2018-01-10 13:53:52 +01:00
parent 03c1ba099a
commit febb78d13c

View File

@ -1,3 +1,4 @@
from abc import abstractmethod
from base64 import (
b64decode,
b64encode,
@ -35,7 +36,7 @@ from openmtc_onem2m.model import (
EncodingTypeE,
get_short_member_name,
NotificationEventTypeE,
EventNotificationCriteria)
EventNotificationCriteria, CSETypeIDE, RemoteCSE)
from openmtc_onem2m.serializer import get_onem2m_decoder
from openmtc_onem2m.transport import OneM2MErrorResponse
import time
@ -748,3 +749,144 @@ class XAE(LoggerMixin):
extra_fields=extra_fields,
restore=restore,
)
class ResourceManagementXAE(XAE):
def __init__(self, interval=10, *args, **kw):
super(ResourceManagementXAE, self).__init__(*args, **kw)
self.interval = interval
self._device_labels = ["openmtc:device"]
self._sensor_labels = ["openmtc:sensor_data"]
self._actuator_labels = ["openmtc:actuator_data"]
# init variables
self._known_remote_cses = {}
self._discovered_devices = {}
self._discovered_sensors = {}
def _discover_openmtc_ipe_entities(self):
# connected to backend or gateway?
cse_base = self.get_resource(self.cse_base)
self.logger.debug("CSE_BASE: %s", cse_base)
if cse_base.cseType in (CSETypeIDE.MN_CSE, CSETypeIDE.AEN_CSE):
self.logger.info("CSE_BASE identified as gateway")
# discover gateway
self._discover_resources(cse_base.CSE_ID + '/' + self.cse_base)
else:
self.logger.info("CSE_BASE identified as backend")
# discover backend
self._discover_resources(cse_base.CSE_ID + '/' + self.cse_base)
# discover remote gateways
self._get_remote_cses(cse_base)
# get remote CSEs
def _get_remote_cses(self, cse_base):
def get_cse_base():
handle_cse_base(self.get_resource(self.cse_base))
def handle_cse_base(cb):
for resource in cb.childResource:
if (isinstance(resource, RemoteCSE) and
resource.path not in self._known_remote_cses):
remote_cse = self.get_resource(resource.id)
self._known_remote_cses[resource.path] = remote_cse
remote_cse_base = (remote_cse.CSE_ID + '/' +
remote_cse.CSEBase)
self._discover_resources(remote_cse_base, resource.path)
handle_cse_base(cse_base)
self.run_forever(self.interval, get_cse_base)
# discover resources
def _discover_resources(self, cse_base, remote_cse_id=None):
def err_cb(error_response):
try:
del self._known_remote_cses[remote_cse_id]
except KeyError:
pass
else:
self._discovered_devices = {k: v for k, v in self._discovered_devices.items()
if not k.startswith(cse_base)}
self._discovered_sensors = {k: v for k, v in self._discovered_sensors.items()
if not k.startswith(cse_base)}
# discover devices
self.periodic_discover(cse_base, {'labels': self._device_labels}, self.interval,
self._discover_devices, err_cb, auto_cra=False)
sleep(0.3)
self.periodic_discover(cse_base, {'labels': self._sensor_labels}, self.interval,
self._discover_sensors, err_cb, auto_cra=False)
def _discover_devices(self, discovery):
for device_path in set(discovery) - set(self._discovered_devices):
self._discovered_devices[device_path] = self.get_resource(device_path)
self.logger.debug("Discovered devices: %s", self._discovered_devices)
def _discover_sensors(self, discovery):
for sensor_path in set(discovery) - set(self._discovered_sensors):
try:
dev_path = [x for x in self._discovered_devices.keys()
if sensor_path.startswith(x)][0]
except IndexError:
continue
sensor = self.get_resource(sensor_path)
self._discovered_sensors[sensor_path] = {
'ID': sensor_path,
'dev_name': dev_path.split('/')[-1],
'cse_id': sensor_path.split('/')[1],
'dev_labels': self._discovered_devices[dev_path].labels,
'sensor_labels': sensor.labels,
'type': 'sensor',
'n': None,
'u': None,
'blacklisted': False
}
if self._sensor_filter(sensor):
self._handle_new_sensor(sensor_path)
else:
self._discovered_sensors[sensor_path]['blacklisted'] = True
def _handle_new_sensor(self, sensor_path):
latest = self.get_resource(sensor_path + '/latest')
if latest:
spawn(self._handle_sensor_data, sensor_path, self._get_content_from_cin(latest))
self.logger.debug("Subscription added for %s", sensor_path)
sub_ref = self.add_container_subscription(sensor_path, self._handle_sensor_data,
self._handle_delete)
self._discovered_sensors[sensor_path]['sub_ref'] = sub_ref
def _handle_delete(self, sub_ref):
self._discovered_sensors = {k: v for k, v in self._discovered_sensors.items()
if v['sub_ref'] != sub_ref}
self._discovered_devices = {k: v for k, v in self._discovered_devices.items()
if any(filter(lambda x: x.startswith(k),
self._discovered_sensors.keys()))
or not sub_ref.startswith(k)}
def _handle_sensor_data(self, container, data):
self.logger.debug("Got Sensor \"%s\" data: %s", container, data)
try:
sensor_info = self._discovered_sensors[container]
sensor_data = data[0]
except (IndexError, KeyError):
return
if not sensor_info['n']:
try:
sensor_info['n'] = sensor_data['n']
sensor_info['u'] = sensor_data['u']
except KeyError:
return
self._sensor_data_cb(sensor_info, sensor_data)
@abstractmethod
def _sensor_data_cb(self, sensor, sensor_data):
pass
@abstractmethod
def _sensor_filter(self, sensor):
pass