mirror of
https://github.com/OpenMTC/OpenMTC.git
synced 2025-04-09 20:11:18 +00:00
add actuator capability to ocb ae
This commit is contained in:
parent
ceb2faeaa3
commit
b3bfc3ff17
@ -1,11 +1,11 @@
|
||||
# Introduction
|
||||
|
||||
OrionContextBroker is an OpenMTC AE to forward OpenMTC data (via Subscription) to an instance of the Orion Context Broker.
|
||||
All ContentInstances are expected to use the SenML format. It is possible to connect the AE either to an OpenMTC Gateway or an OpenMTC Backend.
|
||||
OrionContextBroker is an OpenMTC AE to forward OpenMTC sensor data (via Subscription) to an instance of the Orion Context Broker. Additionally OpenMTC actuators are handled by forwarding changes on the OCB side via OpenMTC to the actuator.
|
||||
All Content Instances are expected to use the SenML format. It is possible to connect the AE either to an OpenMTC Gateway or an OpenMTC Backend.
|
||||
|
||||
# Getting started
|
||||
|
||||
Within the openmtc root directory the app can be started via
|
||||
Within the OpenMTC root directory the app can be started via
|
||||
|
||||
```
|
||||
./apps/orion-context-broker -v
|
||||
@ -13,7 +13,7 @@ Within the openmtc root directory the app can be started via
|
||||
|
||||
## Configuration
|
||||
|
||||
It is possible to configure the AE either via config.json or CLI paramters. All possible paramters can be shown via:
|
||||
It is possible to configure the AE either via config.json or CLI parameters. All possible parameters can be shown via:
|
||||
|
||||
```
|
||||
./apps/orion-context-broker -h
|
||||
@ -25,10 +25,11 @@ The most important parameters are:
|
||||
* labels (the labels that should be forwarded to the OrionCB, one label has to match (OR), empty ([""]) means every label)
|
||||
* interval (for periodic discovery)
|
||||
* orion_host (hostname:port of the Orion CB)
|
||||
* accumulate_address (Subscription Sink (RESTful HTTP) used for subscriptions to the OCB (actuator functionality))
|
||||
|
||||
# How the data is stored at the Orion CB
|
||||
|
||||
The Orion CB uses the model of *entities* having *attributes*. The AE matches all Container having the label "openmtc:device" to entities. Attributes are matched to the SenML Key "n" of ContentInstances. The types of values are determined by the AE to match typical Orion CB types (e.g. Int, String, Float...).
|
||||
The Orion CB uses the model of *entities* having *attributes*. The AE matches all Container having the label "openmtc:device" to entities. Attributes are matched to the SenML Key "n" of Content Instances. The types of values are determined by the AE to match typical Orion CB types (e.g. Int, String, Float...).
|
||||
|
||||
## Example
|
||||
|
||||
@ -101,3 +102,9 @@ curl localhost:1026/v2/entities/ | jq '.'
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
# Actuator Functionality
|
||||
|
||||

|
||||
|
||||
If an actuator is created in OpenMTC an event is triggered at the orion context broker app. The app will create an entity of the actuator on the OCB with an attribute "cmd". After that a subscription to the attribute "cmd" is created and therefore all change on this attribute will be forwarded to the corresponding OpenMTC backend or gateway.
|
||||
|
BIN
apps/OrionContextBroker/actuator_arch.png
Normal file
BIN
apps/OrionContextBroker/actuator_arch.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 120 KiB |
@ -12,6 +12,7 @@ SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/orioncontextbroker.key.pem"}
|
||||
SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"}
|
||||
ORION_HOST=${ORION_HOST-"http://localhost:1026"}
|
||||
ORION_API=${ORION_API-"v2"}
|
||||
ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS-"http://localhost:8080"}
|
||||
|
||||
# defaults logging
|
||||
LOGGING_FILE=${LOGGING_FILE-"/var/log/openmtc/orioncontextbroker.log"}
|
||||
@ -46,6 +47,7 @@ JQ_STRING=${JQ_STRING}' |
|
||||
.originator_pre = "'${ORIGINATOR_PRE}'" |
|
||||
.orion_host = "'${ORION_HOST}'" |
|
||||
.orion_api = "'${ORION_API}'" |
|
||||
.accumulate_address = "'${ACCUMULATE_ADDRESS}'" |
|
||||
.ssl_certs.cert_file = "'${SSL_CRT}'" |
|
||||
.ssl_certs.key_file = "'${SSL_KEY}'" |
|
||||
.ssl_certs.ca_certs = "'${SSL_CA}'" |
|
||||
|
@ -11,21 +11,28 @@ default_labels = [""]
|
||||
default_interval = 10 # interval(s) to check status updates
|
||||
default_orion_host = "http://localhost:1026"
|
||||
default_orion_api = "v2"
|
||||
default_accumulate_address = "http://localhost:8080"
|
||||
|
||||
# args parser
|
||||
parser = ArgumentParser(
|
||||
description="Stores OpenMTC Date in an instance of the "
|
||||
"Orion Context Broker",
|
||||
"Orion Context Broker",
|
||||
prog="OrionContextBroker",
|
||||
formatter_class=ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("-n", "--name", help="Name used for the AE.")
|
||||
parser.add_argument("-s", "--ep", help="URL of the local Endpoint.")
|
||||
parser.add_argument("--orion_host", help="URL of Orion CB")
|
||||
parser.add_argument("--orion_api", help="Orion CB Api version (possible "
|
||||
"values: \"v2\")")
|
||||
parser.add_argument('--labels', type=str, help='just subscribe to those '
|
||||
'labels', nargs='+')
|
||||
parser.add_argument(
|
||||
"--orion_api", help="Orion CB Api version (possible "
|
||||
"values: \"v2\")")
|
||||
parser.add_argument(
|
||||
'--labels', type=str, help='just subscribe to those '
|
||||
'labels', nargs='+')
|
||||
parser.add_argument('--interval', type=int, help='update interval (s)')
|
||||
parser.add_argument(
|
||||
'--accumulate_address',
|
||||
type=str,
|
||||
help='address for webserver collecting orion subscription notifications')
|
||||
|
||||
# args, config and logging
|
||||
args, config = prepare_app(parser, __loader__, __name__, "config.json")
|
||||
@ -39,17 +46,25 @@ originator_pre = config.get("originator_pre", "//openmtc.org/mn-cse-1")
|
||||
ssl_certs = config.get("ssl_certs", {})
|
||||
interval = get_value("interval", int, default_ep, args, config)
|
||||
lbl = get_value("labels", list, default_labels, args, config)
|
||||
orion_host = get_value("orion_host", (unicode, str),
|
||||
default_orion_host, args, config)
|
||||
orion_api = get_value("orion_api", (unicode, str),
|
||||
default_orion_api, args, config)
|
||||
orion_host = get_value("orion_host", (unicode, str), default_orion_host, args,
|
||||
config)
|
||||
orion_api = get_value("orion_api", (unicode, str), default_orion_api, args,
|
||||
config)
|
||||
accumulate_address = get_value("accumulate_address", (unicode, str),
|
||||
default_accumulate_address, args, config)
|
||||
|
||||
# start
|
||||
app = OrionContextBroker(
|
||||
orion_host=orion_host, orion_api=orion_api, labels=lbl,
|
||||
interval=interval, name=nm, cse_base=cb, poas=poas,
|
||||
originator_pre=originator_pre, **ssl_certs
|
||||
)
|
||||
orion_host=orion_host,
|
||||
orion_api=orion_api,
|
||||
labels=lbl,
|
||||
interval=interval,
|
||||
name=nm,
|
||||
accumulate_address=accumulate_address,
|
||||
cse_base=cb,
|
||||
poas=poas,
|
||||
originator_pre=originator_pre,
|
||||
**ssl_certs)
|
||||
Runner(app).run(ep)
|
||||
|
||||
print ("Exiting....")
|
||||
print("Exiting....")
|
||||
|
@ -4,23 +4,26 @@ except ImportError:
|
||||
from urlparse import urljoin
|
||||
|
||||
import requests
|
||||
import json
|
||||
|
||||
from futile.logging import LoggerMixin
|
||||
|
||||
|
||||
class OrionAPI(LoggerMixin):
|
||||
|
||||
def __init__(self, orion_host=None, api_version="v2"):
|
||||
def __init__(self,
|
||||
orion_host=None,
|
||||
api_version="v2",
|
||||
accumulate_endpoint="http://localhost:8080"):
|
||||
super(OrionAPI, self).__init__()
|
||||
self.host = orion_host
|
||||
self.version = api_version
|
||||
self.accumulate_endpoint = accumulate_endpoint
|
||||
|
||||
# TODO: check if this is sufficient
|
||||
@staticmethod
|
||||
def _is_senml(senml_dict):
|
||||
try:
|
||||
return (all(key in senml_dict for key in ('bn', 'n', 'u', 't')) and
|
||||
any(key in senml_dict for key in ('v', 'vs', 'vb')))
|
||||
return (all(key in senml_dict for key in ('bn', 'n', 'u', 't'))
|
||||
and any(key in senml_dict for key in ('v', 'vs', 'vb')))
|
||||
except TypeError:
|
||||
return False
|
||||
|
||||
@ -34,74 +37,146 @@ class OrionAPI(LoggerMixin):
|
||||
elif isinstance(element, (str, unicode)):
|
||||
return u"String"
|
||||
else:
|
||||
self.logger.error("Type of \"element\" unknown")
|
||||
return "Unknown"
|
||||
self.logger.error('Type of "{}" unknown'.format(element))
|
||||
return u"Unknown"
|
||||
|
||||
def create_entity(self, entity_name, entity_type="openmtc", fiware_service=""):
|
||||
def create_entity(self,
|
||||
entity_name,
|
||||
entity_type="openmtc",
|
||||
fiware_service=""):
|
||||
payload_json = {"id": entity_name, "type": entity_type}
|
||||
|
||||
if self.version == "v2":
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
self.logger.debug(
|
||||
"Send Payload to Orion CB: {}".format(payload_json))
|
||||
self._request(
|
||||
self.host + "/v2/entities",
|
||||
"{}/v2/entities".format(self.host),
|
||||
method="post",
|
||||
json=payload_json,
|
||||
raw=True,
|
||||
headers={
|
||||
"Content-type": "application/json",
|
||||
"fiware-service": fiware_service}
|
||||
)
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
"fiware-service": fiware_service
|
||||
})
|
||||
self.logger.debug(
|
||||
"Send Payload to Orion CB: {}".format(payload_json))
|
||||
else:
|
||||
self.logger.error("API version \"%s\" not supported!", self.version)
|
||||
self.logger.error('API version "{}" not supported!'.format(
|
||||
self.version))
|
||||
|
||||
def update_attributes(self, entity_id, data_senml, fiware_service=""):
|
||||
if not self._is_senml(data_senml):
|
||||
self.logger.error("Data \"%s\" is not valid SenML", data_senml)
|
||||
self.logger.error(
|
||||
'Data "{}" is not valid SenML'.format(data_senml))
|
||||
return
|
||||
|
||||
if data_senml["v"] == "type" or data_senml["v"] == "id":
|
||||
if data_senml["v"] in ("type", "id"):
|
||||
self.logger.warn(
|
||||
"SenML[v]=%s contains reserved name. Adding underscore", data_senml["v"])
|
||||
data_senml["v"] = data_senml["v"] + "_"
|
||||
"SenML[v]={} contains reserved name. Adding underscore".format(
|
||||
data_senml["v"]))
|
||||
data_senml["v"] = "{}_".format(data_senml["v"])
|
||||
|
||||
payload_json = {
|
||||
data_senml["n"]: {
|
||||
"value": data_senml["v"],
|
||||
"type": self._get_type(data_senml["v"]),
|
||||
"metadata": {
|
||||
"timestamp": {"value": data_senml["t"], "type": "String"},
|
||||
"bn": {"value": data_senml["bn"], "type": "String"},
|
||||
"unit": {"value": data_senml["u"], "type": "String"}
|
||||
"timestamp": {
|
||||
"value": data_senml["t"],
|
||||
"type": "String"
|
||||
},
|
||||
"bn": {
|
||||
"value": data_senml["bn"],
|
||||
"type": "String"
|
||||
},
|
||||
"unit": {
|
||||
"value": data_senml["u"],
|
||||
"type": "String"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.version == "v2":
|
||||
self._request(
|
||||
self.host + "/v2/entities/" + entity_id + "/attrs",
|
||||
self.host + "/v2/entities/{}/attrs".format(entity_id),
|
||||
method="post",
|
||||
json=payload_json,
|
||||
raw=True,
|
||||
headers={
|
||||
"Content-type": "application/json",
|
||||
"fiware-service": fiware_service}
|
||||
)
|
||||
self.logger.debug("Send Payload to Orion CB: %s", str(payload_json))
|
||||
"fiware-service": fiware_service
|
||||
})
|
||||
self.logger.debug(
|
||||
"Send Payload to Orion CB: {}".format(payload_json))
|
||||
else:
|
||||
self.logger.error("API version \"%s\" not supported!", self.version)
|
||||
self.logger.error('API version "{}" not supported!'.format(
|
||||
self.version))
|
||||
|
||||
def _request(self, url, method='get', json=None, params=None, headers=None, raw=False):
|
||||
def subscribe(self, entity_id, entity_type="openmtc", fiware_service=""):
|
||||
payload_json = {
|
||||
"description":
|
||||
'OpenMTC Actuator Subscription for Entitiy {}'.format(entity_id),
|
||||
"subject": {
|
||||
"entities": [{
|
||||
"id": entity_id,
|
||||
"type": entity_type
|
||||
}],
|
||||
"condition": {
|
||||
"attrs": ["cmd"]
|
||||
}
|
||||
},
|
||||
"notification": {
|
||||
"http": {
|
||||
"url": self.accumulate_endpoint
|
||||
},
|
||||
"attrs": ["cmd"]
|
||||
}
|
||||
}
|
||||
if self.version == "v2":
|
||||
response = self._request(
|
||||
self.host + "/v2/subscriptions",
|
||||
method="post",
|
||||
json=payload_json,
|
||||
raw=True,
|
||||
headers={
|
||||
"Content-type": "application/json",
|
||||
"fiware-service": fiware_service
|
||||
})
|
||||
self.logger.debug(
|
||||
"Add Subscription for {} on Fiware Service {}".format(
|
||||
entity_id, fiware_service))
|
||||
else:
|
||||
self.logger.error('API version "{}" not supported!'.format(
|
||||
self.version))
|
||||
# return the subscriptionId
|
||||
return response["headers"]["Location"].split('/')[3]
|
||||
|
||||
def _request(self,
|
||||
url,
|
||||
method='get',
|
||||
json=None,
|
||||
params=None,
|
||||
headers=None,
|
||||
raw=False):
|
||||
joined_url = urljoin(self.host, url)
|
||||
try:
|
||||
req = requests.request(method, joined_url, json=json,
|
||||
params=params, headers=headers)
|
||||
self.logger.debug("Status Code: %s", req.status_code)
|
||||
self.logger.debug("Content: %s", req.content)
|
||||
req = requests.request(
|
||||
method, joined_url, json=json, params=params, headers=headers)
|
||||
self.logger.debug('Got "{}" with Status Code {}'.format(
|
||||
req.status_code, req.content))
|
||||
if raw:
|
||||
return {"status": req.status_code, "content": req.content}
|
||||
return {
|
||||
"status": req.status_code,
|
||||
"content": req.content,
|
||||
"headers": req.headers
|
||||
}
|
||||
else:
|
||||
return {"status": req.status_code, "content": req.json()}
|
||||
return {
|
||||
"status": req.status_code,
|
||||
"content": req.json(),
|
||||
"headers": req.headers
|
||||
}
|
||||
except requests.ConnectionError as e:
|
||||
self.logger.error("Connection Error: " + str(e))
|
||||
self.logger.error("Connection Error: {}".format(e))
|
||||
return {"status": -1, "content": None}
|
||||
|
@ -1,13 +1,19 @@
|
||||
import re
|
||||
from flask import Flask, Response, request
|
||||
from gevent import wsgi
|
||||
|
||||
from openmtc_app.onem2m import ResourceManagementXAE
|
||||
from orion_api import OrionAPI
|
||||
|
||||
|
||||
class OrionContextBroker(ResourceManagementXAE):
|
||||
|
||||
def __init__(self, orion_host="http://localhost:1026", orion_api="v2", labels=None,
|
||||
*args, **kw):
|
||||
def __init__(self,
|
||||
orion_host="http://localhost:1026",
|
||||
orion_api="v2",
|
||||
labels=None,
|
||||
accumulate_address="http://localhost:8080",
|
||||
*args,
|
||||
**kw):
|
||||
super(OrionContextBroker, self).__init__(*args, **kw)
|
||||
if isinstance(labels, basestring):
|
||||
self.labels = {labels}
|
||||
@ -16,42 +22,97 @@ class OrionContextBroker(ResourceManagementXAE):
|
||||
else:
|
||||
self.labels = None
|
||||
self._entity_names = {}
|
||||
self.orion_api = OrionAPI(orion_host=orion_host, api_version=orion_api)
|
||||
self._subscriptions = {}
|
||||
self.logger.critical(accumulate_address)
|
||||
self.orion_api = OrionAPI(
|
||||
orion_host=orion_host,
|
||||
api_version=orion_api,
|
||||
accumulate_endpoint="{}/accumulate".format(accumulate_address))
|
||||
|
||||
# Subscription Sink for OCB
|
||||
self.app = Flask(__name__)
|
||||
self.app.add_url_rule(
|
||||
'/accumulate',
|
||||
'process_notification',
|
||||
self.process_notification,
|
||||
methods=["POST"])
|
||||
accumulate_ip, accumulate_port = accumulate_address.split('//')[
|
||||
1].split(':')
|
||||
self.server = wsgi.WSGIServer((accumulate_ip, int(accumulate_port)),
|
||||
self.app)
|
||||
self.server.start()
|
||||
|
||||
def process_notification(self):
|
||||
self.logger.debug("Got from Subscription {}".format(request.json))
|
||||
actuator = self.get_resource(
|
||||
self._subscriptions[request.json["subscriptionId"]])
|
||||
self.push_content(actuator, request.json["data"][0]["cmd"]["value"])
|
||||
return Response(status=200, headers={})
|
||||
|
||||
def _on_register(self):
|
||||
self._discover_openmtc_ipe_entities()
|
||||
|
||||
def _new_actuator(self, actuator_info):
|
||||
# handle OCB actuator here
|
||||
pass
|
||||
|
||||
def _sensor_filter(self, sensor_info):
|
||||
if self.labels:
|
||||
return len(self.labels.intersection(sensor_info['sensor_labels'])) > 0
|
||||
return len(self.labels.intersection(
|
||||
sensor_info['sensor_labels'])) > 0
|
||||
else:
|
||||
return True
|
||||
|
||||
def _get_entity_name(self, sensor_info):
|
||||
device_type = "sensor" if sensor_info.get("sensor_labels",
|
||||
None) else "actuator"
|
||||
try:
|
||||
id_label = filter(
|
||||
lambda x: (x.startswith('openmtc:id:')),
|
||||
sensor_info['{}_labels'.format(device_type)]).pop()
|
||||
cse_id, dev_id = re.sub('^openmtc:id:', '',
|
||||
id_label).split('/')[:2]
|
||||
except (IndexError, ValueError):
|
||||
cse_id = sensor_info['cse_id']
|
||||
dev_id = sensor_info['dev_name']
|
||||
try:
|
||||
f_s, e_pre = cse_id.split('~', 1)
|
||||
except ValueError:
|
||||
f_s = ''
|
||||
e_pre = cse_id
|
||||
return re.sub('[\W]', '_', f_s), '%s-%s' % (e_pre, dev_id)
|
||||
|
||||
def _sensor_data_cb(self, sensor_info, sensor_data):
|
||||
|
||||
def get_entity_name():
|
||||
try:
|
||||
id_label = filter(lambda x: (x.startswith('openmtc:id:')),
|
||||
sensor_info['sensor_labels']).pop()
|
||||
cse_id, dev_id = re.sub('^openmtc:id:', '', id_label).split('/')[:2]
|
||||
except (IndexError, ValueError):
|
||||
cse_id = sensor_info['cse_id']
|
||||
dev_id = sensor_info['dev_name']
|
||||
try:
|
||||
f_s, e_pre = cse_id.split('~', 1)
|
||||
except ValueError:
|
||||
f_s = ''
|
||||
e_pre = cse_id
|
||||
return re.sub('[\W]', '_', f_s), '%s-%s' % (e_pre, dev_id)
|
||||
|
||||
try:
|
||||
fiware_service, entity_name = self._entity_names[sensor_info['ID']]
|
||||
except KeyError:
|
||||
fiware_service, entity_name = self._entity_names[sensor_info['ID']] = get_entity_name()
|
||||
self.orion_api.create_entity(entity_name, fiware_service=fiware_service)
|
||||
self._entity_names[sensor_info['ID']] = self._get_entity_name(
|
||||
sensor_info)
|
||||
fiware_service, entity_name = self._entity_names[sensor_info['ID']]
|
||||
self.orion_api.create_entity(
|
||||
entity_name, fiware_service=fiware_service)
|
||||
self.orion_api.update_attributes(
|
||||
entity_name, sensor_data, fiware_service=fiware_service)
|
||||
|
||||
self.orion_api.update_attributes(entity_name, sensor_data, fiware_service=fiware_service)
|
||||
def _new_actuator(self, actuator_info):
|
||||
try:
|
||||
fiware_service, entity_name = self._entity_names[actuator_info[
|
||||
'ID']]
|
||||
except KeyError:
|
||||
self._entity_names[actuator_info['ID']] = self._get_entity_name(
|
||||
actuator_info)
|
||||
fiware_service, entity_name = self._entity_names[actuator_info[
|
||||
'ID']]
|
||||
self.logger.info("Create new Entity {} on Fiware Service {}".format(
|
||||
entity_name, fiware_service))
|
||||
self.orion_api.create_entity(
|
||||
entity_name, fiware_service=fiware_service)
|
||||
data_dummy = {
|
||||
'v': "none",
|
||||
'bn': "none",
|
||||
'n': "cmd",
|
||||
'u': "none",
|
||||
't': "none"
|
||||
}
|
||||
self.orion_api.update_attributes(
|
||||
entity_name, data_dummy, fiware_service=fiware_service)
|
||||
|
||||
subscriptionId = self.orion_api.subscribe(
|
||||
entity_name, fiware_service=fiware_service)
|
||||
self._subscriptions[subscriptionId] = actuator_info['ID']
|
||||
|
Loading…
x
Reference in New Issue
Block a user