mirror of
https://github.com/OpenMTC/OpenMTC.git
synced 2024-12-20 05:28:23 +00:00
reqrites Orion CB AE to use new ResourceManagementXAE class
This commit is contained in:
parent
febb78d13c
commit
d3f33aa153
@ -1,69 +1,48 @@
|
||||
"""
|
||||
Copyright (c) 2017 Fraunhofer FOKUS
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
try:
|
||||
from urllib.parse import urljoin
|
||||
except ImportError:
|
||||
from urlparse import urljoin
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import json
|
||||
from urlparse import urljoin
|
||||
import requests
|
||||
from copy import deepcopy as deepcopy
|
||||
|
||||
from futile.logging import get_logger
|
||||
logger = get_logger(__name__)
|
||||
from futile.logging import LoggerMixin
|
||||
|
||||
|
||||
# TODO: check if this is sufficient
|
||||
def _isSenML(senml_dict):
|
||||
is_senml = True
|
||||
try:
|
||||
is_senml = is_senml and ("bn") in senml_dict.keys()
|
||||
is_senml = is_senml and ("n") in senml_dict.keys()
|
||||
is_senml = is_senml and ("u") in senml_dict.keys()
|
||||
is_senml = is_senml and ("v") in senml_dict.keys()
|
||||
is_senml = is_senml and ("t") in senml_dict.keys()
|
||||
except BaseException:
|
||||
return False
|
||||
return is_senml
|
||||
|
||||
|
||||
def _get_type(element):
|
||||
if isinstance(element, int):
|
||||
return u"Int"
|
||||
elif isinstance(element, float):
|
||||
return u"Float"
|
||||
elif isinstance(element, bool):
|
||||
return u"Boolean"
|
||||
elif isinstance(element, (str, unicode)):
|
||||
return u"String"
|
||||
else:
|
||||
logger.error("Type of \"element\" unknown")
|
||||
return "Unknown"
|
||||
|
||||
|
||||
class OrionAPI:
|
||||
class OrionAPI(LoggerMixin):
|
||||
|
||||
def __init__(self, orion_host=None, api_version="v2"):
|
||||
super(OrionAPI, self).__init__()
|
||||
self.host = orion_host
|
||||
self.version = api_version
|
||||
|
||||
def create_entity(self, entitiy_name, entity_type="openmtc",
|
||||
fiware_service=""):
|
||||
# TODO: check if this is sufficient
|
||||
@staticmethod
|
||||
def _is_senml(senml_dict):
|
||||
try:
|
||||
return (all(key in senml_dict for key in ('bn', 'n', 'u', 't')) and
|
||||
any(key in senml_dict for key in ('v', 'vs', 'vb')))
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
payload_json = {"id": entitiy_name, "type": entity_type}
|
||||
def _get_type(self, element):
|
||||
if isinstance(element, int):
|
||||
return u"Int"
|
||||
elif isinstance(element, float):
|
||||
return u"Float"
|
||||
elif isinstance(element, bool):
|
||||
return u"Boolean"
|
||||
elif isinstance(element, (str, unicode)):
|
||||
return u"String"
|
||||
else:
|
||||
self.logger.error("Type of \"element\" unknown")
|
||||
return "Unknown"
|
||||
|
||||
def create_entity(self, entity_name, entity_type="openmtc", fiware_service=""):
|
||||
payload_json = {"id": entity_name, "type": entity_type}
|
||||
|
||||
if self.version == "v2":
|
||||
logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
response = self._request(
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
self._request(
|
||||
self.host + "/v2/entities",
|
||||
method="post",
|
||||
json=payload_json,
|
||||
@ -72,25 +51,24 @@ class OrionAPI:
|
||||
"Content-type": "application/json",
|
||||
"fiware-service": fiware_service}
|
||||
)
|
||||
logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
else:
|
||||
logger.error("API version \"%s\" not supported!", self.version)
|
||||
self.logger.error("API version \"%s\" not supported!", self.version)
|
||||
|
||||
def update_attributes(self, entity_id, data_senml, fiware_service=""):
|
||||
data_senml = data_senml[0]
|
||||
if not _isSenML(data_senml):
|
||||
logger.error("Data \"%s\" is not valid SenML", data_senml)
|
||||
if not self._is_senml(data_senml):
|
||||
self.logger.error("Data \"%s\" is not valid SenML", data_senml)
|
||||
return
|
||||
|
||||
if data_senml["v"] == "type" or data_senml["v"] == "id":
|
||||
logger.warn(
|
||||
self.logger.warn(
|
||||
"SenML[v]=%s contains reserved name. Adding underscore", data_senml["v"])
|
||||
data_senml["v"] = data_senml["v"] + "_"
|
||||
|
||||
payload_json = {
|
||||
data_senml["n"]: {
|
||||
"value": data_senml["v"],
|
||||
"type": _get_type(data_senml["v"]),
|
||||
"type": self._get_type(data_senml["v"]),
|
||||
"metadata": {
|
||||
"timestamp": {"value": data_senml["t"], "type": "String"},
|
||||
"bn": {"value": data_senml["bn"], "type": "String"},
|
||||
@ -100,7 +78,7 @@ class OrionAPI:
|
||||
}
|
||||
|
||||
if self.version == "v2":
|
||||
response = self._request(
|
||||
self._request(
|
||||
self.host + "/v2/entities/" + entity_id + "/attrs",
|
||||
method="post",
|
||||
json=payload_json,
|
||||
@ -109,29 +87,21 @@ class OrionAPI:
|
||||
"Content-type": "application/json",
|
||||
"fiware-service": fiware_service}
|
||||
)
|
||||
logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
else:
|
||||
logger.error("API version \"%s\" not supported!", self.version)
|
||||
|
||||
def _request(
|
||||
self,
|
||||
url,
|
||||
method='get',
|
||||
json=None,
|
||||
params=None,
|
||||
headers=None,
|
||||
raw=False):
|
||||
self.logger.error("API version \"%s\" not supported!", self.version)
|
||||
|
||||
def _request(self, url, method='get', json=None, params=None, headers=None, raw=False):
|
||||
joined_url = urljoin(self.host, url)
|
||||
try:
|
||||
req = requests.request(method, joined_url, json=json,
|
||||
params=params, headers=headers)
|
||||
logger.debug("Status Code: %s", req.status_code)
|
||||
logger.debug("Content: %s", req.content)
|
||||
self.logger.debug("Status Code: %s", req.status_code)
|
||||
self.logger.debug("Content: %s", req.content)
|
||||
if raw:
|
||||
return {"status": req.status_code, "content": req.content}
|
||||
else:
|
||||
return {"status": req.status_code, "content": req.json()}
|
||||
except requests.ConnectionError as e:
|
||||
print "Connection Error: " + str(e)
|
||||
self.logger.error("Connection Error: " + str(e))
|
||||
return {"status": -1, "content": None}
|
||||
|
@ -1,159 +1,53 @@
|
||||
"""
|
||||
Copyright (c) 2017 Fraunhofer FOKUS
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
try:
|
||||
from urllib.parse import urlparse
|
||||
except ImportError:
|
||||
from urlparse import urlparse
|
||||
|
||||
from futile.logging import get_logger
|
||||
from openmtc_app.onem2m import XAE
|
||||
from openmtc_onem2m.model import CSETypeIDE, RemoteCSE
|
||||
import re
|
||||
|
||||
from openmtc_app.onem2m import ResourceManagementXAE
|
||||
from orion_api import OrionAPI
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
class OrionContextBroker(ResourceManagementXAE):
|
||||
|
||||
class OrionContextBroker(XAE):
|
||||
|
||||
def __init__(self, labels=[""], interval=10,
|
||||
orion_host="http://localhost:1026", orion_api="v2",
|
||||
def __init__(self, orion_host="http://localhost:1026", orion_api="v2", labels=None,
|
||||
*args, **kw):
|
||||
super(OrionContextBroker, self).__init__(*args, **kw)
|
||||
self.labels = labels
|
||||
self.remove_registration = True
|
||||
self.interval = interval
|
||||
if isinstance(labels, basestring):
|
||||
self.labels = {labels}
|
||||
elif hasattr(labels, '__iter__'):
|
||||
self.labels = set(labels)
|
||||
else:
|
||||
self.labels = None
|
||||
self._entity_names = {}
|
||||
self.orion_api = OrionAPI(orion_host=orion_host, api_version=orion_api)
|
||||
|
||||
def _on_register(self):
|
||||
# init variables
|
||||
self._known_remote_cses = {}
|
||||
self._discovered_devices = {}
|
||||
self._discovered_sensors = {}
|
||||
self._discover_openmtc_ipe_entities()
|
||||
|
||||
# connected to backend or gateway?
|
||||
cse_base = self.get_resource(self.cse_base)
|
||||
logger.debug("CSE_BASE: %s", cse_base)
|
||||
|
||||
if (cse_base.cseType == CSETypeIDE.MN_CSE or
|
||||
cse_base.cseType == CSETypeIDE.AEN_CSE):
|
||||
logger.debug("CSE_BASE identified as gateway")
|
||||
# discover gateway
|
||||
self._discover_cse(cse_base.CSE_ID + '/' + self.cse_base)
|
||||
def _sensor_filter(self, sensor):
|
||||
if self.labels:
|
||||
return len(self.labels.intersection(sensor.labels)) > 0
|
||||
else:
|
||||
logger.debug("CSE_BASE identified as backend")
|
||||
# discover backend
|
||||
self._discover_cse(cse_base.CSE_ID + '/' + self.cse_base)
|
||||
# discover remote gateways
|
||||
self._get_remote_cses(cse_base)
|
||||
return True
|
||||
|
||||
# get remote CSEs
|
||||
def _sensor_data_cb(self, sensor_info, sensor_data):
|
||||
|
||||
def _get_remote_cses(self, cse_base):
|
||||
|
||||
def get_cse_base():
|
||||
handle_cse_base(self.get_resource(self.cse_base))
|
||||
|
||||
def handle_cse_base(cb):
|
||||
for resource in cb.childResource:
|
||||
if (isinstance(resource, RemoteCSE) and
|
||||
resource.path not in self._known_remote_cses):
|
||||
remote_cse = self.get_resource(resource.id)
|
||||
self._known_remote_cses[resource.path] = remote_cse
|
||||
remote_cse_base = (remote_cse.CSE_ID + '/' +
|
||||
remote_cse.CSEBase)
|
||||
self._discover_cse(remote_cse_base)
|
||||
|
||||
handle_cse_base(cse_base)
|
||||
self.run_forever(self.interval, get_cse_base)
|
||||
|
||||
# discover CSE
|
||||
def _discover_cse(self, cse_base):
|
||||
|
||||
def err_cb(errror_response):
|
||||
def get_entity_name():
|
||||
try:
|
||||
del self._known_remote_cses[remote_cse_id]
|
||||
except KeyError:
|
||||
pass
|
||||
# discover devices
|
||||
self.periodic_discover(cse_base, {'labels': ['openmtc:device']},
|
||||
self.interval,
|
||||
self._discover_devices)
|
||||
self.periodic_discover(cse_base, {'labels': ['openmtc:sensor_data',
|
||||
'openmtc:actuator_data']},
|
||||
self.interval,
|
||||
self._discover_sensors, err_cb)
|
||||
id_label = filter(lambda x: (x.startswith('openmtc:id:')),
|
||||
sensor_info['sensor_labels']).pop()
|
||||
cse_id, dev_id = re.sub('^openmtc:id:', '', id_label).split('/')[:2]
|
||||
except (IndexError, ValueError):
|
||||
cse_id = sensor_info['cse_id']
|
||||
dev_id = sensor_info['dev_name']
|
||||
try:
|
||||
f_s, e_pre = cse_id.split('~', 1)
|
||||
except ValueError:
|
||||
f_s = ''
|
||||
e_pre = cse_id
|
||||
return re.sub('[\W]', '_', f_s), '%s-%s' % (e_pre, dev_id)
|
||||
|
||||
def _discover_devices(self, discovery):
|
||||
for device_path in discovery:
|
||||
self._discovered_devices[device_path] = 0
|
||||
logger.debug("Discovered devices: %s", self._discovered_devices)
|
||||
|
||||
def _handle_sensor_data(self, container, data):
|
||||
logger.debug("Got Sensor \"%s\" data: %s", container, data)
|
||||
# XXX if a label contains 3x '/' assume that we need smart orchestra
|
||||
# naming
|
||||
try:
|
||||
entity_name = next(lbl for lbl in self.get_resource(
|
||||
container).labels if lbl.count('/') == 3)
|
||||
tenant_name = entity_name.split('/')[0]
|
||||
entity_name = '-'.join(entity_name.split('/')[1:3])
|
||||
except Exception as e:
|
||||
entity_name = container.split('/')[-2]
|
||||
tenant_name = ""
|
||||
fiware_service, entity_name = self._entity_names[sensor_info['ID']]
|
||||
except KeyError:
|
||||
fiware_service, entity_name = self._entity_names[sensor_info['ID']] = get_entity_name()
|
||||
self.orion_api.create_entity(entity_name, fiware_service=fiware_service)
|
||||
|
||||
self.orion_api.create_entity(entity_name, fiware_service=tenant_name)
|
||||
self.orion_api.update_attributes(
|
||||
entity_name,
|
||||
data,
|
||||
fiware_service=tenant_name)
|
||||
|
||||
def _handle_new_sensor(self, sensor_path):
|
||||
|
||||
# check labels of openmtc:device
|
||||
device_labels = self.get_resource(
|
||||
"/".join(sensor_path.split('/')[0:-1])).labels
|
||||
# if label configured
|
||||
if not ((len(self.labels) == 0) or
|
||||
(len(self.labels) == 1 and self.labels[0] == "")):
|
||||
# if no matching label
|
||||
if len(set(self.labels) & set(device_labels)) == 0:
|
||||
# no matching label no subscription
|
||||
logger.debug("no matching label for %s", sensor_path)
|
||||
return
|
||||
|
||||
logger.debug("Subscription added for %s", sensor_path)
|
||||
self.add_container_subscription(sensor_path, self._handle_sensor_data)
|
||||
|
||||
def _discover_sensors(self, discovery):
|
||||
for sensor_path in discovery:
|
||||
try:
|
||||
dev_path = [x for x in self._discovered_devices.keys()
|
||||
if sensor_path.startswith(x)][0]
|
||||
except IndexError as e: # todo(rst): ignore, but should not happen
|
||||
logger.debug("%s", e)
|
||||
logger.debug("%s", sensor_path)
|
||||
continue
|
||||
self._discovered_sensors[sensor_path] = {
|
||||
'ID': sensor_path,
|
||||
'dev_name': dev_path.split('/')[-1],
|
||||
'cse_id': sensor_path.split('/')[1],
|
||||
'data': None,
|
||||
'type': 'sensor',
|
||||
'n': None,
|
||||
'u': None
|
||||
}
|
||||
self._handle_new_sensor(sensor_path)
|
||||
self.orion_api.update_attributes(entity_name, sensor_data, fiware_service=fiware_service)
|
||||
|
Loading…
Reference in New Issue
Block a user