Enhance subscription handling in orion ae + adds possibility to guess accumulate address in orion ae

This commit is contained in:
Christian Klopp 2018-06-12 14:25:12 +02:00
parent 81ba52b953
commit e71dfd15e8
6 changed files with 73 additions and 22 deletions

View File

@ -18,5 +18,6 @@
"labels": ["openmtc:sensor_data:temperature", "openmtc:sensor_data:humidity"], "labels": ["openmtc:sensor_data:temperature", "openmtc:sensor_data:humidity"],
"interval": 10, "interval": 10,
"orion_host": "http://localhost:1026", "orion_host": "http://localhost:1026",
"orion_api": "v2" "orion_api": "v2",
"accumulate_address": "http://localhost:8080"
} }

View File

@ -10,9 +10,10 @@ ORIGINATOR_PRE=${ORIGINATOR_PRE-"//openmtc.org/in-cse-1"}
SSL_CRT=${SSL_CRT-"/etc/openmtc/certs/orioncontextbroker.cert.pem"} SSL_CRT=${SSL_CRT-"/etc/openmtc/certs/orioncontextbroker.cert.pem"}
SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/orioncontextbroker.key.pem"} SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/orioncontextbroker.key.pem"}
SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"} SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"}
LABELS=${LABELS-'["openmtc:sensor_data"]'}
ORION_HOST=${ORION_HOST-"http://localhost:1026"} ORION_HOST=${ORION_HOST-"http://localhost:1026"}
ORION_API=${ORION_API-"v2"} ORION_API=${ORION_API-"v2"}
ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS-"http://localhost:8080"} ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS}
LABELS=${LABELS-'["openmtc:sensor_data"]'} LABELS=${LABELS-'["openmtc:sensor_data"]'}
# defaults logging # defaults logging
@ -46,6 +47,7 @@ JQ_STRING=${JQ_STRING}' |
.cse_base = "'${CSE_BASE}'" | .cse_base = "'${CSE_BASE}'" |
.poas = '${POAS}' | .poas = '${POAS}' |
.originator_pre = "'${ORIGINATOR_PRE}'" | .originator_pre = "'${ORIGINATOR_PRE}'" |
.labels = '${LABELS}' |
.orion_host = "'${ORION_HOST}'" | .orion_host = "'${ORION_HOST}'" |
.orion_api = "'${ORION_API}'" | .orion_api = "'${ORION_API}'" |
.accumulate_address = "'${ACCUMULATE_ADDRESS}'" | .accumulate_address = "'${ACCUMULATE_ADDRESS}'" |

View File

@ -1,11 +1,11 @@
{ {
"name": "OrionContextBroker", "name": "OrionContextBroker",
"ep": "http://localhost:8000", "ep": "http://localhost:18000",
"cse_base": "onem2m", "cse_base": "onem2m",
"poas": [ "poas": [
"http://auto:25396" "http://auto:25396"
], ],
"originator_pre": "//openmtc.org/mn-cse-1", "originator_pre": "//openmtc.org/in-cse-1",
"ssl_certs": { "ssl_certs": {
"cert_file": "/etc/openmtc/certs/orioncontextbroker.cert.pem", "cert_file": "/etc/openmtc/certs/orioncontextbroker.cert.pem",
"key_file": "/etc/openmtc/certs/orioncontextbroker.key.pem", "key_file": "/etc/openmtc/certs/orioncontextbroker.key.pem",
@ -18,5 +18,6 @@
"labels": ["openmtc:sensor_data"], "labels": ["openmtc:sensor_data"],
"interval": 10, "interval": 10,
"orion_host": "http://localhost:1026", "orion_host": "http://localhost:1026",
"orion_api": "v2" "orion_api": "v2",
"accumulate_address": "http://localhost:8080"
} }

View File

@ -51,7 +51,7 @@ orion_host = get_value("orion_host", (unicode, str), default_orion_host, args,
orion_api = get_value("orion_api", (unicode, str), default_orion_api, args, orion_api = get_value("orion_api", (unicode, str), default_orion_api, args,
config) config)
accumulate_address = get_value("accumulate_address", (unicode, str), accumulate_address = get_value("accumulate_address", (unicode, str),
default_accumulate_address, args, config) default_accumulate_address, args, config)
# start # start
app = OrionContextBroker( app = OrionContextBroker(

View File

@ -2,12 +2,15 @@ try:
from urllib.parse import urljoin from urllib.parse import urljoin
except ImportError: except ImportError:
from urlparse import urljoin from urlparse import urljoin
import logging
import requests import requests
import json
from futile.logging import LoggerMixin from futile.logging import LoggerMixin
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
class OrionAPI(LoggerMixin): class OrionAPI(LoggerMixin):
def __init__(self, def __init__(self,
@ -152,6 +155,12 @@ class OrionAPI(LoggerMixin):
# return the subscriptionId # return the subscriptionId
return response["headers"]["Location"].split('/')[3] return response["headers"]["Location"].split('/')[3]
def unsubscribe(self, subscription_id, fiware_service=""):
self._request(self.host + "/v2/subscriptions/" + subscription_id,
method='delete',
headers={"fiware-service": fiware_service},
raw=True)
def _request(self, def _request(self,
url, url,
method='get', method='get',

View File

@ -1,6 +1,11 @@
import re import re
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from flask import Flask, Response, request from flask import Flask, Response, request
from gevent import wsgi from gevent.pywsgi import WSGIServer
from openmtc_app.onem2m import ResourceManagementXAE from openmtc_app.onem2m import ResourceManagementXAE
from orion_api import OrionAPI from orion_api import OrionAPI
@ -10,8 +15,8 @@ class OrionContextBroker(ResourceManagementXAE):
def __init__(self, def __init__(self,
orion_host="http://localhost:1026", orion_host="http://localhost:1026",
orion_api="v2", orion_api="v2",
labels=["openmtc:sensor_data"], labels=None,
accumulate_address="http://localhost:8080", accumulate_address=None,
*args, *args,
**kw): **kw):
super(OrionContextBroker, self).__init__(*args, **kw) super(OrionContextBroker, self).__init__(*args, **kw)
@ -19,11 +24,19 @@ class OrionContextBroker(ResourceManagementXAE):
self.labels = {labels} self.labels = {labels}
elif hasattr(labels, '__iter__'): elif hasattr(labels, '__iter__'):
self.labels = set(labels) self.labels = set(labels)
elif labels is None:
self.labels = ["openmtc:sensor_data"]
else: else:
self.labels = None self.labels = None
self._entity_names = {} self._entity_names = {}
self._subscriptions = {} self._subscription_endpoints = {}
self.logger.critical(accumulate_address) self._subscription_services = {}
# accumulate address
if not accumulate_address:
accumulate_address = "http://" + self._get_auto_host(orion_host) + ":8080"
# Orion API
self.orion_api = OrionAPI( self.orion_api = OrionAPI(
orion_host=orion_host, orion_host=orion_host,
api_version=orion_api, api_version=orion_api,
@ -36,22 +49,45 @@ class OrionContextBroker(ResourceManagementXAE):
'process_notification', 'process_notification',
self.process_notification, self.process_notification,
methods=["POST"]) methods=["POST"])
accumulate_ip, accumulate_port = accumulate_address.split('//')[ accumulate_ip, accumulate_port = urlparse(accumulate_address).netloc.rsplit(':', 1)
1].split(':') self.server = WSGIServer(("0.0.0.0", int(accumulate_port)),
self.server = wsgi.WSGIServer(("0.0.0.0", int(accumulate_port)), self.app)
self.app)
self.server.start() self.server.start()
@staticmethod
def _get_auto_host(ep):
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
netloc = urlparse(ep).netloc.split(':')
s.connect((netloc[0], int(netloc[1])))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
return host
def process_notification(self): def process_notification(self):
self.logger.debug("Got from Subscription {}".format(request.json)) self.logger.debug("Got from Subscription {}".format(request.json))
actuator = self.get_resource( try:
self._subscriptions[request.json["subscriptionId"]]) actuator = self.get_resource(
self.push_content(actuator, request.json["data"][0]["cmd"]["value"]) self._subscription_endpoints[request.json["subscriptionId"]]
)
except KeyError:
# ignore not deleted old subscriptions
pass
else:
self.push_content(actuator, request.json["data"][0]["cmd"]["value"])
return Response(status=200, headers={}) return Response(status=200, headers={})
def _on_register(self): def _on_register(self):
self._discover_openmtc_ipe_entities() self._discover_openmtc_ipe_entities()
def _on_shutdown(self):
for subscription_id, fiware_service in self._subscription_services.items():
self.orion_api.unsubscribe(subscription_id, fiware_service)
def _sensor_filter(self, sensor_info): def _sensor_filter(self, sensor_info):
if self.labels: if self.labels:
return len(self.labels.intersection( return len(self.labels.intersection(
@ -59,7 +95,8 @@ class OrionContextBroker(ResourceManagementXAE):
else: else:
return True return True
def _get_entity_name(self, sensor_info): @staticmethod
def _get_entity_name(sensor_info):
device_type = "sensor" if sensor_info.get("sensor_labels", device_type = "sensor" if sensor_info.get("sensor_labels",
None) else "actuator" None) else "actuator"
try: try:
@ -113,6 +150,7 @@ class OrionContextBroker(ResourceManagementXAE):
self.orion_api.update_attributes( self.orion_api.update_attributes(
entity_name, data_dummy, fiware_service=fiware_service) entity_name, data_dummy, fiware_service=fiware_service)
subscriptionId = self.orion_api.subscribe( subscription_id = self.orion_api.subscribe(
entity_name, fiware_service=fiware_service) entity_name, fiware_service=fiware_service)
self._subscriptions[subscriptionId] = actuator_info['ID'] self._subscription_endpoints[subscription_id] = actuator_info['ID']
self._subscription_services[subscription_id] = fiware_service