overworks MQTT functionality

This commit is contained in:
Ronald Steinke 2018-02-13 09:38:06 +01:00
parent 9ad44d82fe
commit e2475f64f1
13 changed files with 466 additions and 236 deletions

View File

@ -144,7 +144,10 @@ class OneM2MHTTPClient(OneM2MClient):
for name, val in filter_criteria.get_values(True).iteritems()
})
path = normalize_path(onem2m_request.to)
if onem2m_request.ae_notifying:
path = ''
else:
path = normalize_path(onem2m_request.to)
if params:
path += '?' + urllib.urlencode(params, True)

View File

@ -1,3 +1,5 @@
from datetime import datetime
from aplus import (
Promise,
)
@ -11,7 +13,7 @@ from ..exc import (
ERROR_MIN,
CSEValueError,
CSEError,
)
CSETargetNotReachable)
from ..serializer.util import (
decode_onem2m_content,
encode_onem2m_content,
@ -31,36 +33,47 @@ from simplejson import (
)
from socket import error as SocketError
from urlparse import urlparse
from openmtc_onem2m.util import split_onem2m_address
#: Dictionary mapping supported schemes to port numbers
portmap = {
'mqtt': 1883,
'mqtts': 8883,
# NB: The correct (i.e. registered with IANA) service-name for SSL/TLS-wrapped MQTT is 'secure-mqtt' in an effort to
# prevent confusion with MQTT-S/N. But as the entire world seems to insist on using 'mqtts' (including TS 0010,
# sec. 6.6) ... We are supporting both names here for maximum compliance and robustness.
# NB: The correct (i.e. registered with IANA) service-name for SSL/TLS-wrapped MQTT is
# 'secure-mqtt' in an effort to prevent confusion with MQTT-S/N. But as the entire world seems
# to insist on using 'mqtts' (including TS 0010, sec. 6.6) ... We are supporting both names here
# for maximum compliance and robustness.
'secure-mqtt': 8883,
}
MQTT_QOS_LEVEL = 1
MQTT_RESPONSE_TIMEOUT = 1
_clients = LRUCache(threadsafe=False)
def get_client(m2m_ep, use_xml=False, client_id=None, handle_request_func=None):
def get_client(m2m_ep, use_xml=False, client_id=None, handle_request_func=None,
ca_certs=None, cert_file=None, key_file=None, insecure=False):
"""
:param string m2m_ep:
:param boolean use_xml:
:param string client_id:
:param fun handle_request_func:
:param string ca_certs:
:param string cert_file:
:param string key_file:
:param string insecure:
:return OneM2MMQTTClient:
"""
try:
return _clients[(m2m_ep, use_xml)]
return _clients[(m2m_ep.split('#')[0], use_xml)]
except KeyError:
_clients[(m2m_ep, use_xml)] = OneM2MMQTTClient(m2m_ep, use_xml, client_id, handle_request_func)
return _clients[(m2m_ep, use_xml)]
client = _clients[(m2m_ep.split('#')[0], use_xml)] = OneM2MMQTTClient(
m2m_ep, use_xml, client_id, handle_request_func, ca_certs=ca_certs,
cert_file=cert_file, key_file=key_file, insecure=insecure
)
return client
class OneM2MMQTTClient(OneM2MClient):
@ -87,6 +100,7 @@ class OneM2MMQTTClient(OneM2MClient):
'gid',
'drt',
'to',
'fc',
])
__response_fields = frozenset([
@ -118,10 +132,23 @@ class OneM2MMQTTClient(OneM2MClient):
OneM2MMQTTClient._mqtt_mask(receiver),
])
@staticmethod
def _get_client_id_from_originator(originator):
_, cse_id, ae_id = split_onem2m_address(originator)
if cse_id:
client_id = cse_id[1:] + ('/' + ae_id if ae_id else '')
elif ae_id:
client_id = ae_id
else:
# TODO: make this configurable
client_id = 'ae0'
return client_id
def attach_callback(self):
"""
Wrapper function to attach callback handlers to the MQTT client. Functions attached in this manner are expected
to have the same name as the handler they seek to implement.
Wrapper function to attach callback handlers to the MQTT client. Functions attached in this
manner are expected to have the same name as the handler they seek to implement.
:return fun:
"""
def decorator(func):
@ -131,8 +158,8 @@ class OneM2MMQTTClient(OneM2MClient):
return wrapper
return decorator
def __init__(self, m2m_ep, _, client_id, handle_request_func=None, subscribe_sys_topics=False, ca_certs=None,
cert_file=None, key_file=None, insecure=False):
def __init__(self, m2m_ep, _, client_id, handle_request_func=None, subscribe_sys_topics=False,
ca_certs=None, cert_file=None, key_file=None, insecure=False):
"""
:param str m2m_ep:
:param bool _:
@ -143,9 +170,20 @@ class OneM2MMQTTClient(OneM2MClient):
"""
super(OneM2MMQTTClient, self).__init__()
parsed_url = urlparse(m2m_ep)
self._target_id = parsed_url.fragment
self._default_target_id = parsed_url.fragment
self._encode = JSONEncoder().encode
def _default(x):
if isinstance(x, datetime):
try:
isoformat = x.isoformat
except AttributeError:
raise TypeError("%s (%s)" % (x, type(x)))
return isoformat()
else:
return x
self._encode = JSONEncoder(default=_default).encode
self._decode = JSONDecoder().decode
self._handle_request_func = handle_request_func
@ -157,6 +195,8 @@ class OneM2MMQTTClient(OneM2MClient):
import random
import string
client_id = ''.join(random.sample(string.letters, 16))
else:
client_id = self._get_client_id_from_originator(client_id)
self._client = mqtt.Client(
clean_session=False,
@ -166,142 +206,28 @@ class OneM2MMQTTClient(OneM2MClient):
]),
)
self._client_id = client_id
@self.attach_callback()
def on_connect(client, _, rc):
def on_connect(client, userdata, flags_dict, rc):
"""
:param mqtt.Client client:
:param All userdata:
:param dict flags_dict:
:param integer rc:
:return void:
"""
if not rc == mqtt.CONNACK_ACCEPTED:
raise ConnectionFailed(mqtt.connack_string(rc))
def request_callback(client, _, message):
"""
Catch requests and
:param mqtt.Client client:
:param All _:
:param mqtt.MQTTMessage message:
:return void:
"""
originator = message.topic.split('/')[3]
try:
request = self._decode(message.payload)
except JSONDecodeError as e:
self.logger.warn(
'Got rubbish request from client %s: %s'
% (originator, e.message, )
)
return
try:
if request['rqi'] in self._processed_request_ids:
self.logger.info('Request %s already processed; discarding duplicate.' % (request['rqi'], ))
return
else:
rqi = request['rqi']
except KeyError:
self.logger.warn(
'Special treatment for special request w/o request id from %s.'
% (originator, )
)
return
try:
request['pc'] = decode_onem2m_content(self._encode(request['pc']), 'application/json')
request['ty'] = type(request['pc'])
except KeyError:
# No content, eh?
request['ty'] = None
self.logger.debug('Decoded JSON request: %s' % (request, ))
op = OneM2MOperation._member_map_.values()[request['op'] - 1]
to = request['to']
del request['op'], request['to']
try:
response = self._handle_request_func(
OneM2MRequest(op, to, **request)
).get()
except OneM2MErrorResponse as response:
self.logger.error('OneM2MError: %s' % (response.message, ))
except CSEError as e:
response = OneM2MErrorResponse(status_code=e.response_status_code, rqi=rqi)
if not response.rqi:
# This really should not happen. No, really, it shouldn't.
self.logger.debug(
'FIXUP! FIXUP! FIXUP! Adding missing request identifier to response: %s'
% (rqi, )
)
response.rqi = rqi
if response.content:
response.content = self._decode(
encode_onem2m_content(response.content, 'application/json', path=response.to)[1]
)
self._publish_message(
self._encode({
k: getattr(response, k) for k in self.__response_fields if getattr(response, k) is not None
}),
self._build_topic(originator, client_id, type='resp'),
)
self._processed_request_ids.append(rqi)
def response_callback(client, _, message):
"""
:param mqtt.Client client:
:param All _:
:param mqtt.MQTTMessage message:
:return:
"""
try:
response = self._decode(message.payload)
except JSONDecodeError as e:
self.logger.error('Discarding response w/ damaged payload: %s', (e.message, ))
return
promise_key = (message.topic.split('/')[4], response['rqi'])
try:
p = self._request_promises[promise_key]
except KeyError:
self.logger.debug(
'Response %s could not be mapped to a request. Discarding.'
% (response['rqi'], )
)
return
try:
response['pc'] = decode_onem2m_content(self._encode(response['pc']), 'application/json')
except KeyError:
pass
except CSEValueError as e:
self.logger.error(
'Content of response %s could not be parsed, throwing on the trash heap: %s'
% (response['rqi'], e.message)
)
p.reject(e)
status_code = response['rsc']
del response['rsc']
if status_code >= ERROR_MIN:
p.reject(OneM2MErrorResponse(status_code, **response))
else:
p.fulfill(OneM2MResponse(status_code, **response))
topics = [
self._build_topic(originator=client_id, receiver='#', type='resp'),
]
client.message_callback_add(topics[0], response_callback)
client.message_callback_add(topics[0], self._response_callback)
if self._handle_request_func is not None:
topics.append(self._build_topic(receiver=client_id) + '/+')
client.message_callback_add(topics[1], request_callback)
client.message_callback_add(topics[1], self._request_callback)
if subscribe_sys_topics:
topics.append('$SYS/#')
@ -349,8 +275,9 @@ class OneM2MMQTTClient(OneM2MClient):
if parsed_url.username:
self._client.username_pw_set(parsed_url.username, parsed_url.password)
self._client.tls_set(ca_certs=ca_certs, certfile=cert_file, keyfile=key_file)
self._client.tls_insecure_set(insecure)
if parsed_url.scheme != 'mqtt':
self._client.tls_set(ca_certs=ca_certs, certfile=cert_file, keyfile=key_file)
self._client.tls_insecure_set(insecure)
try:
self._client.connect(
@ -364,15 +291,172 @@ class OneM2MMQTTClient(OneM2MClient):
try:
while self._client.loop(timeout=0.1) != mqtt.mqtt_cs_disconnecting:
gevent.sleep()
except (KeyboardInterrupt, SystemExit):
except (SystemExit, KeyboardInterrupt, AttributeError):
pass
gevent.spawn(loop)
def _request_callback(self, client, _, message):
"""
Catch requests and
:param mqtt.Client client:
:param All _:
:param mqtt.MQTTMessage message:
:return void:
"""
def handle_request():
originator = message.topic.split('/')[3]
try:
request = self._decode(message.payload)
except JSONDecodeError as e:
self.logger.warn(
'Got rubbish request from client %s: %s'
% (originator, e.message, )
)
return
try:
if request['rqi'] in self._processed_request_ids:
self.logger.info('Request %s already processed; discarding duplicate.' %
(request['rqi'], ))
return
else:
rqi = request['rqi']
except KeyError:
self.logger.warn(
'Special treatment for special request w/o request id from %s.'
% (originator, )
)
return
try:
request['pc'] = decode_onem2m_content(self._encode(request['pc']),
'application/json')
request['ty'] = type(request['pc'])
except KeyError:
# No content, eh?
request['ty'] = None
self.logger.debug('Decoded JSON request: %s' % (request, ))
op = OneM2MOperation._member_map_.values()[request['op'] - 1]
to = request['to']
del request['op'], request['to']
try:
response = self._handle_request_func(
OneM2MRequest(op, to, **request)
)
try:
response = response.get()
except AttributeError:
pass
except OneM2MErrorResponse as response:
self.logger.debug('OneM2MError: %s' % response)
except CSEError as e:
response = OneM2MErrorResponse(status_code=e.response_status_code, rqi=rqi)
if not response.rqi:
# This really should not happen. No, really, it shouldn't.
self.logger.debug(
'FIXUP! FIXUP! FIXUP! Adding missing request identifier to response: %s'
% (rqi, )
)
response.rqi = rqi
if response.content:
sp_id, cse_id, _ = split_onem2m_address(response.to)
response.content = self._decode(
encode_onem2m_content(response.content, 'application/json',
path=sp_id + cse_id)[1]
)
self._publish_message(
self._encode({
k: getattr(response, k) for k in self.__response_fields
if getattr(response, k) is not None
}),
self._build_topic(originator, self._client_id, type='resp'),
)
self._processed_request_ids.append(rqi)
gevent.spawn(handle_request)
def _response_callback(self, client, _, message):
"""
:param mqtt.Client client:
:param All _:
:param mqtt.MQTTMessage message:
:return:
"""
def handle_response():
try:
response = self._decode(message.payload)
except JSONDecodeError as e:
self.logger.error('Discarding response w/ damaged payload: %s', (e.message, ))
return
promise_key = (message.topic.split('/')[4], response['rqi'])
try:
p = self._request_promises[promise_key]
except KeyError:
self.logger.debug(
'Response %s could not be mapped to a request. Discarding.'
% (response['rqi'], )
)
return
try:
response['pc'] = decode_onem2m_content(self._encode(response['pc']),
'application/json')
except KeyError:
pass
except CSEValueError as e:
self.logger.error(
'Content of response %s could not be parsed, throwing on the trash heap: %s'
% (response['rqi'], e.message)
)
p.reject(e)
status_code = response['rsc']
del response['rsc']
if status_code >= ERROR_MIN:
p.reject(OneM2MErrorResponse(status_code, **response))
else:
p.fulfill(OneM2MResponse(status_code, **response))
gevent.spawn(handle_response)
@property
def handle_request_func(self):
return self._handle_request_func
@handle_request_func.setter
def handle_request_func(self, func):
if self._handle_request_func is not None:
raise RuntimeError()
topic = self._build_topic(receiver=self._client_id) + '/+'
self._client.message_callback_add(topic, self._request_callback)
self.logger.debug('Subscribing to topic %s ...' % topic)
self._client.subscribe((str(topic), MQTT_QOS_LEVEL))
self._handle_request_func = func
def _cancel_request(self, promise_key):
if promise_key in self._request_promises:
p = self._request_promises[promise_key]
p.reject(CSETargetNotReachable())
def _publish_message(self, payload, topic):
(rc, mid) = self._client.publish(topic, payload, MQTT_QOS_LEVEL)
if not rc == mqtt.MQTT_ERR_SUCCESS:
self.logger.info('Code %d while sending message %d: %s' % (rc, mid, mqtt.error_string(rc)))
self.logger.info('Code %d while sending message %d: %s' %
(rc, mid, mqtt.error_string(rc)))
def send_onem2m_request(self, request):
"""
@ -381,32 +465,33 @@ class OneM2MMQTTClient(OneM2MClient):
"""
p = Promise()
try:
client_id = request.originator.split('/')[-1]
except (KeyError, AttributeError):
# TODO: make this configurable
client_id = 'ae0'
client_id = self._get_client_id_from_originator(request.originator)
if request.ty and request.op == OneM2MOperation.create:
request.ty = ResourceTypeE[request.resource_type.typename].value
else:
request.ty = None
request.op = 1 + OneM2MOperation._member_map_.keys().index(OneM2MOperation[request.op].name)
if request.pc:
request.pc = self._decode(
encode_onem2m_content(request.pc, 'application/json', path=request.to)[1]
)
try:
if request.to.startswith('//'): # abs CSE
request.to = '/_' + request.to[1:]
elif request.to.startswith('/'): # sp rel CSE
request.to = '/~' + request.to
except AttributeError:
self.logger.error('Could not resolve target id; defaulting to preset')
request.to = '/' + self._target_id
if request.fc:
request.fc = encode_onem2m_content(request.fc, 'application/json', path=request.to)[1]
if request.ty:
request.ty = ResourceTypeE[request.resource_type.typename].value
if self._default_target_id:
target_id = self._default_target_id
else:
_, cse_id, suffix = split_onem2m_address(request.to)
if cse_id:
target_id = cse_id[1:] + ('/' + suffix if request.ae_notifying else '')
else:
raise CSETargetNotReachable()
self.logger.debug('Preparing request for transit: %s' % (request, ))
promises_key = (self._target_id, request.rqi)
promises_key = (OneM2MMQTTClient._mqtt_mask(target_id), request.rqi)
def cleanup(_):
self.logger.debug('Clearing request id %s ...' % (promises_key, ))
@ -416,22 +501,32 @@ class OneM2MMQTTClient(OneM2MClient):
p.addErrback(cleanup)
self._request_promises[promises_key] = p
gevent.spawn_later(MQTT_RESPONSE_TIMEOUT, self._cancel_request, promises_key)
self._publish_message(
self._encode({
str(k): getattr(request, k) for k in self.__request_fields if getattr(request, k) is not None
str(k): getattr(request, k) for k in self.__request_fields
if getattr(request, k) is not None
}),
self._build_topic(client_id, self._target_id) + '/json',
self._build_topic(client_id, target_id) + '/json',
)
return p
def stop(self):
# TODO(rst): cleaning up MQTT clients has to be redesigned
# two scenarios:
# 1) receiving request with handle_func -> easy to maintain for stopping
# 2) send out MQTT request and waiting for a response -> more difficult to handle
# - need to change the idea of the client_id, maybe two per entity
# - different clients per broker, maybe per default target_id as well
if self._client:
self._client.disconnect()
# TODO(sho): this is abominable. But for the time being, there seems to be no elegant solution to this.
# TODO(sho): this is abominable. But for the time being, there seems to be no elegant
# solution to this.
self._client._clean_session = True
# TS 0010, sec. 6.3 mandates a reconnect in order to leave a clean state with the MQTT broker
# TS 0010, sec. 6.3 mandates a reconnect in order to leave a clean state with the MQTT
# broker
self._client.reconnect()
self._client.disconnect()
self._client = None

View File

@ -13,16 +13,22 @@ def _is_persistent(instance):
class OneM2MMapper(BasicMapper):
def __init__(self, cse, originator=None, ca_certs=None, cert_file=None, key_file=None, *args, **kw):
def __init__(self, cse, originator=None, ca_certs=None, cert_file=None, key_file=None,
*args, **kw):
super(OneM2MMapper, self).__init__(*args, **kw)
scheme = urlparse(cse).scheme.lower()
if scheme in ("", "https", "http"):
from openmtc_onem2m.client.http import get_client
self._send_request = get_client(cse, use_xml=False, ca_certs=ca_certs, cert_file=cert_file, key_file=key_file).send_onem2m_request
self._send_request = get_client(
cse, use_xml=False, ca_certs=ca_certs, cert_file=cert_file, key_file=key_file
).send_onem2m_request
elif scheme in ("mqtt", "mqtts", "secure-mqtt"):
from openmtc_onem2m.client.mqtt import get_client
self._send_request = get_client(cse, use_xml=False, client_id=originator).send_onem2m_request
self._send_request = get_client(
cse, use_xml=False, client_id=originator, ca_certs=ca_certs, cert_file=cert_file,
key_file=key_file
).send_onem2m_request
elif scheme == "coap":
raise NotImplementedError
else:
@ -33,7 +39,8 @@ class OneM2MMapper(BasicMapper):
def create(self, path, instance):
instance.__dict__.update({
attribute.name: None for attribute in type(instance).attributes if attribute.accesstype == attribute.RO
attribute.name: None for attribute in type(instance).attributes
if attribute.accesstype == attribute.RO
})
# TODO(rst): add resource_type
@ -86,12 +93,21 @@ class OneM2MMapper(BasicMapper):
return response.content
def get(self, path):
response = self._get_data(path)
def get(self, path, fc=None, **request_options):
response = self._get_data(path, fc, **request_options)
response.content.path = path
self.logger.debug("Received response: %s", response.content)
return response.content
def _get_data(self, path, fc=None, **request_options):
return self._send_request(OneM2MRequest(
OneM2MOperation.retrieve,
path,
self.originator,
filter_criteria=fc,
**request_options
)).get()
def delete(self, instance):
self._send_request(OneM2MRequest(
OneM2MOperation.delete,
@ -99,13 +115,6 @@ class OneM2MMapper(BasicMapper):
self.originator
))
def _get_data(self, path):
return self._send_request(OneM2MRequest(
OneM2MOperation.retrieve,
path,
self.originator
)).get()
# TODO(rst): check if this can be removed in parent class
@classmethod
def _patch_model(cls):

View File

@ -5,7 +5,7 @@ from enum import Enum, unique
from futile.logging import get_logger
from openmtc.model import StrEnum
from openmtc_onem2m.exc import OneM2MError
from openmtc_onem2m.exc import OneM2MError, STATUS, get_response_status
@unique
@ -197,13 +197,14 @@ class OneM2MOperation(StrEnum):
class OneM2MRequest(object):
internal = False
cascading = False
ae_notifying = False
"""Class representing a OneM2M request"""
def __init__(self, op, to, fr=None, rqi=None, ty=None, pc=None, rol=None,
ot=None, rqet=None, rset=None, oet=None, rt=None, rp=None,
rcn=None, ec=None, da=None, gid=None, filter_criteria=None,
drt=None):
fc=None, drt=None):
# Operation
self.operation = op
# Target uri
@ -226,7 +227,7 @@ class OneM2MRequest(object):
self.event_category = ec
self.delivery_aggregation = da
self.group_request_identifier = gid
self.filter_criteria = filter_criteria
self.filter_criteria = filter_criteria or fc
# Optional Discovery result type
self.discovery_result_type = drt
@ -386,7 +387,10 @@ class OneM2MResponse(object):
def __init__(self, status_code, request=None, rqi=None, pc=None, to=None,
fr=None, rsc=None):
# Operation result
self.response_status_code = status_code
if isinstance(status_code, STATUS):
self.response_status_code = status_code
else:
self.response_status_code = get_response_status(status_code)
if request:
self.request_identifier = request.rqi
# Target uri

View File

@ -54,13 +54,13 @@ class BasicMapper(LoggerMixin):
def _do_update(self, instance, fields):
raise NotImplementedError()
def get(self, path):
def get(self, path, fc=None, **request_options):
raise NotImplementedError()
def delete(self, instance):
raise NotImplementedError()
def _get_data(self, path):
def _get_data(self, path, fc=None, **request_options):
raise NotImplementedError()
def _map(self, path, typename, data):

View File

@ -21,10 +21,23 @@ Actual low-level MQTT handling is performed by the [paho mqtt library](http://ww
Although the interfaces of both, `OneM2MHTTPClient` and `OneM2MMQTTClient` are identical, addressing endpoints varies drastically. Through the necessity of a broker commonly reachable by two peers, said broker has to be the endpoint instead of the peer's machines. Subsequently, an address suitable for `OneM2MMQTTClient` can in general not be crafted by merely substituting `http://` with `mqtt://`. (A notable exception is a set-up in which all peers - including the broker - are located on one and the same machine.)
For a simple set-up of one AE and one CSE, proceed as follows:
#### Gateway (CSE)
##### Broker
Either you use an external broker like `iot.eclipse.org` (see [link](https://iot.eclipse.org/getting-started.html#sandboxes) for information) or you can set up a local one.
An example for a docker-based broker can be found [here](https://github.com/renarj/moquette-spring-docker). Just start it with the following command (assumes docker is already installed):
```bash
$ docker run -d -p 1883:1883 renarj/mqtt:latest
```
The following example assumes you have a local broker. Otherwise exchange `localhost` with `iot.eclipse.org`.
#### Gateway (MN-CSE)
1. Locate the `config-gateway.json` configuration file
1. Find the `plugins`.`openmtc_cse` key
1. Add the following stanza to the list:
1. Find the `plugins`.`openmtc_cse`.`MQTTTransportPlugin` entry
1. Change `disabled` to `false`:
```json
{
@ -32,33 +45,98 @@ For a simple set-up of one AE and one CSE, proceed as follows:
"package": "openmtc_cse.plugins.transport_gevent_mqtt",
"disabled": false,
"config": {
"interface": "iot.eclipse.org",
"interface": "localhost",
"port": 1883
}
},
```
> ** Hint:** A gateway is not locked in a single protocol. Multiple transport plugins can be active at the same time, allowing for a CSE to be reachable through a set of protocols.
> **_Hint:_** A gateway is not locked in a single protocol. Multiple transport plugins can be active at the same time, allowing for a CSE to be reachable through a set of protocols.
> **⚠️ Warning:** For the sake of brevity, `iot.eclipse.org` is set as broker. Please consider the introduction on MQTT regarding the ramifications.
> **⚠️ Warning:** For the sake of brevity, `localhost` is set as broker. Please consider the introduction on MQTT regarding the ramifications.
4. Start the gateway through the `openmtc-gateway-gevent` script
4. Start the gateway through the `run-gateway` script
On a related note, enabling the plugin in the backend (NSE) is done in an almost identical way: Just read `config-backend.json` in step 1 and `openmtc-backend-gevent` in step 4.
On a related note, enabling the plugin in the backend (IN-CSE) is done in an almost identical way: Just read `config-backend.json` in step 1 and `run-backend` in step 4.
To have the gateway registered to the backend via MQTT you have to change the Registration Plugin. Here the `poa` and `own_poa` entries like the following example:
```json
{
"name": "RegistrationHandler",
"package": "openmtc_cse.plugins.registration_handler",
"disabled": false,
"config": {
"labels": [
"openmtc"
],
"remote_cses": [
{
"cse_id": "in-cse-1",
"poa": [
"mqtt://localhost:1883"
],
"own_poa": [
"mqtt://localhost:1883"
],
"cse_base": "onem2m",
"cse_type": "IN_CSE"
}
],
"interval": 3600,
"offset": 3600
}
},
```
#### Application Entity
Programmatically, it is sufficient to create an instance of `OneM2MMQTTClient` with a given endpoint. In adoption of [example 8a](./training/onem2m-examples/onem2m-example-8a.py):
```python
from openmtc_onem2m.client.mqtt import OneM2MMQTTClient
client = OneM2MMQTTClient("mqtt://iot.eclipse.org#mn-cse-1")
client = OneM2MMQTTClient("mqtt://localhost#mn-cse-1")
```
All subsequent examples should be modifiable in the same fashion in order to enable MQTT support. In general, adjusting endpoints and providing the proper client is concluding the required steps.
Please note the particle of the endpoint's URL being the name of a CSE. Due to the addressing scheme in oneM2M/MQTT, a requesting entity has to know the responding entities name in advance. It should be duly noted that this is a workaround neither mandated nor sanctioned by TS-0010. In fact, the semantics of particles in MQTT-URLs are [entirely undefined](https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme). This inconvenience may or may not vanish in future releases.
#### Using Training Applications
Also the existing training applications can be re-used which can be found under [doc/training/apps/onem2m](doc/training/apps/onem2m). The four applications which end with `-final.py` can be changed in order to use the starting script under [doc/training/start-app.sh](doc/training/start-app.sh).
One example would be `onem2m-gui-sensors-final.py`. Change the last few lines from
```python
host = 'http://localhost:18000'
app = TestGUI(
poas=['http://localhost:21345'], # adds poas in order to receive notifications
# SSL options
originator_pre='//openmtc.org/in-cse-1', # originator_pre, needs to match value in cert
ca_certs='../../openmtc-gevent/certs/ca-chain.cert.pem',
cert_file='certs/test-gui.cert.pem', # cert file, pre-shipped and should match name
key_file='certs/test-gui.key.pem'
)
Runner(app).run(host)
```
to
```python
host = 'mqtt://localhost:1883#in-cse-1'
app = TestGUI(
poas=['mqtt://localhost:1883'], # adds poas in order to receive notifications
# SSL options
originator_pre='//openmtc.org/in-cse-1', # originator_pre, needs to match value in cert
ca_certs='../../openmtc-gevent/certs/ca-chain.cert.pem',
cert_file='certs/test-gui.cert.pem', # cert file, pre-shipped and should match name
key_file='certs/test-gui.key.pem'
)
Runner(app).run(host)
```
The change in `host` makes the AE to communicate with the backend (IN-CSE) via the broker. The change in `poas` lets notifications handled by the broker as well.
## Further Reading
- [Official MQTT Website](http://mqtt.org/)
- [MQTT on Wikipedia](https://en.wikipedia.org/wiki/MQTT)

View File

@ -122,7 +122,8 @@ class NotificationManager(LoggerMixin):
try:
if notification.get('sud'):
del self.callbacks[sur]
spawn(callback['del_cb'], sur)
if callback['del_cb']:
spawn(callback['del_cb'], sur)
else:
spawn(callback['cb'], sur, **notification)
except:
@ -206,12 +207,15 @@ class MqttNotificationHandler(BaseNotificationHandler):
def wrapper(request):
notification = self._unpack_notification(request.content)
self._callback(request.originator, **notification)
return OneM2MResponse(status_code=get_response_status(2002), request=request)
return OneM2MResponse(status_code=get_response_status(2000), request=request)
self._client = get_client(self._endpoint.geturl(), handle_request_func=wrapper)
if not self._client.handle_request_func:
self._client.handle_request_func = wrapper
def stop(self):
self._client.stop()
pass
register_handler(MqttNotificationHandler, ('mqtt', 'mqtts', 'secure-mqtt'))

View File

@ -1,21 +1,24 @@
import logging
import re
import time
from abc import abstractmethod
from base64 import (
b64decode,
b64encode,
)
from datetime import datetime
from json import (
dumps as json_dumps,
loads as json_loads,
)
from gevent import (
spawn,
sleep,
)
from iso8601 import parse_date
from json import (
dumps as json_dumps,
loads as json_loads,
)
from futile.logging import LoggerMixin
import logging
from futile.logging import LoggerMixin
from openmtc.util import (
UTC,
datetime_now,
@ -34,14 +37,14 @@ from openmtc_onem2m.model import (
Container,
ContentInstance,
EncodingTypeE,
get_short_member_name,
NotificationEventTypeE,
EventNotificationCriteria, CSETypeIDE, RemoteCSE)
EventNotificationCriteria,
CSETypeIDE,
RemoteCSE,
FilterCriteria
)
from openmtc_onem2m.serializer import get_onem2m_decoder
from openmtc_onem2m.transport import OneM2MErrorResponse
import time
import re
from urllib import urlencode
logging.getLogger("iso8601").setLevel(logging.ERROR)
@ -365,18 +368,11 @@ class XAE(LoggerMixin):
# TODO(rst): use filter_criteria from model
if not filter_criteria:
filter_criteria = {}
path += "?fu=1"
if filter_criteria:
path += "&" + urlencode(
{
get_short_member_name(k): v for k, v in filter_criteria.iteritems()
},
True
)
path += '&drt=' + str(1 if unstructured else 2)
filter_criteria['filterUsage'] = 1
discovery = self.mapper.get(path)
discovery = self.mapper.get(path, FilterCriteria(**filter_criteria),
drt=1 if unstructured else 2)
return discovery.CONTENT

View File

@ -53,6 +53,15 @@
"require_cert": true
}
},
{
"name": "MQTTTransportPlugin",
"package": "openmtc_cse.plugins.transport_gevent_mqtt",
"disabled": true,
"config": {
"interface": "localhost",
"port": 1883
}
},
{
"name": "NotificationHandler",
"package": "openmtc_cse.plugins.notification_handler",

View File

@ -53,6 +53,15 @@
"require_cert": true
}
},
{
"name": "MQTTTransportPlugin",
"package": "openmtc_cse.plugins.transport_gevent_mqtt",
"disabled": true,
"config": {
"interface": "localhost",
"port": 1883
}
},
{
"name": "NotificationHandler",
"package": "openmtc_cse.plugins.notification_handler",

View File

@ -89,6 +89,7 @@ class RegistrationHandler(Plugin):
try:
self._handle_remote_cses()
self._registered = True
except CSEError as e:
self._handle_registration_error(e)
@ -310,9 +311,6 @@ class RegistrationHandler(Plugin):
""" Stops the plugin.
DELETES CSE resource.
"""
self._handle_remote_cses(
handle_remote_cse_method=self._handle_remote_cse_delete)
try:
self.api.cancel_timer(self.__timer)
except AttributeError:
@ -324,6 +322,7 @@ class RegistrationHandler(Plugin):
pass
if self._registered:
pass
self._handle_remote_cses(
handle_remote_cse_method=self._handle_remote_cse_delete)
self._stopped()

View File

@ -1,3 +1,6 @@
from openmtc_cse import OneM2MEndPoint
from openmtc_cse.methoddomain.filtercriteria import parse_filter_criteria
from openmtc_onem2m.model import get_long_member_name
from openmtc_server.Plugin import Plugin
from openmtc_server.configuration import Configuration
from openmtc_onem2m.client.mqtt import get_client, portmap
@ -21,6 +24,15 @@ class MQTTTransportPlugin(Plugin):
scheme = portmap.keys()[portmap.values().index(port)]
except (KeyError, ValueError):
scheme = 'mqtt'
def handle_request_func(onem2m_request):
if onem2m_request.fc:
onem2m_request.fc = parse_filter_criteria({
get_long_member_name(k): v for k, v in onem2m_request.fc.items()
})
return self.api.handle_onem2m_request(onem2m_request)
self._client = get_client(
''.join([
scheme,
@ -29,9 +41,13 @@ class MQTTTransportPlugin(Plugin):
':',
str(port),
]),
handle_request_func=self.api.handle_onem2m_request,
handle_request_func=handle_request_func,
client_id=self.config['onem2m'].get('cse_id'),
)
self.api.register_point_of_access(
OneM2MEndPoint(scheme=scheme, server_address=interface, port=port))
self._started()
def _stop(self):

View File

@ -78,25 +78,29 @@ class OneM2MTransportDomain(Component):
self._endpoints = []
for poa_t in self._poa_templates:
def map_func(address):
if address.family == AF_INET6:
a = '[' + address.address + ']'
else:
a = address.address
return poa_t.scheme + '://' + a + ':' + str(poa_t.port)
if poa_t.server_address == "::":
def filter_func(x):
return x
elif poa_t.server_address in ("", "0.0.0.0"):
def filter_func(x):
return x.family == AF_INET
if poa_t.scheme.startswith("mqtt"):
self._endpoints.append(poa_t.scheme + '://' + poa_t.server_address +
':' + str(poa_t.port))
else:
def filter_func(x):
return x.address == poa_t.server_address
def map_func(address):
if address.family == AF_INET6:
a = '[' + address.address + ']'
else:
a = address.address
return poa_t.scheme + '://' + a + ':' + str(poa_t.port)
self._endpoints += map(map_func, filter(filter_func,
self._get_address_list()))
if poa_t.server_address == "::":
def filter_func(x):
return x
elif poa_t.server_address in ("", "0.0.0.0"):
def filter_func(x):
return x.family == AF_INET
else:
def filter_func(x):
return x.address == poa_t.server_address
self._endpoints += map(map_func, filter(filter_func,
self._get_address_list()))
# interface handling
def _handle_interface_created(self, interface):
@ -162,8 +166,12 @@ class OneM2MTransportDomain(Component):
'key_file': self.key_file
}
# TODO(hve): add scheme test.
client = self._get_clients[urlparse(poa).scheme](
poa, use_xml, insecure=self.accept_insecure_certs, **ssl_certs)
try:
client = self._get_clients[urlparse(poa).scheme](
poa, use_xml, insecure=self.accept_insecure_certs, **ssl_certs)
except KeyError:
self.logger.error("Scheme %s not configured" % urlparse(poa).scheme)
continue
try:
response = client.send_onem2m_request(onem2m_request).get()
p.fulfill(response)
@ -178,7 +186,7 @@ class OneM2MTransportDomain(Component):
return p
def send_notify(self, notify_request, poa_list=None):
notify_request.to = ''
notify_request.ae_notifying = True
return self._send_request_to_endpoints(notify_request, poa_list)
def send_onem2m_request(self, onem2m_request):