add influxDB Application

This commit is contained in:
Christian Klopp 2018-01-30 17:20:19 +01:00
parent cb5d2a49ca
commit 782fe8e16b
16 changed files with 660 additions and 0 deletions

View File

@ -0,0 +1 @@
include utils.py

36
apps/InfluxDB/README.md Normal file
View File

@ -0,0 +1,36 @@
# InfluxDB
This App will subscribe to OpenMTC data and tranfer it to an instance of the InfluxDB.
## Run the app
You need a running instance of InfluxDB and configure the following parameters according to your setup.
```
apps/influx-db \
--ep "http://127.0.0.1:8000" \
--influx_host "127.0.0.1" \
--influx-port "8086" \
--influx-user "root" \
--influx-password "secret" \
--db-name "example" \
--db-user "root" \
--db-pw "secret"
```
## Data Model
Entries in the InfluxDB are organized by measurement, time, fields and tags. Data is transfered from OpenMTC like shown below:
* measurement: data_senml["n"] (example: vehiclecount)
* time: data_senml["t"]
* tags:
* application name (example: loadgen)
* device name (example: parking_space)
* sensor name (example: totalspaces)
* sensor labels (example: "openmtc:sensor")
* device labels (example: "openmtc:device")
* fields:
* value: data_senml["v"]
* bn: data_senml["bn"]
* unit: data_senml["u"]

View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
exec python -m influxdb $@

26
apps/InfluxDB/config.json Normal file
View File

@ -0,0 +1,26 @@
{
"name": "InfluxDB",
"ep": "http://localhost:8000",
"cse_base": "onem2m",
"poas": [
"http://auto:23706"
],
"originator_pre": "//openmtc.org/mn-cse-1",
"ssl_certs": {
"cert_file": null,
"key_file": null,
"ca_certs": null
},
"logging": {
"level": "ERROR",
"file": null
},
"influx_host": "localhost",
"influx_port": "8086",
"influx_user": "root",
"influx_password": "root",
"dbname": "example",
"dbuser": "test",
"labels": [],
"dbuser_pw": "test"
}

View File

@ -0,0 +1,73 @@
#!/usr/bin/env bash
CONFIG_FILE="/etc/openmtc/influxdb/config.json"
NAME=${NAME-"InfluxDB"}
EP=${EP-"http://localhost:8000"}
CSE_BASE=${CSE_BASE-"onem2m"}
POAS=${POAS-'["http://auto:23706"]'}
ORIGINATOR_PRE=${ORIGINATOR_PRE-"//openmtc.org/mn-cse-1"}
SSL_CRT=${SSL_CRT-"/etc/openmtc/certs/influxdb.cert.pem"}
SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/influxdb.key.pem"}
SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"}
LABELS=${LABELS-'[]'}
INFLUX_HOST=${INFLUX_HOST-"localhost"}
INFLUX_PORT=${INFLUX_PORT-"8086"}
INFLUX_USER=${INFLUX_USER-"root"}
INFLUX_PASSWORD=${INFLUX_PASSWORD-"root"}
DBNAME=${DBNAME-"example"},
DBUSER=${DBUSER-"test"},
DBUSER_PW=${DBUSER_PW-"test"}
# defaults logging
LOGGING_FILE=${LOGGING_FILE-"/var/log/openmtc/influxdb.log"}
LOGGING_LEVEL=${LOGGING_LEVEL-"ERROR"}
# ensure correct level
case ${LOGGING_LEVEL} in
FATAL|ERROR|WARN|INFO|DEBUG)
;;
*)
LOGGING_LEVEL="ERROR"
;;
esac
# local ip
LOCAL_IP=$(ip r get 8.8.8.8 | awk 'NR==1 {print $NF}')
# set hostname
HOST_NAME=${EXTERNAL_IP-${LOCAL_IP}}
# Configuration of the service.
CONFIG_TEMP=${CONFIG_FILE}".tmp"
echo -n "Configuring M2M influxdb..."
JQ_STRING='.'
# basics
JQ_STRING=${JQ_STRING}' |
.name = "'${NAME}'" |
.ep = "'${EP}'" |
.cse_base = "'${CSE_BASE}'" |
.poas = '${POAS}' |
.originator_pre = "'${ORIGINATOR_PRE}'" |
.ssl_certs.cert_file = "'${SSL_CRT}'" |
.ssl_certs.key_file = "'${SSL_KEY}'" |
.ssl_certs.ca_certs = "'${SSL_CA}'" |
.logging.file |= "'${LOGGING_FILE}'" |
.logging.level |= "'${LOGGING_LEVEL}'" |
.labels |= '${LABELS}' |
.influx_host = "'${INFLUX_HOST}'" |
.influx_port = "'${INFLUX_PORT}'" |
.influx_user = "'${INFLUX_USER}'" |
.influx_password = "'${INFLUX_PASSWORD}'" |
.dbname = "'${DBNAME}'" |
.dbuser = "'${DBUSER}'" |
.dbuser_pw = "'${DBUSER_PW}'"
'
cat ${CONFIG_FILE} | jq -M "${JQ_STRING}"> ${CONFIG_TEMP}
mv ${CONFIG_TEMP} ${CONFIG_FILE}
echo "done"
exec python -m influxdb $@

View File

@ -0,0 +1,30 @@
############################################################
# Dockerfile to run openmtc influxdb binary
############################################################
# Set the base image to use openmtc/sdk
FROM openmtc/sdk-amd64:latest
ENV MOD_NAME=influxdb
# Set the file maintainer
MAINTAINER rst
# install openmtc dependencies
COPY tmp/$MOD_NAME-dependencies.txt /tmp/requirements.txt
RUN pip install --upgrade --requirement /tmp/requirements.txt
# install openmtc-influxdb
COPY tmp/openmtc-$MOD_NAME.tar.gz /tmp/openmtc-$MOD_NAME.tar.gz
RUN tar xzf /tmp/openmtc-$MOD_NAME.tar.gz -C / \
--owner root --group root --no-same-owner --no-overwrite-dir \
--transform 's/json\.dist/json/' --show-transformed
RUN mkdir -p /var/log/openmtc
# add change config
COPY configure-$MOD_NAME-and-start /usr/local/bin/configure-and-start
# entry point
ENTRYPOINT ["/usr/local/bin/configure-and-start"]
CMD [""]

View File

@ -0,0 +1,30 @@
############################################################
# Dockerfile to run openmtc influxdb binary
############################################################
# Set the base image to use openmtc/sdk
FROM openmtc/sdk-arm:latest
ENV MOD_NAME=influxdb
# Set the file maintainer
MAINTAINER rst
# install openmtc dependencies
COPY tmp/$MOD_NAME-dependencies.txt /tmp/requirements.txt
RUN pip install --upgrade --requirement /tmp/requirements.txt
# install openmtc-influxdb
COPY tmp/openmtc-$MOD_NAME.tar.gz /tmp/openmtc-$MOD_NAME.tar.gz
RUN tar xzf /tmp/openmtc-$MOD_NAME.tar.gz -C / \
--owner root --group root --no-same-owner --no-overwrite-dir \
--transform 's/json\.dist/json/' --show-transformed
RUN mkdir -p /var/log/openmtc
# add change config
COPY configure-$MOD_NAME-and-start /usr/local/bin/configure-and-start
# entry point
ENTRYPOINT ["/usr/local/bin/configure-and-start"]
CMD [""]

View File

@ -0,0 +1,26 @@
{
"name": "InfluxDB",
"ep": "http://localhost:8000",
"cse_base": "onem2m",
"poas": [
"http://auto:23706"
],
"originator_pre": "//openmtc.org/mn-cse-1",
"ssl_certs": {
"cert_file": "/etc/openmtc/certs/influxdb.cert.pem",
"key_file": "/etc/openmtc/certs/influxdb.key.pem",
"ca_certs": "/etc/openmtc/certs/ca-chain.cert.pem"
},
"logging": {
"level": "INFO",
"file": "/var/log/openmtc/influxdb.log"
},
"labels": [],
"influx_host": "localhost",
"influx_port": "8086",
"influx_user": "root",
"influx_password": "root",
"dbname": "example",
"dbuser": "test",
"dbuser_pw": "test"
}

View File

@ -0,0 +1,10 @@
[Unit]
Description=OpenMTC InfluxDB
After=network.target
Wants=ntp.service
[Service]
ExecStart=/usr/local/bin/influx-db
[Install]
WantedBy=multi-user.target

82
apps/InfluxDB/setup-influxdb.py Executable file
View File

@ -0,0 +1,82 @@
#!/usr/bin/env python
from setuptools import setup
from distutils.core import setup
from glob import glob
import sys
from utils import get_packages, get_pkg_files, OpenMTCSdist, move_config_files
# name and dir
NAME = "influxdb"
BASE_DIR = "."
# import pkg
sys.path.append(BASE_DIR + "/src")
pkg = __import__(NAME)
# setup name and version
SETUP_NAME = "openmtc-" + NAME
SETUP_VERSION = pkg.__version__
SETUP_DESCRIPTION = pkg.__description__
# meta
SETUP_AUTHOR = pkg.__author_name__
SETUP_AUTHOR_EMAIL = pkg.__author_mail__
SETUP_URL = "http://www.openmtc.org"
SETUP_LICENSE = "Fraunhofer FOKUS proprietary"
# requirements
SETUP_REQUIRES = pkg.__requires__
SETUP_INSTALL_REQUIRES = pkg.__requires__
# packages
PACKAGES = [NAME]
PACKAGE_DIR = {"": BASE_DIR + "/src"}
all_packages = []
for package in PACKAGES:
all_packages.extend(get_packages(package, PACKAGE_DIR))
# scripts
SETUP_SCRIPTS = glob(BASE_DIR + "/bin/*")
# package data
PACKAGE_DATA = {NAME: get_pkg_files(BASE_DIR, NAME)}
# data files
CONFIG_FILES = ("config.json",)
CONFIG_DIR = "/etc/openmtc/" + NAME
CONFIG_DIST_FILES = (BASE_DIR + "/etc/conf/config.json.dist",)
DATA_FILES = [(CONFIG_DIR, CONFIG_DIST_FILES)]
# cmd class
CMD_CLASS = {'sdist': OpenMTCSdist}
if __name__ == "__main__":
if 'bdist_wheel' in sys.argv:
raise RuntimeError("This setup.py does not support wheels")
############################################################################
# setup
setup(name=SETUP_NAME,
version=SETUP_VERSION,
description=SETUP_DESCRIPTION,
author=SETUP_AUTHOR,
author_email=SETUP_AUTHOR_EMAIL,
url=SETUP_URL,
license=SETUP_LICENSE,
requires=SETUP_REQUIRES,
install_requires=SETUP_INSTALL_REQUIRES,
package_dir=PACKAGE_DIR,
packages=all_packages,
scripts=SETUP_SCRIPTS,
package_data=PACKAGE_DATA,
data_files=DATA_FILES,
cmdclass=CMD_CLASS
)
############################################################################
# install
if "install" in sys.argv:
# only do this during install
move_config_files(CONFIG_DIR, CONFIG_FILES)

View File

@ -0,0 +1,9 @@
"""
Transfer OpenMTC Data to an InfluxDB
"""
__version__ = "0.1"
__description__ = "InfluxDB"
__author_name__ = "Christian Klopp"
__author_mail__ = "christian.klopp@fokus.fraunhofer.de"
__requires__ = ['influxdb']

View File

@ -0,0 +1,65 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from openmtc_app.util import prepare_app, get_value
from openmtc_app.runner import AppRunner as Runner
from .influx_db import InfluxDB
# defaults
default_name = "InfluxDB"
default_ep = "http://localhost:8000"
default_labels = []
# args parser
parser = ArgumentParser(
description="An IPE called InfluxDB",
prog="InfluxDB",
formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("-n", "--name", help="Name used for the AE.")
parser.add_argument("-s", "--ep", help="URL of the local Endpoint.")
parser.add_argument("--influx-host", help="Host of InfluxDB")
parser.add_argument("--influx-port", help="Port of InfluxDB")
parser.add_argument("--influx-user", help="Root User of InfluxDB")
parser.add_argument('--labels', type=str, help='just subscribe to those '
'labels', nargs='+')
parser.add_argument("--influx-password", help="Root Password of InfluxDB")
parser.add_argument("--db-name", help="InfluxDB name")
parser.add_argument("--db-user", help="InfluxDB User")
parser.add_argument("--db-pw", help="InfluxDB User password")
# args, config and logging
args, config = prepare_app(parser, __loader__, __name__, "config.json")
# variables
nm = get_value("name", (unicode, str), default_name, args, config)
cb = config.get("cse_base", "onem2m")
ep = get_value("ep", (unicode, str), default_ep, args, config)
poas = config.get("poas", ["http://auto:23706"])
originator_pre = config.get("originator_pre", "//openmtc.org/mn-cse-1")
ssl_certs = config.get("ssl_certs", {})
lbl = get_value("labels", list, default_labels, args, config)
influx_host = get_value("influx_host", (unicode, str), "localhost", args, config)
influx_port = get_value("influx_port", (unicode, str), "8086", args, config)
influx_user = get_value("influx_user", (unicode, str), "root", args, config)
influx_password = get_value("influx_password", (unicode, str), "root", args, config)
db_name = get_value("db_name", (unicode, str), "example", args, config)
db_user = get_value("db_user", (unicode, str), "test", args, config)
db_pw = get_value("db_pw", (unicode, str), "test", args, config)
# start
app = InfluxDB(
name=nm, cse_base=cb, poas=poas,
labels=lbl,
originator_pre=originator_pre,
influx_host=influx_host,
influx_port=influx_port,
influx_user=influx_user,
influx_password=influx_password,
dbname=db_name,
dbuser=db_user,
dbuser_pw=db_pw,
**ssl_certs
)
Runner(app).run(ep)
print ("Exiting....")

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
from influxdb import InfluxDBClient
class InfluxDBConnector:
def __init__(
self,
host='localhost',
port=8086,
user='root',
password='root',
dbname='example',
dbuser='test',
dbuser_pw='test'):
self.host = host
self.port = port
self.user = user
self.password = password
self.dbname = dbname
self.dbuser = dbuser
self.dbuser_pw = dbuser_pw
self.client = InfluxDBClient(host, port, user, password, dbname)
self.client.create_database(dbname)
def update(
self,
cnt_senml,
application_name,
device_name,
device_labels,
sensor_name,
sensor_labels):
cnt_senml = cnt_senml
json_body = [
{
"measurement": cnt_senml["n"],
"tags": {
"application_name": application_name,
"device_name": device_name,
"device_labels": ";".join(device_labels),
"sensor_name": sensor_name,
"sensor_labels": ";".join(sensor_labels)
},
"time": int(cnt_senml["t"]),
"fields": {
"value": cnt_senml["v"],
"bn": cnt_senml["bn"],
"unit": cnt_senml["u"]
}
}
]
self.client.write_points(json_body, time_precision="s")

View File

@ -0,0 +1,55 @@
from openmtc_app.onem2m import ResourceManagementXAE
from connector import InfluxDBConnector
class InfluxDB(ResourceManagementXAE):
def __init__(
self,
labels=[],
influx_host='localhost',
influx_port=8086,
influx_user='root',
influx_password='root',
dbname='example',
dbuser='test',
dbuser_pw='test',
*args,
**kw
):
super(InfluxDB, self).__init__(*args, **kw)
if isinstance(labels, basestring):
self.labels = {labels}
elif hasattr(labels, '__iter__') and len(labels):
self.labels = set(labels)
else:
self.labels = []
self._entity_names = {}
# create database
self.connector = InfluxDBConnector(
host=influx_host,
port=influx_port,
user=influx_user,
password=influx_password,
dbname=dbname,
dbuser=dbuser,
dbuser_pw=dbuser_pw)
def _on_register(self):
self._discover_openmtc_ipe_entities()
def _sensor_filter(self, sensor_info):
if self.labels:
return len(self.labels.intersection(sensor_info['sensor_labels'])) > 0
else:
return True
def _sensor_data_cb(self, sensor_info, sensor_data):
self.connector.update(sensor_data,
sensor_info['ID'].split('/')[3],
sensor_info['dev_name'],
sensor_info['dev_labels'],
sensor_info['ID'].split('/')[-1],
sensor_info['sensor_labels'])

148
apps/InfluxDB/utils.py Normal file
View File

@ -0,0 +1,148 @@
import distutils.command.sdist
import distutils.command.build_py
import os
import subprocess
import sys
def echo(msg, *args):
if args:
msg = msg % args
sys.stdout.write(msg + "\n")
def get_packages(package, package_dir, excluded_list=None, included_list=None):
included_list = included_list or []
excluded_list = excluded_list or []
try:
root = package_dir[package]
except KeyError:
root = package_dir.get("", ".") + "/" + package
if not os.path.exists(root):
sys.stderr.write(
"Directory for package %s does not exist: %s\n" % (package, root))
sys.exit(1)
def on_error(error):
sys.stderr.write(
"Error while collecting packages for %s: %s\n" % (package, error))
sys.exit(1)
packages = [package]
r_prefix = len(root) + 1
for path, dirs, files in os.walk(root, onerror=on_error):
is_module = "__init__.py" in files and path != root
excluded = any(map(lambda x: x in path, excluded_list))
included = any(map(lambda x: x in path, included_list))
if is_module and (not excluded or included):
packages.append(package + "." + path[r_prefix:].replace("/", "."))
return packages
def get_pkg_files(base_dir, name):
package_files = []
pkg_dir = os.path.join(base_dir, 'src', name)
pkg_data_dir = os.path.join(pkg_dir, 'static')
for (path, directories, filenames) in os.walk(pkg_data_dir):
for filename in filenames:
package_files.append(os.path.join(os.path.relpath(path, pkg_dir),
filename))
return package_files
def enable_init_files(init_dir, init_dist_files):
for f in init_dist_files:
os.chmod(os.path.join(init_dir, os.path.basename(f)), 0755)
def move_config_files(config_dir, config_files):
for f in config_files:
target_file = os.path.join(config_dir, f)
if not os.path.exists(target_file):
echo("Installing config file %s", target_file)
os.rename(target_file + ".dist", target_file)
# os.chmod(target_file, 0644)
else:
echo("Not overwriting config file %s", target_file)
def create_openmtc_user(db_dir=None, log_dir=None):
try:
from pwd import getpwnam
except ImportError:
print "Could not import the 'pwd' module. Skipping user management"
else:
# assuming DB_DIR was created by setup already
try:
pw = getpwnam('openmtc')
except KeyError as e:
try:
# add system user openmtc:openmtc
# useradd --system -UM openmtc
useradd = "useradd --system -UM openmtc"
retcode = subprocess.call(useradd, shell=True)
if retcode:
raise Exception("Failed to add user 'openmtc'")
pw = getpwnam('openmtc')
except Exception as e:
sys.stderr.write("Error creating user: %s\n" % (e, ))
sys.exit(1)
uid = pw.pw_uid
gid = pw.pw_gid
# set path permissions
if db_dir:
os.chown(db_dir, uid, gid)
if log_dir:
os.chown(log_dir, uid, gid)
class OpenMTCSdist(distutils.command.sdist.sdist):
def make_release_tree(self, base_dir, files):
distutils.command.sdist.sdist.make_release_tree(self, base_dir, files)
script_name = os.path.basename(sys.argv[0])
if script_name != "setup.py":
os.rename(base_dir + "/" + script_name, base_dir + "/setup.py")
self.filelist.files.remove(script_name)
self.filelist.files.append("setup.py")
class OpenMTCSdistBinary(OpenMTCSdist, object):
def make_release_tree(self, base_dir, files):
super(OpenMTCSdistBinary, self).make_release_tree(base_dir, files)
script_name = os.path.basename(sys.argv[0])
build_py = self.get_finalized_command('build_py')
build_py.compile = 1
build_py.optimize = 2
build_py.retain_init_py = 1
build_py.build_lib = base_dir
build_py.byte_compile(
[base_dir + "/" + f for f in self.filelist.files if
f != script_name and f.endswith(".py")])
class OpenMTCBuildPy(distutils.command.build_py.build_py):
retain_init_py = 0
def byte_compile(self, files):
distutils.command.build_py.build_py.byte_compile(self, files)
class OpenMTCBuildPyBinary(OpenMTCBuildPy, object):
retain_init_py = 0
def byte_compile(self, files):
super(OpenMTCBuildPyBinary, self).byte_compile(files)
for f in files:
if (f.endswith('.py') and (os.path.basename(f) != "__init__.py" or
not self.retain_init_py)):
os.unlink(f)

9
apps/influx-db Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
cd $(dirname ${0})
. ./prep-env.sh
cd InfluxDB
PYTHONPATH=${PYTHONPATH}:src exec python -m influxdb $@