From e71dfd15e812d1a284b1f7c4dea6fe65d43e70fc Mon Sep 17 00:00:00 2001 From: Christian Klopp Date: Tue, 12 Jun 2018 14:25:12 +0200 Subject: [PATCH] Enhance subscription handling in orion ae + adds possibility to guess accumulate address in orion ae --- apps/OrionContextBroker/config.json | 3 +- .../configure-orioncontextbroker-and-start | 4 +- .../etc/conf/config.json.dist | 7 +- .../src/orioncontextbroker/__main__.py | 2 +- .../src/orioncontextbroker/orion_api.py | 11 ++- .../orion_context_broker.py | 68 +++++++++++++++---- 6 files changed, 73 insertions(+), 22 deletions(-) diff --git a/apps/OrionContextBroker/config.json b/apps/OrionContextBroker/config.json index 1f90c83..9c54e0c 100644 --- a/apps/OrionContextBroker/config.json +++ b/apps/OrionContextBroker/config.json @@ -18,5 +18,6 @@ "labels": ["openmtc:sensor_data:temperature", "openmtc:sensor_data:humidity"], "interval": 10, "orion_host": "http://localhost:1026", - "orion_api": "v2" + "orion_api": "v2", + "accumulate_address": "http://localhost:8080" } diff --git a/apps/OrionContextBroker/docker/configure-orioncontextbroker-and-start b/apps/OrionContextBroker/docker/configure-orioncontextbroker-and-start index d7b73d3..8822f25 100755 --- a/apps/OrionContextBroker/docker/configure-orioncontextbroker-and-start +++ b/apps/OrionContextBroker/docker/configure-orioncontextbroker-and-start @@ -10,9 +10,10 @@ ORIGINATOR_PRE=${ORIGINATOR_PRE-"//openmtc.org/in-cse-1"} SSL_CRT=${SSL_CRT-"/etc/openmtc/certs/orioncontextbroker.cert.pem"} SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/orioncontextbroker.key.pem"} SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"} +LABELS=${LABELS-'["openmtc:sensor_data"]'} ORION_HOST=${ORION_HOST-"http://localhost:1026"} ORION_API=${ORION_API-"v2"} -ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS-"http://localhost:8080"} +ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS} LABELS=${LABELS-'["openmtc:sensor_data"]'} # defaults logging @@ -46,6 +47,7 @@ JQ_STRING=${JQ_STRING}' | .cse_base = "'${CSE_BASE}'" | .poas = '${POAS}' | .originator_pre = "'${ORIGINATOR_PRE}'" | + .labels = '${LABELS}' | .orion_host = "'${ORION_HOST}'" | .orion_api = "'${ORION_API}'" | .accumulate_address = "'${ACCUMULATE_ADDRESS}'" | diff --git a/apps/OrionContextBroker/etc/conf/config.json.dist b/apps/OrionContextBroker/etc/conf/config.json.dist index 82f4965..cba9e3a 100644 --- a/apps/OrionContextBroker/etc/conf/config.json.dist +++ b/apps/OrionContextBroker/etc/conf/config.json.dist @@ -1,11 +1,11 @@ { "name": "OrionContextBroker", - "ep": "http://localhost:8000", + "ep": "http://localhost:18000", "cse_base": "onem2m", "poas": [ "http://auto:25396" ], - "originator_pre": "//openmtc.org/mn-cse-1", + "originator_pre": "//openmtc.org/in-cse-1", "ssl_certs": { "cert_file": "/etc/openmtc/certs/orioncontextbroker.cert.pem", "key_file": "/etc/openmtc/certs/orioncontextbroker.key.pem", @@ -18,5 +18,6 @@ "labels": ["openmtc:sensor_data"], "interval": 10, "orion_host": "http://localhost:1026", - "orion_api": "v2" + "orion_api": "v2", + "accumulate_address": "http://localhost:8080" } diff --git a/apps/OrionContextBroker/src/orioncontextbroker/__main__.py b/apps/OrionContextBroker/src/orioncontextbroker/__main__.py index 6fbf255..a68ba4f 100644 --- a/apps/OrionContextBroker/src/orioncontextbroker/__main__.py +++ b/apps/OrionContextBroker/src/orioncontextbroker/__main__.py @@ -51,7 +51,7 @@ orion_host = get_value("orion_host", (unicode, str), default_orion_host, args, orion_api = get_value("orion_api", (unicode, str), default_orion_api, args, config) accumulate_address = get_value("accumulate_address", (unicode, str), - default_accumulate_address, args, config) + default_accumulate_address, args, config) # start app = OrionContextBroker( diff --git a/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py b/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py index f4fc935..d477f74 100644 --- a/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py +++ b/apps/OrionContextBroker/src/orioncontextbroker/orion_api.py @@ -2,12 +2,15 @@ try: from urllib.parse import urljoin except ImportError: from urlparse import urljoin +import logging import requests -import json from futile.logging import LoggerMixin +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + class OrionAPI(LoggerMixin): def __init__(self, @@ -152,6 +155,12 @@ class OrionAPI(LoggerMixin): # return the subscriptionId return response["headers"]["Location"].split('/')[3] + def unsubscribe(self, subscription_id, fiware_service=""): + self._request(self.host + "/v2/subscriptions/" + subscription_id, + method='delete', + headers={"fiware-service": fiware_service}, + raw=True) + def _request(self, url, method='get', diff --git a/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py b/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py index c92d2ea..7a1c946 100644 --- a/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py +++ b/apps/OrionContextBroker/src/orioncontextbroker/orion_context_broker.py @@ -1,6 +1,11 @@ import re +try: + from urllib.parse import urlparse +except ImportError: + from urlparse import urlparse + from flask import Flask, Response, request -from gevent import wsgi +from gevent.pywsgi import WSGIServer from openmtc_app.onem2m import ResourceManagementXAE from orion_api import OrionAPI @@ -10,8 +15,8 @@ class OrionContextBroker(ResourceManagementXAE): def __init__(self, orion_host="http://localhost:1026", orion_api="v2", - labels=["openmtc:sensor_data"], - accumulate_address="http://localhost:8080", + labels=None, + accumulate_address=None, *args, **kw): super(OrionContextBroker, self).__init__(*args, **kw) @@ -19,11 +24,19 @@ class OrionContextBroker(ResourceManagementXAE): self.labels = {labels} elif hasattr(labels, '__iter__'): self.labels = set(labels) + elif labels is None: + self.labels = ["openmtc:sensor_data"] else: self.labels = None self._entity_names = {} - self._subscriptions = {} - self.logger.critical(accumulate_address) + self._subscription_endpoints = {} + self._subscription_services = {} + + # accumulate address + if not accumulate_address: + accumulate_address = "http://" + self._get_auto_host(orion_host) + ":8080" + + # Orion API self.orion_api = OrionAPI( orion_host=orion_host, api_version=orion_api, @@ -36,22 +49,45 @@ class OrionContextBroker(ResourceManagementXAE): 'process_notification', self.process_notification, methods=["POST"]) - accumulate_ip, accumulate_port = accumulate_address.split('//')[ - 1].split(':') - self.server = wsgi.WSGIServer(("0.0.0.0", int(accumulate_port)), - self.app) + accumulate_ip, accumulate_port = urlparse(accumulate_address).netloc.rsplit(':', 1) + self.server = WSGIServer(("0.0.0.0", int(accumulate_port)), + self.app) self.server.start() + @staticmethod + def _get_auto_host(ep): + try: + import socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + netloc = urlparse(ep).netloc.split(':') + s.connect((netloc[0], int(netloc[1]))) + host = s.getsockname()[0] + s.close() + except: + host = "127.0.0.1" + + return host + def process_notification(self): self.logger.debug("Got from Subscription {}".format(request.json)) - actuator = self.get_resource( - self._subscriptions[request.json["subscriptionId"]]) - self.push_content(actuator, request.json["data"][0]["cmd"]["value"]) + try: + actuator = self.get_resource( + self._subscription_endpoints[request.json["subscriptionId"]] + ) + except KeyError: + # ignore not deleted old subscriptions + pass + else: + self.push_content(actuator, request.json["data"][0]["cmd"]["value"]) return Response(status=200, headers={}) def _on_register(self): self._discover_openmtc_ipe_entities() + def _on_shutdown(self): + for subscription_id, fiware_service in self._subscription_services.items(): + self.orion_api.unsubscribe(subscription_id, fiware_service) + def _sensor_filter(self, sensor_info): if self.labels: return len(self.labels.intersection( @@ -59,7 +95,8 @@ class OrionContextBroker(ResourceManagementXAE): else: return True - def _get_entity_name(self, sensor_info): + @staticmethod + def _get_entity_name(sensor_info): device_type = "sensor" if sensor_info.get("sensor_labels", None) else "actuator" try: @@ -113,6 +150,7 @@ class OrionContextBroker(ResourceManagementXAE): self.orion_api.update_attributes( entity_name, data_dummy, fiware_service=fiware_service) - subscriptionId = self.orion_api.subscribe( + subscription_id = self.orion_api.subscribe( entity_name, fiware_service=fiware_service) - self._subscriptions[subscriptionId] = actuator_info['ID'] + self._subscription_endpoints[subscription_id] = actuator_info['ID'] + self._subscription_services[subscription_id] = fiware_service