diff --git a/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py b/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py index 1552ccf..3a50862 100644 --- a/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py +++ b/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py @@ -1,69 +1,48 @@ -""" -Copyright (c) 2017 Fraunhofer FOKUS -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at +try: + from urllib.parse import urljoin +except ImportError: + from urlparse import urljoin - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import json -from urlparse import urljoin import requests -from copy import deepcopy as deepcopy -from futile.logging import get_logger -logger = get_logger(__name__) +from futile.logging import LoggerMixin -# TODO: check if this is sufficient -def _isSenML(senml_dict): - is_senml = True - try: - is_senml = is_senml and ("bn") in senml_dict.keys() - is_senml = is_senml and ("n") in senml_dict.keys() - is_senml = is_senml and ("u") in senml_dict.keys() - is_senml = is_senml and ("v") in senml_dict.keys() - is_senml = is_senml and ("t") in senml_dict.keys() - except BaseException: - return False - return is_senml - - -def _get_type(element): - if isinstance(element, int): - return u"Int" - elif isinstance(element, float): - return u"Float" - elif isinstance(element, bool): - return u"Boolean" - elif isinstance(element, (str, unicode)): - return u"String" - else: - logger.error("Type of \"element\" unknown") - return "Unknown" - - -class OrionAPI: +class OrionAPI(LoggerMixin): def __init__(self, orion_host=None, api_version="v2"): + super(OrionAPI, self).__init__() self.host = orion_host self.version = api_version - def create_entity(self, entitiy_name, entity_type="openmtc", - fiware_service=""): + # TODO: check if this is sufficient + @staticmethod + def _is_senml(senml_dict): + try: + return (all(key in senml_dict for key in ('bn', 'n', 'u', 't')) and + any(key in senml_dict for key in ('v', 'vs', 'vb'))) + except TypeError: + return False - payload_json = {"id": entitiy_name, "type": entity_type} + def _get_type(self, element): + if isinstance(element, int): + return u"Int" + elif isinstance(element, float): + return u"Float" + elif isinstance(element, bool): + return u"Boolean" + elif isinstance(element, (str, unicode)): + return u"String" + else: + self.logger.error("Type of \"element\" unknown") + return "Unknown" + + def create_entity(self, entity_name, entity_type="openmtc", fiware_service=""): + payload_json = {"id": entity_name, "type": entity_type} if self.version == "v2": - logger.debug("Send Payload to Orion CB: %s", str(payload_json)) - response = self._request( + self.logger.debug("Send Payload to Orion CB: %s", str(payload_json)) + self._request( self.host + "/v2/entities", method="post", json=payload_json, @@ -72,25 +51,24 @@ class OrionAPI: "Content-type": "application/json", "fiware-service": fiware_service} ) - logger.debug("Send Payload to Orion CB: %s", str(payload_json)) + self.logger.debug("Send Payload to Orion CB: %s", str(payload_json)) else: - logger.error("API version \"%s\" not supported!", self.version) + self.logger.error("API version \"%s\" not supported!", self.version) def update_attributes(self, entity_id, data_senml, fiware_service=""): - data_senml = data_senml[0] - if not _isSenML(data_senml): - logger.error("Data \"%s\" is not valid SenML", data_senml) + if not self._is_senml(data_senml): + self.logger.error("Data \"%s\" is not valid SenML", data_senml) return if data_senml["v"] == "type" or data_senml["v"] == "id": - logger.warn( + self.logger.warn( "SenML[v]=%s contains reserved name. Adding underscore", data_senml["v"]) data_senml["v"] = data_senml["v"] + "_" payload_json = { data_senml["n"]: { "value": data_senml["v"], - "type": _get_type(data_senml["v"]), + "type": self._get_type(data_senml["v"]), "metadata": { "timestamp": {"value": data_senml["t"], "type": "String"}, "bn": {"value": data_senml["bn"], "type": "String"}, @@ -100,7 +78,7 @@ class OrionAPI: } if self.version == "v2": - response = self._request( + self._request( self.host + "/v2/entities/" + entity_id + "/attrs", method="post", json=payload_json, @@ -109,29 +87,21 @@ class OrionAPI: "Content-type": "application/json", "fiware-service": fiware_service} ) - logger.debug("Send Payload to Orion CB: %s", str(payload_json)) + self.logger.debug("Send Payload to Orion CB: %s", str(payload_json)) else: - logger.error("API version \"%s\" not supported!", self.version) - - def _request( - self, - url, - method='get', - json=None, - params=None, - headers=None, - raw=False): + self.logger.error("API version \"%s\" not supported!", self.version) + def _request(self, url, method='get', json=None, params=None, headers=None, raw=False): joined_url = urljoin(self.host, url) try: req = requests.request(method, joined_url, json=json, params=params, headers=headers) - logger.debug("Status Code: %s", req.status_code) - logger.debug("Content: %s", req.content) + self.logger.debug("Status Code: %s", req.status_code) + self.logger.debug("Content: %s", req.content) if raw: return {"status": req.status_code, "content": req.content} else: return {"status": req.status_code, "content": req.json()} except requests.ConnectionError as e: - print "Connection Error: " + str(e) + self.logger.error("Connection Error: " + str(e)) return {"status": -1, "content": None} diff --git a/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py b/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py index b9a6bc4..44fa967 100644 --- a/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py +++ b/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py @@ -1,159 +1,53 @@ -""" -Copyright (c) 2017 Fraunhofer FOKUS -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -try: - from urllib.parse import urlparse -except ImportError: - from urlparse import urlparse - -from futile.logging import get_logger -from openmtc_app.onem2m import XAE -from openmtc_onem2m.model import CSETypeIDE, RemoteCSE +import re +from openmtc_app.onem2m import ResourceManagementXAE from orion_api import OrionAPI -logger = get_logger(__name__) +class OrionContextBroker(ResourceManagementXAE): -class OrionContextBroker(XAE): - - def __init__(self, labels=[""], interval=10, - orion_host="http://localhost:1026", orion_api="v2", + def __init__(self, orion_host="http://localhost:1026", orion_api="v2", labels=None, *args, **kw): super(OrionContextBroker, self).__init__(*args, **kw) - self.labels = labels - self.remove_registration = True - self.interval = interval + if isinstance(labels, basestring): + self.labels = {labels} + elif hasattr(labels, '__iter__'): + self.labels = set(labels) + else: + self.labels = None + self._entity_names = {} self.orion_api = OrionAPI(orion_host=orion_host, api_version=orion_api) def _on_register(self): - # init variables - self._known_remote_cses = {} - self._discovered_devices = {} - self._discovered_sensors = {} + self._discover_openmtc_ipe_entities() - # connected to backend or gateway? - cse_base = self.get_resource(self.cse_base) - logger.debug("CSE_BASE: %s", cse_base) - - if (cse_base.cseType == CSETypeIDE.MN_CSE or - cse_base.cseType == CSETypeIDE.AEN_CSE): - logger.debug("CSE_BASE identified as gateway") - # discover gateway - self._discover_cse(cse_base.CSE_ID + '/' + self.cse_base) + def _sensor_filter(self, sensor): + if self.labels: + return len(self.labels.intersection(sensor.labels)) > 0 else: - logger.debug("CSE_BASE identified as backend") - # discover backend - self._discover_cse(cse_base.CSE_ID + '/' + self.cse_base) - # discover remote gateways - self._get_remote_cses(cse_base) + return True - # get remote CSEs + def _sensor_data_cb(self, sensor_info, sensor_data): - def _get_remote_cses(self, cse_base): - - def get_cse_base(): - handle_cse_base(self.get_resource(self.cse_base)) - - def handle_cse_base(cb): - for resource in cb.childResource: - if (isinstance(resource, RemoteCSE) and - resource.path not in self._known_remote_cses): - remote_cse = self.get_resource(resource.id) - self._known_remote_cses[resource.path] = remote_cse - remote_cse_base = (remote_cse.CSE_ID + '/' + - remote_cse.CSEBase) - self._discover_cse(remote_cse_base) - - handle_cse_base(cse_base) - self.run_forever(self.interval, get_cse_base) - - # discover CSE - def _discover_cse(self, cse_base): - - def err_cb(errror_response): + def get_entity_name(): try: - del self._known_remote_cses[remote_cse_id] - except KeyError: - pass - # discover devices - self.periodic_discover(cse_base, {'labels': ['openmtc:device']}, - self.interval, - self._discover_devices) - self.periodic_discover(cse_base, {'labels': ['openmtc:sensor_data', - 'openmtc:actuator_data']}, - self.interval, - self._discover_sensors, err_cb) + id_label = filter(lambda x: (x.startswith('openmtc:id:')), + sensor_info['sensor_labels']).pop() + cse_id, dev_id = re.sub('^openmtc:id:', '', id_label).split('/')[:2] + except (IndexError, ValueError): + cse_id = sensor_info['cse_id'] + dev_id = sensor_info['dev_name'] + try: + f_s, e_pre = cse_id.split('~', 1) + except ValueError: + f_s = '' + e_pre = cse_id + return re.sub('[\W]', '_', f_s), '%s-%s' % (e_pre, dev_id) - def _discover_devices(self, discovery): - for device_path in discovery: - self._discovered_devices[device_path] = 0 - logger.debug("Discovered devices: %s", self._discovered_devices) - - def _handle_sensor_data(self, container, data): - logger.debug("Got Sensor \"%s\" data: %s", container, data) - # XXX if a label contains 3x '/' assume that we need smart orchestra - # naming try: - entity_name = next(lbl for lbl in self.get_resource( - container).labels if lbl.count('/') == 3) - tenant_name = entity_name.split('/')[0] - entity_name = '-'.join(entity_name.split('/')[1:3]) - except Exception as e: - entity_name = container.split('/')[-2] - tenant_name = "" + fiware_service, entity_name = self._entity_names[sensor_info['ID']] + except KeyError: + fiware_service, entity_name = self._entity_names[sensor_info['ID']] = get_entity_name() + self.orion_api.create_entity(entity_name, fiware_service=fiware_service) - self.orion_api.create_entity(entity_name, fiware_service=tenant_name) - self.orion_api.update_attributes( - entity_name, - data, - fiware_service=tenant_name) - - def _handle_new_sensor(self, sensor_path): - - # check labels of openmtc:device - device_labels = self.get_resource( - "/".join(sensor_path.split('/')[0:-1])).labels - # if label configured - if not ((len(self.labels) == 0) or - (len(self.labels) == 1 and self.labels[0] == "")): - # if no matching label - if len(set(self.labels) & set(device_labels)) == 0: - # no matching label no subscription - logger.debug("no matching label for %s", sensor_path) - return - - logger.debug("Subscription added for %s", sensor_path) - self.add_container_subscription(sensor_path, self._handle_sensor_data) - - def _discover_sensors(self, discovery): - for sensor_path in discovery: - try: - dev_path = [x for x in self._discovered_devices.keys() - if sensor_path.startswith(x)][0] - except IndexError as e: # todo(rst): ignore, but should not happen - logger.debug("%s", e) - logger.debug("%s", sensor_path) - continue - self._discovered_sensors[sensor_path] = { - 'ID': sensor_path, - 'dev_name': dev_path.split('/')[-1], - 'cse_id': sensor_path.split('/')[1], - 'data': None, - 'type': 'sensor', - 'n': None, - 'u': None - } - self._handle_new_sensor(sensor_path) + self.orion_api.update_attributes(entity_name, sensor_data, fiware_service=fiware_service)