initial commit for notify-policy

This commit is contained in:
aor-fokus 2019-01-18 16:11:21 +01:00
parent 7c35afbb0c
commit 4f4b4bbe0d
5 changed files with 180 additions and 17 deletions

View File

@ -772,11 +772,21 @@ class EventNotificationCriteria(OneM2MEntity):
class BatchNotify(OneM2MEntity): class BatchNotify(OneM2MEntity):
pass # TODO """ See TS-004 6.3.5.6
See TS-004 Table 6.3.5.6-1"""
number = Attribute(int)
duration = Attribute(int)
class RateLimit(OneM2MEntity): class RateLimit(OneM2MEntity):
pass # TODO """ See TS-004 6.3.5.31
See TS-004 Table 6.3.5.31-1"""
maxNrOfNotify = Attribute(int)
timeWindow = Attribute(int)
class Subscription(RegularResourceC): class Subscription(RegularResourceC):

View File

@ -13,7 +13,7 @@ from openmtc_onem2m.model import (get_onem2m_type, ContentInstance,
get_long_attribute_name, get_long_attribute_name,
OneM2MEntity, OneM2MResource, Container, OneM2MEntity, OneM2MResource, Container,
get_long_resource_name, OneM2MContentResource, get_long_resource_name, OneM2MContentResource,
URIList, OneM2MIntEnum, SecurityInfo) URIList, OneM2MIntEnum, SecurityInfo, AggregatedNotification)
_typename_matcher = re_compile(r'^m2m:([a-z]+)$') _typename_matcher = re_compile(r'^m2m:([a-z]+)$')
@ -51,6 +51,17 @@ class OneM2MSerializer(LoggerMixin, metaclass=ABCMeta):
representation = data["notificationEvent"]["representation"] representation = data["notificationEvent"]["representation"]
representation = self.decode(self.dumps(representation)) representation = self.decode(self.dumps(representation))
data["notificationEvent"]["representation"] = representation data["notificationEvent"]["representation"] = representation
if resource_type is AggregatedNotification and "notification" in data:
notifications = data["notification"]
data["notification"] = []
for notification in notifications:
data["notification"].append(
self.decode(
self.dumps(notification)
)
)
resource = resource_type(**data) resource = resource_type(**data)
if child_resource: if child_resource:
resource.childResource = child_resource resource.childResource = child_resource
@ -67,6 +78,22 @@ class OneM2MDictSerializer(OneM2MSerializer):
self.logger.debug("Encoding representation: %s", representation) self.logger.debug("Encoding representation: %s", representation)
if isinstance(resource, AggregatedNotification):
try:
notifications = representation["notification"]
if notifications:
representation["notification"] = []
for notification in notifications:
representation["notification"].append(
self.encode_resource(
notification, pretty, path, encoding, fields, True
)
)
except (AttributeError, KeyError):
pass
if isinstance(resource, Notification): if isinstance(resource, Notification):
# handle notifications # handle notifications
try: try:
@ -120,7 +147,7 @@ class OneM2MDictSerializer(OneM2MSerializer):
k, v in representation.items()} k, v in representation.items()}
if not isinstance(resource, (OneM2MResource, Notification, if not isinstance(resource, (OneM2MResource, Notification,
SecurityInfo, OneM2MContentResource)): SecurityInfo, OneM2MContentResource,AggregatedNotification)):
return representation return representation
typename = 'm2m:' + (get_short_resource_name(resource.typename) or typename = 'm2m:' + (get_short_resource_name(resource.typename) or

View File

@ -7,6 +7,9 @@ from openmtc_onem2m.model import (
EventNotificationCriteria, EventNotificationCriteria,
NotificationEventTypeE, NotificationEventTypeE,
Subscription, Subscription,
BatchNotify,
RateLimit,
AggregatedNotification,
) )
from openmtc_onem2m.serializer import get_onem2m_decoder from openmtc_onem2m.serializer import get_onem2m_decoder
from urllib.parse import urlparse from urllib.parse import urlparse
@ -98,12 +101,19 @@ class NotificationManager(LoggerMixin):
def register_callback(self, func, sur, del_func=None): def register_callback(self, func, sur, del_func=None):
self.callbacks[sur] = { self.callbacks[sur] = {
'cb': func if len(getargspec(func)[0]) > 1 'cb': func if len(getargspec(func)[0]) > 1
else lambda _, **notification: func(notification['rep']), else lambda _, **notification: (
func(notification['not']) if 'not' in notification.keys()
else func(notification['rep'])),
'del_cb': del_func 'del_cb': del_func
} }
def _handle_callback(self, originator, **notification): def _handle_callback(self, originator, **notification):
sur = notification.pop('sur') try:
n = notification["not"][0]
except KeyError:
n = notification
sur = n.pop('sur')
sur = self._normalize_path(sur) sur = self._normalize_path(sur)
try: try:
@ -132,7 +142,7 @@ class NotificationManager(LoggerMixin):
def get_expiration_time(self): def get_expiration_time(self):
return None return None
def subscribe(self, path, func, delete_func=None, filter_criteria=None, expiration_time=None, def subscribe(self, path, func, delete_func=None, filter_criteria=None, sub_options=None, expiration_time=None,
notification_types=(NotificationEventTypeE.updateOfResource,)): notification_types=(NotificationEventTypeE.updateOfResource,)):
self._init() self._init()
@ -140,12 +150,23 @@ class NotificationManager(LoggerMixin):
event_notification_criteria.notificationEventType = ( event_notification_criteria.notificationEventType = (
event_notification_criteria.notificationEventType or list(notification_types)) event_notification_criteria.notificationEventType or list(notification_types))
subscription = self.mapper.create(path, Subscription( subscription = Subscription(
notificationURI=[self.mapper.originator], notificationURI=[self.mapper.originator],
expirationTime=expiration_time or self.get_expiration_time(), expirationTime=expiration_time or self.get_expiration_time(),
eventNotificationCriteria=event_notification_criteria, eventNotificationCriteria=event_notification_criteria,
subscriberURI=self.mapper.originator, subscriberURI=self.mapper.originator,
)) )
if sub_options:
try:
subscription.batchNotify = BatchNotify(**sub_options["batchNotify"])
except KeyError:
try:
subscription.rateLimit = RateLimit(**sub_options["rateLimit"])
except KeyError:
pass
subscription = self.mapper.create(path, subscription)
reference = self._normalize_path(subscription.path) reference = self._normalize_path(subscription.path)
self.register_callback(func, reference, delete_func) self.register_callback(func, reference, delete_func)
@ -263,10 +284,32 @@ class HttpNotificationHandler(BaseNotificationHandler):
cl = int(request.environ.get('CONTENT_LENGTH'), 0) cl = int(request.environ.get('CONTENT_LENGTH'), 0)
request.data = request.environ['wsgi.input'].read(cl) request.data = request.environ['wsgi.input'].read(cl)
notification = get_onem2m_decoder(request.content_type).decode(request.data)
if not notification.verificationRequest: ######## NEW CODE BEGINNING
notification = self._unpack_notification(notification) notification = {}
self._callback(request.headers['x-m2m-origin'], **notification) notification_type = get_onem2m_decoder(request.content_type).decode(request.data)
if isinstance(notification_type, AggregatedNotification):
notification["not"] = []
for n in notification_type.values["notification"]:
notification["not"].append(
self._unpack_notification(n)
)
else:
notification = self._unpack_notification(notification_type)
######## NEW CODE ENDING
# TODO Check if the above is working or take the below one
######## OLD CODE BEGINNING
# notification = get_onem2m_decoder(request.content_type).decode(request.data)
# if not notification.verificationRequest:
# notification = self._unpack_notification(notification)
######## OLD CODE ENDING
self._callback(request.headers['x-m2m-origin'], **notification)
return Response( return Response(
headers={ headers={

View File

@ -571,10 +571,11 @@ class XAE(LoggerMixin):
if x.rule != route] if x.rule != route]
def _add_subscription(self, path, _, handler, delete_handler, filter_criteria=None, def _add_subscription(self, path, _, handler, delete_handler, filter_criteria=None,
expiration_time=None): sub_options=None, expiration_time=None):
params = { params = {
'filter_criteria': filter_criteria, 'filter_criteria': filter_criteria,
'expiration_time': expiration_time, 'expiration_time': expiration_time,
'sub_options': sub_options,
} }
return self.add_subscription_handler(path, handler, delete_handler, **params) return self.add_subscription_handler(path, handler, delete_handler, **params)
@ -590,7 +591,7 @@ class XAE(LoggerMixin):
def add_subscription_handler(self, path, handler, delete_handler=None, def add_subscription_handler(self, path, handler, delete_handler=None,
types=(NotificationEventTypeE.updateOfResource, ), types=(NotificationEventTypeE.updateOfResource, ),
filter_criteria=None, expiration_time=None): filter_criteria=None, sub_options=None, expiration_time=None):
""" """
:param path: :param path:
@ -598,6 +599,7 @@ class XAE(LoggerMixin):
:param delete_handler: :param delete_handler:
:param types: :param types:
:param filter_criteria: :param filter_criteria:
:param sub_options:
:param expiration_time: :param expiration_time:
:return: :return:
""" """
@ -608,6 +610,7 @@ class XAE(LoggerMixin):
delete_handler, delete_handler,
notification_types=types, notification_types=types,
filter_criteria=filter_criteria, filter_criteria=filter_criteria,
sub_options=sub_options,
expiration_time=expiration_time expiration_time=expiration_time
) )
@ -627,7 +630,7 @@ class XAE(LoggerMixin):
return subscription.path return subscription.path
def add_container_subscription(self, container, handler, delete_handler=None, def add_container_subscription(self, container, handler, delete_handler=None,
filter_criteria=None): filter_criteria=None, sub_options=None):
""" Creates a Subscription to the ContentInstances of the given """ Creates a Subscription to the ContentInstances of the given
Container. Container.
@ -635,6 +638,7 @@ class XAE(LoggerMixin):
:param handler: reference of the notification handling function :param handler: reference of the notification handling function
:param delete_handler: reference to delete handling function :param delete_handler: reference to delete handling function
:param filter_criteria: (optional) FilterCriteria for the subscription :param filter_criteria: (optional) FilterCriteria for the subscription
:param sub_options: (optional) SubscriptionOptions for the subscription
""" """
path = getattr(container, "path", container) path = getattr(container, "path", container)
@ -647,6 +651,7 @@ class XAE(LoggerMixin):
filter_criteria = filter_criteria or EventNotificationCriteria() filter_criteria = filter_criteria or EventNotificationCriteria()
filter_criteria.notificationEventType = list([ filter_criteria.notificationEventType = list([
NotificationEventTypeE.createOfDirectChildResource, NotificationEventTypeE.createOfDirectChildResource,
NotificationEventTypeE.updateOfResource,
]) ])
def content_handler(cin): def content_handler(cin):
@ -657,7 +662,8 @@ class XAE(LoggerMixin):
None, None,
content_handler, content_handler,
delete_handler, delete_handler,
filter_criteria filter_criteria,
sub_options,
) )
def __start_refresher(self, instance, extra_fields=(), restore=None): def __start_refresher(self, instance, extra_fields=(), restore=None):

View File

@ -1,3 +1,4 @@
import datetime
from openmtc_onem2m import OneM2MRequest from openmtc_onem2m import OneM2MRequest
from openmtc_onem2m.exc import CSENotFound from openmtc_onem2m.exc import CSENotFound
from openmtc_onem2m.model import ( from openmtc_onem2m.model import (
@ -8,6 +9,7 @@ from openmtc_onem2m.model import (
NotificationContentTypeE, NotificationContentTypeE,
EventNotificationCriteria, EventNotificationCriteria,
NotificationEventTypeE, NotificationEventTypeE,
AggregatedNotification,
) )
from openmtc_onem2m.transport import OneM2MOperation from openmtc_onem2m.transport import OneM2MOperation
from openmtc_server.Plugin import Plugin from openmtc_server.Plugin import Plugin
@ -95,6 +97,10 @@ class NotificationHandler(Plugin):
"pid": subscription.parentID, "pid": subscription.parentID,
"enc": get_event_notification_criteria(subscription), "enc": get_event_notification_criteria(subscription),
"sub": subscription, "sub": subscription,
"not": [], # notifications
"levt": datetime.datetime.now(), # last event time
"sno": 0 # sent notifications (ratelimit)
} }
def _handle_subscription_updated(self, subscription, _): def _handle_subscription_updated(self, subscription, _):
@ -276,6 +282,68 @@ class NotificationHandler(Plugin):
# preSubscriptionNotify is not supported in this release of the # preSubscriptionNotify is not supported in this release of the
# document. # document.
# batchNotify
try:
batch_notify = sub.batchNotify
if batch_notify is not None:
current_time = datetime.datetime.now()
notifications = self.subscriptions_info[sub.resourceID]["not"]
for uri in sub.notificationURI:
notifications.append(
Notification(
notificationEvent=NotificationEventC(
representation=resource
),
subscriptionReference=self._get_subscription_reference(uri, sub.path),
creator=sub.creator
)
)
# Note: the extra condition 'len(notifications) < int(batch_notify.number) + 1'
# is needed, otherwise sometimes one more notification is sent
if int(batch_notify.number) <= len(notifications) and len(notifications) < int(batch_notify.number) + 1 or \
int(batch_notify.duration) <= int((current_time - self.subscriptions_info[sub.resourceID]["levt"]).seconds):
aggregated_notification = AggregatedNotification(**{"notification": notifications})
self._send_notification(aggregated_notification, sub)
self.subscriptions_info[sub.resourceID]["levt"] = current_time
self.subscriptions_info[sub.resourceID]["not"] = []
return
except AttributeError:
pass
# rateLimit
try:
ratelimit = sub.rateLimit
if ratelimit is not None:
notification_resources = self.subscriptions_info[sub.resourceID]["not"]
for uri in sub.notificationURI:
notification_resources.append(resource)
for i in range(len(notification_resources)):
current_time = datetime.datetime.now()
if int((current_time - self.subscriptions_info[sub.resourceID]["levt"]).seconds) <= int(ratelimit.timeWindow):
if self.subscriptions_info[sub.resourceID]["sno"] <= int(ratelimit.maxNrOfNotify):
self._send_notification(notification_resources.pop(0), sub)
self.subscriptions_info[sub.resourceID]["sno"] += 1
else:
break
else:
self.subscriptions_info[sub.resourceID]["levt"] = current_time
self.subscriptions_info[sub.resourceID]["sno"] = 0
break
return
except AttributeError:
pass
# Step 3.0 The Originator shall check the notification and reachability # Step 3.0 The Originator shall check the notification and reachability
# schedules, but the notification schedules may be checked in different # schedules, but the notification schedules may be checked in different
# order. # order.
@ -351,6 +419,15 @@ class NotificationHandler(Plugin):
def _send_notification(self, resource, sub): def _send_notification(self, resource, sub):
self.logger.debug("sending notification for resource: %s", resource) self.logger.debug("sending notification for resource: %s", resource)
if isinstance(resource, AggregatedNotification):
for uri in sub.notificationURI:
self.api.handle_onem2m_request(OneM2MRequest(
op=OneM2MOperation.notify,
to=uri,
pc=resource
))
return
for uri in sub.notificationURI: for uri in sub.notificationURI:
self.api.handle_onem2m_request(OneM2MRequest( self.api.handle_onem2m_request(OneM2MRequest(
op=OneM2MOperation.notify, op=OneM2MOperation.notify,