updates for the FIWARE integration setup

This commit is contained in:
Ronald Steinke 2018-05-24 13:35:06 +02:00
parent 3a3a147853
commit ca667df371
6 changed files with 93 additions and 39 deletions

View File

@ -13,7 +13,7 @@ SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"}
LABELS=${LABELS-'["openmtc:sensor_data"]'}
ORION_HOST=${ORION_HOST-"http://localhost:1026"}
ORION_API=${ORION_API-"v2"}
ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS-"http://localhost:8080"}
ACCUMULATE_ADDRESS=${ACCUMULATE_ADDRESS}
LABELS=${LABELS-'["openmtc:sensor_data"]'}
# defaults logging

View File

@ -2,12 +2,15 @@ try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
import logging
import requests
import json
from futile.logging import LoggerMixin
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
class OrionAPI(LoggerMixin):
def __init__(self,
@ -152,6 +155,12 @@ class OrionAPI(LoggerMixin):
# return the subscriptionId
return response["headers"]["Location"].split('/')[3]
def unsubscribe(self, subscription_id, fiware_service=""):
self._request(self.host + "/v2/subscriptions/" + subscription_id,
method='delete',
headers={"fiware-service": fiware_service},
raw=True)
def _request(self,
url,
method='get',

View File

@ -1,4 +1,9 @@
import re
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from flask import Flask, Response, request
from gevent.pywsgi import WSGIServer
@ -11,7 +16,7 @@ class OrionContextBroker(ResourceManagementXAE):
orion_host="http://localhost:1026",
orion_api="v2",
labels=None,
accumulate_address="http://localhost:8080",
accumulate_address=None,
*args,
**kw):
super(OrionContextBroker, self).__init__(*args, **kw)
@ -24,7 +29,14 @@ class OrionContextBroker(ResourceManagementXAE):
else:
self.labels = None
self._entity_names = {}
self._subscriptions = {}
self._subscription_endpoints = {}
self._subscription_services = {}
# accumulate address
if not accumulate_address:
accumulate_address = "http://" + self._get_auto_host(orion_host) + ":8080"
# Orion API
self.orion_api = OrionAPI(
orion_host=orion_host,
api_version=orion_api,
@ -37,22 +49,45 @@ class OrionContextBroker(ResourceManagementXAE):
'process_notification',
self.process_notification,
methods=["POST"])
accumulate_ip, accumulate_port = accumulate_address.split('//')[
1].split(':')
accumulate_ip, accumulate_port = urlparse(accumulate_address).netloc.rsplit(':', 1)
self.server = WSGIServer(("0.0.0.0", int(accumulate_port)),
self.app)
self.server.start()
@staticmethod
def _get_auto_host(ep):
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
netloc = urlparse(ep).netloc.split(':')
s.connect((netloc[0], int(netloc[1])))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
return host
def process_notification(self):
self.logger.debug("Got from Subscription {}".format(request.json))
actuator = self.get_resource(
self._subscriptions[request.json["subscriptionId"]])
self.push_content(actuator, request.json["data"][0]["cmd"]["value"])
try:
actuator = self.get_resource(
self._subscription_endpoints[request.json["subscriptionId"]]
)
except KeyError:
# ignore not deleted old subscriptions
pass
else:
self.push_content(actuator, request.json["data"][0]["cmd"]["value"])
return Response(status=200, headers={})
def _on_register(self):
self._discover_openmtc_ipe_entities()
def _on_shutdown(self):
for subscription_id, fiware_service in self._subscription_services.items():
self.orion_api.unsubscribe(subscription_id, fiware_service)
def _sensor_filter(self, sensor_info):
if self.labels:
return len(self.labels.intersection(
@ -117,4 +152,5 @@ class OrionContextBroker(ResourceManagementXAE):
subscription_id = self.orion_api.subscribe(
entity_name, fiware_service=fiware_service)
self._subscriptions[subscription_id] = actuator_info['ID']
self._subscription_endpoints[subscription_id] = actuator_info['ID']
self._subscription_services[subscription_id] = fiware_service

View File

@ -25,7 +25,7 @@ class CUL868Coordinator(LoggerMixin):
PROTOCOL_FS20 = "F"
PROTOCOL_HMS = "H"
def __init__(self, device="/dev/ttyACM1"):
def __init__(self, device="/dev/ttyACM1", sim=True):
super(CUL868Coordinator, self).__init__()
self.running = False
self.device = device
@ -37,6 +37,7 @@ class CUL868Coordinator(LoggerMixin):
"H": HMSParser()
}
self.sim = sim
self.sim_parsers = {
"K": SIMParser(),
# "E": SIMParser(),
@ -140,10 +141,16 @@ class CUL868Coordinator(LoggerMixin):
self.logger.debug("Command sent")
def switch_on(self, house_code, device_code):
self._send_fs20(house_code, device_code, self.COMMAND_ON)
self.logger.info("Switch on %s-%s" % (house_code, device_code))
if not self.sim:
self._send_fs20(house_code, device_code, self.COMMAND_ON)
def switch_off(self, house_code, device_code):
self._send_fs20(house_code, device_code, self.COMMAND_OFF)
self.logger.info("Switch off %s-%s" % (house_code, device_code))
if not self.sim:
self._send_fs20(house_code, device_code, self.COMMAND_OFF)
def toggle(self, house_code, device_code):
self._send_fs20(house_code, device_code, self.COMMAND_TOGGLE)
self.logger.info("Toggle %s-%s" % (house_code, device_code))
if not self.sim:
self._send_fs20(house_code, device_code, self.COMMAND_TOGGLE)

View File

@ -42,7 +42,7 @@ class CUL868IPE(XAE):
self._old_fs20_values = {}
self.cul = CUL868Coordinator(device=device)
self.cul = CUL868Coordinator(device=device, sim=sim)
for d in map(lambda s: CULDevice(*s.split(":")[:2]), cul_devices):
if d.type == "fs20":

View File

@ -833,21 +833,22 @@ class ResourceManagementXAE(XAE):
except IndexError:
continue
sensor = self.get_resource(sensor_path)
sensor_info = self._discovered_sensors[sensor_path] = {
'ID': sensor_path,
'dev_name': dev_path.split('/')[-1],
'cse_id': sensor_path.split('/')[1],
'dev_labels': self._discovered_devices[dev_path].labels,
'sensor_labels': sensor.labels,
'type': 'sensor',
'n': None,
'u': None,
'blacklisted': False
}
if self._sensor_filter(sensor_info):
self._handle_new_sensor(sensor_path)
else:
self._discovered_sensors[sensor_path]['blacklisted'] = True
if sensor:
sensor_info = self._discovered_sensors[sensor_path] = {
'ID': sensor_path,
'dev_name': dev_path.split('/')[-1],
'cse_id': sensor_path.split('/')[1],
'dev_labels': self._discovered_devices[dev_path].labels,
'sensor_labels': sensor.labels,
'type': 'sensor',
'n': None,
'u': None,
'blacklisted': False
}
if self._sensor_filter(sensor_info):
self._handle_new_sensor(sensor_path)
else:
self._discovered_sensors[sensor_path]['blacklisted'] = True
def _handle_new_sensor(self, sensor_path):
latest = self.get_resource(sensor_path + '/latest')
@ -893,15 +894,16 @@ class ResourceManagementXAE(XAE):
except IndexError:
continue
actuator = self.get_resource(actuator_path)
actuator_info = self._discovered_actuators[actuator_path] = {
'ID': actuator_path,
'dev_name': dev_path.split('/')[-1],
'cse_id': actuator_path.split('/')[1],
'dev_labels': self._discovered_devices[dev_path].labels,
'actuator_labels': actuator.labels,
'type': 'actuator'
}
self._new_actuator(actuator_info)
if actuator:
actuator_info = self._discovered_actuators[actuator_path] = {
'ID': actuator_path,
'dev_name': dev_path.split('/')[-1],
'cse_id': actuator_path.split('/')[1],
'dev_labels': self._discovered_devices[dev_path].labels,
'actuator_labels': actuator.labels,
'type': 'actuator'
}
self._new_actuator(actuator_info)
@abstractmethod
def _sensor_data_cb(self, sensor_info, sensor_data):