A layer for easy and structured real-time data exchange between assets.
The setup is based on the following requirements:
Install Docker version 1.10.0+
Install Docker Compose version 1.6.0+
Clone this Repository:
git clone https://github.com/i-Asset/distribution-network.git
cd distribution-network
git checkout staging
Apache Kafka is used as the core delivery framework. The easiest way to set up a Kafka cluster is with Docker Compose:
docker-compose -f setup/kafka/docker-compose.yml up --build -d
docker-compose -f setup/kafka/docker-compose.yml ps # status of the services
docker-compose -f setup/kafka/docker-compose.yml logs -f # for continuous logs
docker-compose -f setup/kafka/docker-compose.yml down # shut down the cluster, remove data with -v flag
To test the Kafka setup by listing, creating and deleting topics, the Kafka binaries are required (see the next subsection, "(optional) Install Kafka binaries"). For production, setting up a Kafka cluster on multiple nodes without Docker is suggested; check this guide for details.
This step is only required for developing the Distribution Network, because in that case the service runs on the host. In production mode, the service runs within a Docker container and therefore doesn't need Kafka binaries on the host. Note also that Kafka is hard to install on Windows.
The following lines install Kafka version 2.7 locally in the directory /kafka:
sudo apt-get update
sh setup/kafka/install-kafka.sh
sh setup/kafka/install-kafka-libs.sh
With the Kafka binaries, topics can be listed, created and deleted. Moreover, messages can be produced and consumed:
/kafka/bin/kafka-topics.sh --zookeeper :2181 --list
/kafka/bin/kafka-topics.sh --zookeeper :2181 --create --topic test_single --replication-factor 1 --partitions 1
/kafka/bin/kafka-topics.sh --zookeeper :2181 --create --topic test --replication-factor 3 --partitions 3 --config cleanup.policy=compact --config retention.ms=3628800000 --config retention.bytes=-1
/kafka/bin/kafka-topics.sh --zookeeper :2181 --delete --topic test_single
/kafka/bin/kafka-topics.sh --zookeeper :2181 --list
/kafka/bin/kafka-console-producer.sh --broker-list :9092 --topic test
>Hello Kafka
> [Ctrl]+C
/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic test --from-beginning
Hello Kafka
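The `retention.ms` value used for the `test` topic above is easier to read once it is converted into days. The following is a minimal sketch of that arithmetic; the helper name `retention_ms_to_days` is made up for illustration and is not part of Kafka or of this repository.

```python
from datetime import timedelta


def retention_ms_to_days(retention_ms: int) -> float:
    """Convert a Kafka retention.ms value (milliseconds) into days."""
    return timedelta(milliseconds=retention_ms) / timedelta(days=1)


if __name__ == "__main__":
    # Value taken from the topic creation command above.
    print(retention_ms_to_days(3628800000))  # 42.0
```

With `retention.bytes=-1`, Kafka applies no size-based limit, so only the time-based retention of 42 days is relevant for this topic.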
PostgreSQL is used as the relational database for static data within the Distribution Network.
The easiest way to set up PostgreSQL is via Docker Compose:
docker-compose -f setup/postgresql/docker-compose.yml up --build -d
docker-compose -f setup/postgresql/docker-compose.yml ps # status of postgres
docker-compose -f setup/postgresql/docker-compose.yml logs -f # for continuous logs
docker-compose -f setup/postgresql/docker-compose.yml down # shut down, remove data with -v flag
Alternatively, PostgreSQL can be set up directly on the host. There are various instructions on how to install PostgreSQL; this one works for Ubuntu 20.04:
sudo apt install libpq-dev
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install postgresql
sudo -u postgres psql -c "CREATE ROLE postgres LOGIN PASSWORD 'postgres';"
sudo -u postgres psql -c "CREATE DATABASE distributionnetworkdb OWNER postgres;"
sudo -u postgres psql -f setup/postgresql/initdb/db_init.sql
The database distributionnetworkdb is created and filled with demo data automatically at startup, as configured in setup/postgresql/initdb/db_init.sql.
Check the entries in the tables using commands like:
docker exec -it postgresql_postgresdb_1 psql -U postgres -d distributionnetworkdb -c "select * from users;"
Note that dummy users and companies have negative ids, whereas real entries always have non-negative ids.
The Distribution Service can be set up either on the host (preferred for development) or within a Docker container (easier to set up). Select the appropriate option below.
In both cases, the Distribution Service depends on the delivery framework Apache Kafka and the database PostgreSQL. The respective dependency variables have to be valid and are required to set up the Distribution Service.
Make sure postgres is available on port 5432 and that the role postgres is the owner of the database distributionnetworkdb:
docker-compose -f /server/postgresql/docker-compose.yml up --build -d
Then set the environment variable DOCKER_HOST_IP and start the main Docker Compose setup:
export DOCKER_HOST_IP=$(hostname -I | cut -d ' ' -f1)
docker-compose up --build -d
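`hostname -I` prints the addresses of the host separated by spaces, and `cut -d ' ' -f1` keeps the first one. For scripts that prepare the environment from Python, the same selection could be sketched as follows. The function names `first_host_ip` and `detect_docker_host_ip` are hypothetical, and the sketch assumes a Linux host on which `hostname -I` is available.

```python
import subprocess


def first_host_ip(hostname_output: str) -> str:
    """Return the first whitespace-separated address from `hostname -I` output."""
    fields = hostname_output.split()
    if not fields:
        raise ValueError("no address found in hostname output")
    return fields[0]


def detect_docker_host_ip() -> str:
    """Run `hostname -I` and pick the first address (Linux only)."""
    out = subprocess.run(["hostname", "-I"], capture_output=True,
                         text=True, check=True)
    return first_host_ip(out.stdout)
```

The returned value can then be placed into `os.environ["DOCKER_HOST_IP"]` before Docker Compose is invoked from the same process.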
Check if everything works using:
docker ps
docker-compose logs -f
docker-compose logs -f distribution-network
docker inspect distribution-network_distribution-network_1
Make sure to create a new virtual environment for this setup. Then install the Python modules via:
virtualenv --python=/path/to/python /path/to/new/venv
source /path/to/new/venv/bin/activate # e.g. /home/chris/anaconda3/envs/venv_iot4cps/bin/activate
pip install -r setup/requirements.txt
Additionally, a running Kafka instance is required, e.g.:
export DOCKER_HOST_IP=$(hostname -I | cut -d ' ' -f1)
docker-compose -f /server/kafka/docker-compose.yml up --build -d
Make sure that the file server/.env points to the correct configuration set, which is either development, production, docker or platform-only (the latter doesn't interact with the Kafka data streaming).
The platform can be started by running:
export FLASK_APP=$(pwd)/server/app.py
python -m flask run --host $(hostname -I | cut -d " " -f1) --port 1908
One easy way to check the platform is via the light-weight user interface that comes with the distribution-network:
The REST API is the preferred interface and is documented with Swagger at server/distributionnetwork/swagger-ui.html.
Execute without parameters and authorization.
Creates the specified system with its dependencies and Kafka topics. Requires that the user and company exist. In contrast to POST, the PUT method also allows editing an existing system.
i-Asset Server:
{
  "company_id": 455,
  "description": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. \n Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec \n quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim.",
  "kafka_servers": "localhost:9092",
  "mqtt_broker": {
    "mqtt_server": "mqtt.eclipse.org:1883",
    "mqtt_version": ""
  },
  "workcenter": "labor",
  "station": "testStation"
}
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/systems_by_person/453" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "{ \"company_id\": 455, \"description\": \"Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. \ Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec \ quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim.\", \"kafka_servers\": \"localhost:9092\", \"mqtt_broker\": { \"mqtt_server\": \"mqtt.eclipse.org:1883\", \"mqtt_version\": \"\" }, \"workcenter\": \"labor\", \"station\": \"testStation\"}"
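The same request can be prepared from Python. The sketch below uses only the standard library (`urllib.request`) so that it stays self-contained; the README's own Python example uses `requests`, which works equally well. The function name `build_create_system_request` is hypothetical, and the sketch assumes that PUT is sent to the same URL as POST, which the text above suggests but does not state explicitly.

```python
import json
import urllib.request

BASE_URL = "https://iasset.salzburgresearch.at/distributionnetwork"


def build_create_system_request(person_id: int, payload: dict, token: str,
                                method: str = "POST") -> urllib.request.Request:
    """Build (but do not send) the request that creates a system."""
    if method not in ("POST", "PUT"):
        raise ValueError("method must be POST or PUT")
    return urllib.request.Request(
        url=f"{BASE_URL}/systems_by_person/{person_id}",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": token,
        },
        method=method,
    )


# Payload fields as in the example above (description shortened).
payload = {
    "company_id": 455,
    "description": "Lorem ipsum dolor sit amet",
    "kafka_servers": "localhost:9092",
    "mqtt_broker": {"mqtt_server": "mqtt.eclipse.org:1883", "mqtt_version": ""},
    "workcenter": "labor",
    "station": "testStation",
}
request = build_create_system_request(453, payload, "[bearer-token]")
# To actually send it: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it possible to inspect the URL, headers and body before anything reaches the server.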
Get systems by user_id and password (= personId and bearer_token):
curl -X GET --header 'Accept: application/json' --header 'Authorization: asdf' 'localhost:1908/distributionnetwork/systems_by_person/-1'
or as Python request:
import requests
res = requests.get(url="http://localhost:1908/distributionnetwork/systems_by_person/-2",
headers={'content-type': 'application/json',
'Authorization': "asdf"})
status_code = res.status_code
result = res.json()
i-Asset Server:
personId = 453
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/systems_by_person/453" -H "accept: application/json" -H "Authorization: [bearer-token]"
Delete a system by user_id and password (= personId and bearer_token):
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
curl -X DELETE "https://iasset.salzburgresearch.at/distributionnetwork/delete_system/453/ee.455.puch.testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
Requires user_id, system_name and password (= personId and bearer_token) in the header.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/client_apps/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
Creates a Client App for a specified system. Requires that the user and system exist.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
{
  "name": "client_app_1",
  "resource_uri": "https://iasset.salzburgresearch.at/registry/sec_uuid",
  "on_kafka": true,
  "description": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit."
}
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/client_apps/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "{ \"name\": \"client_app_1\", \"resource_uri\": \"https://iasset.salzburgresearch.at/registry/sec_uuid\", \"on_kafka\": true, \"description\": \"Lorem ipsum dolor sit amet, consectetuer adipiscing elit.\"}"
Requires user_id, system_name, client_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
client_name = client_app_1
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/client_apps/453/ee_455_labor_testStation/client_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
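Since system names are written with `.` between levels elsewhere (for example `ee.455.labor.testStation`) but must use `_` in the request path, a small helper can do the conversion before the URL is assembled. This is a sketch; the helper names `system_name_to_path` and `client_app_url` are made up for illustration.

```python
BASE_URL = "https://iasset.salzburgresearch.at/distributionnetwork"


def system_name_to_path(system_name: str) -> str:
    """Replace the '.' level separator with '_' for use in request paths."""
    return system_name.replace(".", "_")


def client_app_url(person_id: int, system_name: str, client_name: str) -> str:
    """Assemble the URL of a single Client App within a system."""
    return (f"{BASE_URL}/client_apps/{person_id}/"
            f"{system_name_to_path(system_name)}/{client_name}")


if __name__ == "__main__":
    print(client_app_url(453, "ee.455.labor.testStation", "client_app_1"))
```

Names that already use `_` pass through unchanged, so the helper can be applied unconditionally.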
Deletes the specified Client App within a system.
Requires user_id, system_name, client_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
client_name = client_app_1
curl -X DELETE "https://iasset.salzburgresearch.at/distributionnetwork/delete_client_app/453/ee_455_labor_testStation/client_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
A Thing connection is a digital instance that connects the distribution network with a metadata resource of that thing.
Requires user_id, system_name and password (= personId and bearer_token) in the header.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/things/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
Creates a Thing Connection for a specified system. Requires that the user and system exist.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
{
  "name": "thing_1",
  "resource_uri": "https://iasset.salzburgresearch.at/registry/sec_uuid",
  "on_kafka": true,
  "description": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit."
}
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/things/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "{ \"name\": \"thing_1\", \"resource_uri\": \"https://iasset.salzburgresearch.at/registry/sec_uuid\", \"on_kafka\": true, \"description\": \"Lorem ipsum dolor sit amet, consectetuer adipiscing elit.\"}"
Requires user_id, system_name, thing_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
thing_name = thing_1
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/things/453/ee_455_labor_testStation/thing_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
Deletes a specific Thing Connection within the system.
Requires user_id, system_name, thing_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
thing_name = thing_1
curl -X DELETE "https://iasset.salzburgresearch.at/distributionnetwork/delete_thing/453/ee_455_labor_testStation/thing_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
Note that Industry 4.0 Components usually constitute a 1-1-connection of a Client App as digital instance and a thing as abstraction of the physical instance. Therefore, for each I4.0 component a Client App as well as a Thing connection has to be created.
Requires user_id, system_name and password (= personId and bearer_token) in the header.
Optionally, one can narrow down the number of hits by specifying thing_name or client_name. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
client_name = client_app_1
thing_name = thing_1
# For System:
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/datastreams/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
# For System and Client App:
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/datastreams_per_client/453/ee_455_labor_testStation/client_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
# For System and Thing Connection:
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/datastreams_per_thing/453/ee_455_labor_testStation/thing_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
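The three requests above differ only in the endpoint name and the optional last path segment. A sketch of how a client could pick the right URL follows; the function name `datastreams_url` is hypothetical, and rejecting both filters at once is an assumption, made because no combined endpoint is shown here.

```python
from typing import Optional

BASE_URL = "https://iasset.salzburgresearch.at/distributionnetwork"


def datastreams_url(person_id: int, system_name: str,
                    client_name: Optional[str] = None,
                    thing_name: Optional[str] = None) -> str:
    """Select the datastream endpoint depending on the optional filter."""
    if client_name and thing_name:
        # Assumption: only one filter can be applied per request.
        raise ValueError("specify either client_name or thing_name, not both")
    prefix = f"{person_id}/{system_name}"
    if client_name:
        return f"{BASE_URL}/datastreams_per_client/{prefix}/{client_name}"
    if thing_name:
        return f"{BASE_URL}/datastreams_per_thing/{prefix}/{thing_name}"
    return f"{BASE_URL}/datastreams/{prefix}"
```

For example, `datastreams_url(453, "ee_455_labor_testStation", thing_name="thing_1")` yields the URL of the third request above.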
Creates datastreams for a specified system, client app and aas connection. Requires that all instances exist.
i-Asset Server:
personId = 453
system_name = ee_455_labor_testStation # note that '.' must be replaced by '_'
[
  {
    "name": "Air Temperature",
    "shortname": "temperature",
    "description": "Lorem ipsum",
    "thing_name": "thing_1",
    "client_name": "client_app_1"
  },
  {
    "name": "Air Humidity",
    "shortname": "humidity",
    "description": "Lorem ipsum",
    "thing_name": "thing_1",
    "client_name": "client_app_1"
  }
]
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/datastreams/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "[ { \"name\": \"Air Temperature\", \"shortname\": \"temperature\", \"description\": \"Lorem ipsum\", \"thing_name\": \"thing_1\", \"client_name\": \"client_app_1\" }, { \"name\": \"Air Humidity\", \"shortname\": \"humidity\", \"description\": \"Lorem ipsum\", \"thing_name\": \"thing_1\", \"client_name\": \"client_app_1\" }]"
[
  {
    "name": "Air Temperature",
    "shortname": "temperature",
    "description": "Lorem ipsum",
    "thing_name": "Weatherstation_1",
    "client_name": "weatherstation_1"
  },
  {
    "name": "Air Humidity",
    "shortname": "humidity",
    "description": "Lorem ipsum",
    "thing_name": "Weatherstation_1",
    "client_name": "weatherstation_1"
  },
  {
    "name": "Air Temperature",
    "shortname": "temperature",
    "description": "Lorem ipsum",
    "thing_name": "Weatherstation_2",
    "client_name": "weatherstation_2"
  },
  {
    "name": "Air Humidity",
    "shortname": "humidity",
    "description": "Lorem ipsum",
    "thing_name": "Weatherstation_2",
    "client_name": "weatherstation_2"
  }
]
Delete datastreams from a specified system.
personId = 453
system_name = ee_455_labor_testStation # note that '.' must be replaced by '_'
thing_name = thing_1
curl -X DELETE "http://localhost:1908/distributionnetwork/delete_datastreams/-1/at.srfg.MachineFleet.Machine1" -H "accept: application/json" -H "Authorization: asdf" -H "Content-Type: application/json" -d "[\"temperature2\"]"
Requires user_id, system_name and password (= personId and bearer_token) in the header.
Optionally, one can narrow down the number of hits by specifying the client_name. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
client_name = client_app_1
# For System:
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/subscriptions/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
# For System and Client App:
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/subscriptions_per_client/453/ee_455_labor_testStation/client_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
Creates datastream subscriptions in the specified system for a Client App. Requires that all referred instances exist.
i-Asset Server:
personId = 453
system_name = ee_455_labor_testStation # note that '.' must be replaced by '_'
client_app = client_app_1
[
  {
    "shortname": "temperature",
    "thing_name": "thing_1",
    "system_name": "ee.455.labor.testStation"
  },
  {
    "shortname": "humidity",
    "thing_name": "thing_1",
    "system_name": "ee.455.labor.testStation"
  }
]
[
  {
    "shortname": "temperature",
    "thing_name": "Weatherstation_1",
    "system_name": "at.srfg.WeatherService.Stations"
  },
  {
    "shortname": "humidity",
    "thing_name": "Weatherstation_1",
    "system_name": "at.srfg.WeatherService.Stations"
  },
  {
    "shortname": "temperature",
    "thing_name": "Weatherstation_2",
    "system_name": "at.srfg.WeatherService.Stations"
  },
  {
    "shortname": "humidity",
    "thing_name": "Weatherstation_2",
    "system_name": "at.srfg.WeatherService.Stations"
  }
]
curl -X PUT "http://localhost:1908/distributionnetwork/subscriptions_per_client/-2/at_srfg_WeatherService_Stations/weather_analytics" -H "accept: application/json" -H "Authorization: asdf" -H "Content-Type: application/json" -d "[ { \"shortname\": \"temperature\", \"thing_name\": \"Weatherstation_1\", \"system_name\": \"at.srfg.WeatherService.Stations\" }, { \"shortname\": \"humidity\", \"thing_name\": \"Weatherstation_1\", \"system_name\": \"at.srfg.WeatherService.Stations\" }, { \"shortname\": \"temperature\", \"thing_name\": \"Weatherstation_2\", \"system_name\": \"at.srfg.WeatherService.Stations\" }, { \"shortname\": \"humidity\", \"thing_name\": \"Weatherstation_2\", \"system_name\": \"at.srfg.WeatherService.Stations\" }]"
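The request body above is the combination of every thing with every datastream shortname, and it keeps the dotted form of `system_name` while the request path uses underscores. A sketch that generates such a body follows; the function name `build_subscriptions` is hypothetical.

```python
import json
from itertools import product
from typing import Dict, Iterable, List


def build_subscriptions(system_name: str, thing_names: Iterable[str],
                        shortnames: Iterable[str]) -> List[Dict[str, str]]:
    """Create one subscription entry per (thing, shortname) pair."""
    return [
        {"shortname": shortname, "thing_name": thing, "system_name": system_name}
        for thing, shortname in product(thing_names, shortnames)
    ]


subscriptions = build_subscriptions(
    "at.srfg.WeatherService.Stations",
    ["Weatherstation_1", "Weatherstation_2"],
    ["temperature", "humidity"],
)
body = json.dumps(subscriptions)  # string to pass as the request body
```

The entries come out in the same order as in the curl command above: both datastreams of `Weatherstation_1`, then both of `Weatherstation_2`.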
Delete datastream subscriptions from a specified system and client.
i-Asset Server:
personId = 453
system_name = ee_455_labor_testStation
client_app = client_app_1
[
  {
    "shortname": "humidity",
    "thing_name": "thing_1",
    "system_name": "ee_455_labor_testStation"
  }
]
curl -X DELETE "http://localhost:1908/distributionnetwork/delete_subscriptions/-2/at.srfg.WeatherService.Stations/weather_analytics" -H "accept: application/json" -H "Authorization: asdf" -H "Content-Type: application/json" -d "[ { \"shortname\": \"humidity\", \"thing_name\": \"Weatherstation_1\", \"system_name\": \"at.srfg.WeatherService.Stations\" }]"
[
  {
    "shortname": "humidity",
    "thing_name": "Weatherstation_1",
    "system_name": "at.srfg.WeatherService.Stations"
  }
]
curl -X DELETE "http://localhost:1908/distributionnetwork/delete_subscriptions/-2/at.srfg.WeatherService.Stations/weather_analytics" -H "accept: application/json" -H "Authorization: asdf" -H "Content-Type: application/json" -d "[ { \"shortname\": \"humidity\", \"thing_name\": \"Weatherstation_1\", \"system_name\": \"at.srfg.WeatherService.Stations\" }]"
A Stream Application is a rule-based forwarder for datastreams from one system to another.
Requires user_id, system_name and password (= personId and bearer_token) in the header.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/stream_apps/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]"
Creates a Stream App for a specified system. Requires that the user and both systems exist.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
{
  "name": "stream_app_1",
  "target_system": "at.srfg.Analytics.MachineAnalytics",
  "logic": "SELECT * FROM * WHERE quantity='temperature' AND result<2.7;",
  "description": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit."
}
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/stream_apps/453/ee_455_labor_testStation" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "{ \"name\": \"stream_app_1\", \"target_system\": \"at.srfg.Analytics.MachineAnalytics\", \"logic\": \"SELECT * FROM * WHERE quantity='temperature' AND result<2.7;\", \"description\": \"Lorem ipsum dolor sit amet, consectetuer adipiscing elit.\"}"
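To illustrate how the `logic` rule in this example reads, the sketch below expresses the same condition as a plain Python predicate. It is only an illustration: how the Stream App actually evaluates the rule is not described here, and the message layout (a dictionary with the keys `quantity` and `result`) as well as the function name `matches_rule` are assumptions.

```python
from typing import Any, Dict, List


def matches_rule(message: Dict[str, Any]) -> bool:
    """Mimic: WHERE quantity='temperature' AND result<2.7 (assumed fields)."""
    result = message.get("result")
    return (
        message.get("quantity") == "temperature"
        and isinstance(result, (int, float))
        and result < 2.7
    )


def forward(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the messages that a forwarder with this rule would pass on."""
    return [message for message in messages if matches_rule(message)]
```

Under these assumptions, a message such as `{"quantity": "temperature", "result": 2.5}` would be forwarded to the target system, while a temperature of 3.0 or any humidity value would not.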
Requires user_id, system_name, stream_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
stream_name = stream_app_1
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/stream_apps/453/ee_455_labor_testStation/stream_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
Deletes a specific Stream App within the system.
Requires user_id, system_name, stream_name and password (= personId and bearer_token) in the header. The system_name should use _ as level separator.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
stream_name = stream_app_1
curl -X DELETE "https://iasset.salzburgresearch.at/distributionnetwork/delete_thing/453/ee_455_labor_testStation/thing_1" -H "accept: application/json" -H "Authorization: [bearer-token]"
Requires user_id, system_name, stream_name and password (= personId and bearer_token) in the header.
Some of the statistics are only available for a running Stream App.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
stream_name = stream_app_1
statistic is a string and one of 'status' (default), 'is_running', 'stats', 'short_stats', 'logs_{number_of_logs}' or 'config'.
curl -X GET "https://iasset.salzburgresearch.at/distributionnetwork/stream_app_statistic/453/ee_455_labor_testStation/stream_app_1?statistic=config" -H "accept: application/json" -H "Authorization: [bearer-token]"
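A client can check the `statistic` value against the options listed above before sending the request. The following sketch does that and assembles the URL; the function names are hypothetical, and it assumes that `{number_of_logs}` stands for a non-negative integer.

```python
import re
from urllib.parse import urlencode

BASE_URL = "https://iasset.salzburgresearch.at/distributionnetwork"
FIXED_STATISTICS = {"status", "is_running", "stats", "short_stats", "config"}


def is_valid_statistic(statistic: str) -> bool:
    """Accept the fixed options and the 'logs_<n>' form (n assumed integer)."""
    return (statistic in FIXED_STATISTICS
            or re.fullmatch(r"logs_\d+", statistic) is not None)


def stream_app_statistic_url(person_id: int, system_name: str,
                             stream_name: str, statistic: str = "status") -> str:
    """Assemble the statistics URL for a Stream App."""
    if not is_valid_statistic(statistic):
        raise ValueError(f"unsupported statistic: {statistic!r}")
    query = urlencode({"statistic": statistic})
    return (f"{BASE_URL}/stream_app_statistic/{person_id}/"
            f"{system_name}/{stream_name}?{query}")
```

Calling `stream_app_statistic_url(453, "ee_455_labor_testStation", "stream_app_1", "config")` reproduces the URL of the curl command above.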
Requires user_id, system_name, stream_name and password (= personId and bearer_token) in the header.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
stream_name = stream_app_1
curl -X POST "https://iasset.salzburgresearch.at/distributionnetwork/stream_app_deploy/453/ee_455_labor_testStation/stream_app_1" -H "accept: application/json" -H "Authorization: [bearer-token]" -H "Content-Type: application/json" -d "{}"
Requires user_id, system_name, stream_name and password (= personId and bearer_token) in the header.
i-Asset Server:
personId = 453
system = ee_455_labor_testStation
stream_name = stream_app_1
