add csv injector app

This commit is contained in:
Christian Klopp 2018-04-05 14:08:27 +02:00
parent 72eeb68563
commit 955903a464
15 changed files with 710 additions and 0 deletions

9
apps/csv-injector Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
cd $(dirname ${0})
. ./prep-env.sh
cd csvInjector
PYTHONPATH=${PYTHONPATH}:src exec python -m csvinjector $@

View File

@ -0,0 +1 @@
include utils.py

View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
exec python -m csvinjector $@

View File

@ -0,0 +1,26 @@
{
"name": "csvInjector",
"ep": "http://localhost:8000",
"cse_base": "onem2m",
"poas": [
"http://auto:28300"
],
"originator_pre": "//openmtc.org/mn-cse-1",
"ssl_certs": {
"cert_file": null,
"key_file": null,
"ca_certs": null
},
"logging": {
"level": "ERROR",
"file": null
},
"csv_path": "~/test.csv",
"csv_delim": ",",
"csv_quotechar": "|",
"device_classifier": "sensor_id",
"date_classifier": "date",
"time_format":"%d/%m/%Y-%H:%M",
"duration": 300,
"repeat": false
}

View File

@ -0,0 +1,73 @@
#!/usr/bin/env bash
CONFIG_FILE="/etc/openmtc/csvinjector/config.json"
NAME=${NAME-"csvInjector"}
EP=${EP-"http://localhost:8000"}
CSE_BASE=${CSE_BASE-"onem2m"}
POAS=${POAS-'["http://auto:28300"]'}
ORIGINATOR_PRE=${ORIGINATOR_PRE-"//openmtc.org/mn-cse-1"}
SSL_CRT=${SSL_CRT-"/etc/openmtc/certs/csvinjector.cert.pem"}
SSL_KEY=${SSL_KEY-"/etc/openmtc/certs/csvinjector.key.pem"}
SSL_CA=${SSL_CA-"/etc/openmtc/certs/ca-chain.cert.pem"}
CSV_PATH=${CSV_PATH-"/test.csv"}
CSV_DELIM=${CSV_DELIM-","}
CSV_QUOTECHAR=${CSV_QUOTECHAR-"|"}
CSV_DEVICE_CLASSIFIER=${CSV_DEVICE_CLASSIFIER-""}
CSV_DATE_CLASSIFIER=${CSV_DATE_CLASSIFIER-""}
CSV_TIME_FORMAT=${CSV_TIME_FORMAT-"%d/%m/%Y-%H:%M"}
DURATION=${DURATION-300}
REPEAT=${REPEAT-"False"}
# defaults logging
LOGGING_FILE=${LOGGING_FILE-"/var/log/openmtc/csvinjector.log"}
LOGGING_LEVEL=${LOGGING_LEVEL-"ERROR"}
# ensure correct level
case ${LOGGING_LEVEL} in
FATAL|ERROR|WARN|INFO|DEBUG)
;;
*)
LOGGING_LEVEL="ERROR"
;;
esac
# local ip
LOCAL_IP=$(ip r get 8.8.8.8 | awk 'NR==1 {print $NF}')
# set hostname
HOST_NAME=${EXTERNAL_IP-${LOCAL_IP}}
# Configuration of the service.
CONFIG_TEMP=${CONFIG_FILE}".tmp"
echo -n "Configuring M2M csvinjector..."
JQ_STRING='.'
# basics
JQ_STRING=${JQ_STRING}' |
.name = "'${NAME}'" |
.ep = "'${EP}'" |
.cse_base = "'${CSE_BASE}'" |
.poas = '${POAS}' |
.originator_pre = "'${ORIGINATOR_PRE}'" |
.ssl_certs.cert_file = "'${SSL_CRT}'" |
.ssl_certs.key_file = "'${SSL_KEY}'" |
.ssl_certs.ca_certs = "'${SSL_CA}'" |
.logging.file = "'${LOGGING_FILE}'" |
.logging.level = "'${LOGGING_LEVEL}'" |
.csv_path = "'${CSV_PATH}'" |
.csv_delim = "'${CSV_DELIM}'" |
.csv_quotechar = "'${CSV_QUOTECHAR}'" |
.device_classifier = '${CSV_DEVICE_CLASSIFIER}' |
.date_classifier = '${CSV_DATE_CLASSIFIER}' |
.time_format = '${CSV_TIME_FORMAT}' |
.duration = '${DURATION}' |
.repeat = "'${REPEAT}'"
'
cat ${CONFIG_FILE} | jq -M "${JQ_STRING}"> ${CONFIG_TEMP}
mv ${CONFIG_TEMP} ${CONFIG_FILE}
echo "done"
exec python -m csvinjector $@

View File

@ -0,0 +1,30 @@
############################################################
# Dockerfile to run openmtc csvinjector binary
############################################################
# Set the base image to use openmtc/sdk
FROM openmtc/sdk-amd64:latest
ENV MOD_NAME=csvinjector
# Set the file maintainer
MAINTAINER rst
# install openmtc dependencies
COPY tmp/$MOD_NAME-dependencies.txt /tmp/requirements.txt
RUN pip install --upgrade --requirement /tmp/requirements.txt
# install openmtc-csvinjector
COPY tmp/openmtc-$MOD_NAME.tar.gz /tmp/openmtc-$MOD_NAME.tar.gz
RUN tar xzf /tmp/openmtc-$MOD_NAME.tar.gz -C / \
--owner root --group root --no-same-owner --no-overwrite-dir \
--transform 's/json\.dist/json/' --show-transformed
RUN mkdir -p /var/log/openmtc
# add change config
COPY configure-$MOD_NAME-and-start /usr/local/bin/configure-and-start
# entry point
ENTRYPOINT ["/usr/local/bin/configure-and-start"]
CMD [""]

View File

@ -0,0 +1,30 @@
############################################################
# Dockerfile to run openmtc csvinjector binary
############################################################
# Set the base image to use openmtc/sdk
FROM openmtc/sdk-arm:latest
ENV MOD_NAME=csvinjector
# Set the file maintainer
MAINTAINER rst
# install openmtc dependencies
COPY tmp/$MOD_NAME-dependencies.txt /tmp/requirements.txt
RUN pip install --upgrade --requirement /tmp/requirements.txt
# install openmtc-csvinjector
COPY tmp/openmtc-$MOD_NAME.tar.gz /tmp/openmtc-$MOD_NAME.tar.gz
RUN tar xzf /tmp/openmtc-$MOD_NAME.tar.gz -C / \
--owner root --group root --no-same-owner --no-overwrite-dir \
--transform 's/json\.dist/json/' --show-transformed
RUN mkdir -p /var/log/openmtc
# add change config
COPY configure-$MOD_NAME-and-start /usr/local/bin/configure-and-start
# entry point
ENTRYPOINT ["/usr/local/bin/configure-and-start"]
CMD [""]

View File

@ -0,0 +1,26 @@
{
"name": "csvInjector",
"ep": "http://localhost:8000",
"cse_base": "onem2m",
"poas": [
"http://auto:28300"
],
"originator_pre": "//openmtc.org/mn-cse-1",
"ssl_certs": {
"cert_file": "/etc/openmtc/certs/csvinjector.cert.pem",
"key_file": "/etc/openmtc/certs/csvinjector.key.pem",
"ca_certs": "/etc/openmtc/certs/ca-chain.cert.pem"
},
"logging": {
"level": "INFO",
"file": "/var/log/openmtc/csvinjector.log"
},
"csv_path": "~/test.csv",
"csv_delim": ",",
"csv_quotechar": "|",
"device_classifier": "sensor_id",
"date_classifier": "sensor_id",
"time_format":"%d/%m/%Y-%H:%M",
"duration": 300,
"repeat": false
}

View File

@ -0,0 +1,10 @@
[Unit]
Description=OpenMTC csvInjector
After=network.target
Wants=ntp.service
[Service]
ExecStart=/usr/local/bin/csv-injector
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,82 @@
#!/usr/bin/env python
from setuptools import setup
from distutils.core import setup
from glob import glob
import sys
from utils import get_packages, get_pkg_files, OpenMTCSdist, move_config_files
# name and dir
NAME = "csvinjector"
BASE_DIR = "."
# import pkg
sys.path.append(BASE_DIR + "/src")
pkg = __import__(NAME)
# setup name and version
SETUP_NAME = "openmtc-" + NAME
SETUP_VERSION = pkg.__version__
SETUP_DESCRIPTION = pkg.__description__
# meta
SETUP_AUTHOR = pkg.__author_name__
SETUP_AUTHOR_EMAIL = pkg.__author_mail__
SETUP_URL = "http://www.openmtc.org"
SETUP_LICENSE = "Fraunhofer FOKUS proprietary"
# requirements
SETUP_REQUIRES = pkg.__requires__
SETUP_INSTALL_REQUIRES = pkg.__requires__
# packages
PACKAGES = [NAME]
PACKAGE_DIR = {"": BASE_DIR + "/src"}
all_packages = []
for package in PACKAGES:
all_packages.extend(get_packages(package, PACKAGE_DIR))
# scripts
SETUP_SCRIPTS = glob(BASE_DIR + "/bin/*")
# package data
PACKAGE_DATA = {NAME: get_pkg_files(BASE_DIR, NAME)}
# data files
CONFIG_FILES = ("config.json",)
CONFIG_DIR = "/etc/openmtc/" + NAME
CONFIG_DIST_FILES = (BASE_DIR + "/etc/conf/config.json.dist",)
DATA_FILES = [(CONFIG_DIR, CONFIG_DIST_FILES)]
# cmd class
CMD_CLASS = {'sdist': OpenMTCSdist}
if __name__ == "__main__":
if 'bdist_wheel' in sys.argv:
raise RuntimeError("This setup.py does not support wheels")
############################################################################
# setup
setup(name=SETUP_NAME,
version=SETUP_VERSION,
description=SETUP_DESCRIPTION,
author=SETUP_AUTHOR,
author_email=SETUP_AUTHOR_EMAIL,
url=SETUP_URL,
license=SETUP_LICENSE,
requires=SETUP_REQUIRES,
install_requires=SETUP_INSTALL_REQUIRES,
package_dir=PACKAGE_DIR,
packages=all_packages,
scripts=SETUP_SCRIPTS,
package_data=PACKAGE_DATA,
data_files=DATA_FILES,
cmdclass=CMD_CLASS
)
############################################################################
# install
if "install" in sys.argv:
# only do this during install
move_config_files(CONFIG_DIR, CONFIG_FILES)

View File

@ -0,0 +1,9 @@
"""
App to inject data from a csv file to OpenMTC
"""
__version__ = "0.1"
__description__ = "csvInjector"
__author_name__ = "Christian Klopp"
__author_mail__ = "christian.klopp@fokus.fraunhofer.de"
__requires__ = []

View File

@ -0,0 +1,85 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from openmtc_app.util import prepare_app, get_value
from openmtc_app.runner import AppRunner as Runner
from .csv_injector import csvInjector
# defaults
default_name = "csvInjector"
default_ep = "http://localhost:8000"
default_csv_path = "~/test.csv"
default_csv_delim = ","
default_csv_quotechar = "|"
default_device_classifier = ""
default_date_classifier = "DATE"
default_time_format = "%d/%m/%Y-%H:%M"
default_duration = 300
default_repeat = False
# args parser
parser = ArgumentParser(
description="An IPE called csvInjector",
prog="csvInjector",
formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("-n", "--name", help="Name used for the AE.")
parser.add_argument("-s", "--ep", help="URL of the local Endpoint.")
parser.add_argument("-f", "--csv-path", help="Path to CSV File")
parser.add_argument("--csv-delim", help="Delimiter used for the provided csv")
parser.add_argument(
"--csv-quotechar", help="Quotechar used for the provided csv")
parser.add_argument(
"--device-classifier", help="Column used to specify different devices in csv")
parser.add_argument(
"--date-classifier", help="Column used to specify where dates are defined in csv")
parser.add_argument(
"--time-format", help="Format of the date column in csv (see https://docs.python.org/2/library/datetime.html#strftime-strptime-behavior)")
parser.add_argument(
"--duration", help="Time to inject the csv (if csv time data does not fit, it will be scaled)")
parser.add_argument(
"--repeat", help="Repeat after csv is injected")
# args, config and logging
args, config = prepare_app(parser, __loader__, __name__, "config.json")
# variables
nm = get_value("name", (unicode, str), default_name, args, config)
cb = config.get("cse_base", "onem2m")
ep = get_value("ep", (unicode, str), default_ep, args, config)
poas = config.get("poas", ["http://auto:28300"])
originator_pre = config.get("originator_pre", "//openmtc.org/mn-cse-1")
ssl_certs = config.get("ssl_certs", {})
csv_path = get_value("csv_path", (unicode, str), default_csv_path, args,
config)
csv_delim = get_value("csv_delim", (unicode, str), default_csv_delim, args,
config)
csv_quotechar = get_value("csv_quotechar", (unicode, str),
default_csv_quotechar, args, config)
device_classifier = get_value("device_classifier", (unicode, str),
default_device_classifier, args, config)
date_classifier = get_value("date_classifier", (unicode, str, list),
default_date_classifier, args, config)
time_format = get_value("time_format", (unicode, str, list),
default_time_format, args, config)
duration = get_value("duration", (int, float),
default_duration, args, config)
repeat = get_value("repeat", (unicode, str),
default_repeat, args, config)
# start
app = csvInjector(
name=nm,
cse_base=cb,
poas=poas,
originator_pre=originator_pre,
csv_path=csv_path,
csv_delim=csv_delim,
csv_quotechar=csv_quotechar,
device_classifier=device_classifier,
date_classifier=date_classifier,
time_format=time_format,
csv_inject_duration=duration,
repeat=repeat,
**ssl_certs)
Runner(app).run(ep)
print("Exiting....")

View File

@ -0,0 +1,110 @@
from openmtc_app.onem2m import XAE
from openmtc_onem2m.model import Container
from csv_process import csvProcessor
import sched
import time
import datetime
class csvInjector(XAE):
def __init__(self,
csv_path,
csv_delim,
csv_quotechar,
device_classifier,
date_classifier,
time_format,
csv_inject_duration=0,
repeat=False,
*args,
**kw):
super(csvInjector, self).__init__(*args, **kw)
self._recognized_sensors = {}
self._recognized_measurement_containers = {}
# csv key to differ between devices
self.device_classifier = device_classifier
self.date_classifier = date_classifier
self.csv_path = csv_path
self.csv_delim = csv_delim
self.csv_quotechar = csv_quotechar
self.time_format = time_format
self.csv_inject_duration = csv_inject_duration
self.repeat = repeat
def _on_register(self):
# start endless loop
self._init_scheduler()
self.scheduler.run()
if self.repeat:
while True:
self._init_scheduler()
self.scheduler.run()
def _init_scheduler(self):
# read csv
self.csv_data_list = csvProcessor(self.csv_path, self.csv_delim,
self.csv_quotechar, self.time_format,
self.csv_inject_duration,
self.date_classifier).csv_data
# setup scheduler
self.scheduler = sched.scheduler(time.time, time.sleep)
for event in self.csv_data_list:
if isinstance(self.date_classifier, list):
self.scheduler.enter(event["timestamp_schedule"], 1,
self.push_data, (event, ))
else:
self.scheduler.enter(event[self.date_classifier], 1,
self.push_data, (event, ))
def _create_measurement_container(self, device_name, name):
measurement_container = self.create_container(
self._recognized_sensors[device_name].path,
Container(resourceName=name),
max_nr_of_instances=0,
labels=[
'openmtc:sensor_data:{}'.format(name), 'openmtc:sensor_data'
])
self._recognized_measurement_containers[device_name][
name] = measurement_container
def _create_sensor_structure(self, event):
device_container = self.create_container(
None,
Container(resourceName=event[self.device_classifier]),
labels=['openmtc:device'],
max_nr_of_instances=0)
self._recognized_sensors[event[
self.device_classifier]] = device_container
self._recognized_measurement_containers[event[
self.device_classifier]] = {}
for k in event.keys():
if k == "Date" or k == self.device_classifier or k in ("", None):
continue
self._create_measurement_container(event[self.device_classifier],
k)
def push_data(self, event):
sensor = event[self.device_classifier]
if not sensor in self._recognized_sensors:
self._create_sensor_structure(event)
device_container = self._recognized_sensors[sensor]
for k in event.keys():
if k == "Date" or k == self.device_classifier or event[k] in (
"", None):
continue
if not k in self._recognized_measurement_containers[sensor].keys():
self._create_measurement_container(sensor, k)
timestamp = time.mktime(datetime.datetime.now().timetuple())
senml = {
"bn": "csv_extracted",
"n": k,
"u": "None",
"t": timestamp,
"v": event[k]
}
self.logger.debug("sensor {} sends data: {}".format(sensor, senml))
self.push_content(
self._recognized_measurement_containers[sensor][k], [senml])

View File

@ -0,0 +1,68 @@
import csv
from datetime import datetime
from futile.logging import LoggerMixin
class csvProcessor(LoggerMixin):
def __init__(self,
path,
delim=",",
quotechar="|",
time_format="%d/%m/%Y-%H:%M",
duration=0,
date_classifier="date"):
with open(path, 'rb') as csvfile:
self.csv_data = list(csv.DictReader(csvfile))
self.time_format = time_format
self.duration = duration
self.date_classifier = date_classifier
# Date Processing
if isinstance(self.date_classifier, list):
self._join_multiple_timestamps()
self._date_string_to_date_object()
self.csv_data = sorted(
self.csv_data, key=lambda k: k[self.date_classifier])
self._date_to_seconds_since_first()
self._scale_to_duration()
def _join_multiple_timestamps(self):
for entry in self.csv_data:
entry["timestamp_schedule"] = "-".join([entry[k] for k in self.date_classifier])
for date_c in self.date_classifier:
entry.pop(date_c, None)
self.date_classifier = "timestamp_schedule"
self.time_format = "-".join(self.time_format)
def _date_string_to_date_object(self):
for entry in self.csv_data:
entry[self.date_classifier] = datetime.strptime(
entry[self.date_classifier], self.time_format)
def _date_to_seconds_since_first(self):
for entry in self.csv_data:
if 'first' in locals():
entry[self.date_classifier] = (
entry[self.date_classifier] - first).total_seconds()
else:
first = entry[self.date_classifier]
entry[self.date_classifier] = 0.0
def _scale_to_duration(self):
if self.duration <= 0:
return
scaling_factor = self.duration / self.csv_data[-1][
self.date_classifier]
self.logger.debug("Set scaling factor to {}".format(scaling_factor))
for entry in self.csv_data:
entry[self.date_classifier] = entry[
self.date_classifier] * scaling_factor
def getList(self):
return self.csv_data
if __name__ == "__main__":
p = csvProcessor("example.csv", duration=300)
for e in p.csv_data:
print e

148
apps/csvInjector/utils.py Normal file
View File

@ -0,0 +1,148 @@
import distutils.command.sdist
import distutils.command.build_py
import os
import subprocess
import sys
def echo(msg, *args):
if args:
msg = msg % args
sys.stdout.write(msg + "\n")
def get_packages(package, package_dir, excluded_list=None, included_list=None):
included_list = included_list or []
excluded_list = excluded_list or []
try:
root = package_dir[package]
except KeyError:
root = package_dir.get("", ".") + "/" + package
if not os.path.exists(root):
sys.stderr.write(
"Directory for package %s does not exist: %s\n" % (package, root))
sys.exit(1)
def on_error(error):
sys.stderr.write(
"Error while collecting packages for %s: %s\n" % (package, error))
sys.exit(1)
packages = [package]
r_prefix = len(root) + 1
for path, dirs, files in os.walk(root, onerror=on_error):
is_module = "__init__.py" in files and path != root
excluded = any(map(lambda x: x in path, excluded_list))
included = any(map(lambda x: x in path, included_list))
if is_module and (not excluded or included):
packages.append(package + "." + path[r_prefix:].replace("/", "."))
return packages
def get_pkg_files(base_dir, name):
package_files = []
pkg_dir = os.path.join(base_dir, 'src', name)
pkg_data_dir = os.path.join(pkg_dir, 'static')
for (path, directories, filenames) in os.walk(pkg_data_dir):
for filename in filenames:
package_files.append(os.path.join(os.path.relpath(path, pkg_dir),
filename))
return package_files
def enable_init_files(init_dir, init_dist_files):
for f in init_dist_files:
os.chmod(os.path.join(init_dir, os.path.basename(f)), 0755)
def move_config_files(config_dir, config_files):
for f in config_files:
target_file = os.path.join(config_dir, f)
if not os.path.exists(target_file):
echo("Installing config file %s", target_file)
os.rename(target_file + ".dist", target_file)
# os.chmod(target_file, 0644)
else:
echo("Not overwriting config file %s", target_file)
def create_openmtc_user(db_dir=None, log_dir=None):
try:
from pwd import getpwnam
except ImportError:
print "Could not import the 'pwd' module. Skipping user management"
else:
# assuming DB_DIR was created by setup already
try:
pw = getpwnam('openmtc')
except KeyError as e:
try:
# add system user openmtc:openmtc
# useradd --system -UM openmtc
useradd = "useradd --system -UM openmtc"
retcode = subprocess.call(useradd, shell=True)
if retcode:
raise Exception("Failed to add user 'openmtc'")
pw = getpwnam('openmtc')
except Exception as e:
sys.stderr.write("Error creating user: %s\n" % (e, ))
sys.exit(1)
uid = pw.pw_uid
gid = pw.pw_gid
# set path permissions
if db_dir:
os.chown(db_dir, uid, gid)
if log_dir:
os.chown(log_dir, uid, gid)
class OpenMTCSdist(distutils.command.sdist.sdist):
def make_release_tree(self, base_dir, files):
distutils.command.sdist.sdist.make_release_tree(self, base_dir, files)
script_name = os.path.basename(sys.argv[0])
if script_name != "setup.py":
os.rename(base_dir + "/" + script_name, base_dir + "/setup.py")
self.filelist.files.remove(script_name)
self.filelist.files.append("setup.py")
class OpenMTCSdistBinary(OpenMTCSdist, object):
def make_release_tree(self, base_dir, files):
super(OpenMTCSdistBinary, self).make_release_tree(base_dir, files)
script_name = os.path.basename(sys.argv[0])
build_py = self.get_finalized_command('build_py')
build_py.compile = 1
build_py.optimize = 2
build_py.retain_init_py = 1
build_py.build_lib = base_dir
build_py.byte_compile(
[base_dir + "/" + f for f in self.filelist.files if
f != script_name and f.endswith(".py")])
class OpenMTCBuildPy(distutils.command.build_py.build_py):
retain_init_py = 0
def byte_compile(self, files):
distutils.command.build_py.build_py.byte_compile(self, files)
class OpenMTCBuildPyBinary(OpenMTCBuildPy, object):
retain_init_py = 0
def byte_compile(self, files):
super(OpenMTCBuildPyBinary, self).byte_compile(files)
for f in files:
if (f.endswith('.py') and (os.path.basename(f) != "__init__.py" or
not self.retain_init_py)):
os.unlink(f)