The role of the Streaming Worker is to listen to a particular topic of the Kafka cluster and consume the incoming messages. The streaming data is divided into batches according to a time interval. These batches are deserialized by the Worker according to the supported Avro schema, parsed, and registered in the corresponding Hive table.
The Streaming Worker can only run on the central infrastructure, and it can be deployed in local, client or cluster mode.
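Conceptually, the processing can be pictured with the short PySpark sketch below. It is an illustration only, not the worker's actual code: the schema file, topic, group id, ZooKeeper quorum and Hive table names are placeholders.
# Illustrative sketch only: consume a Kafka topic with the Spark2 streaming
# module, deserialize the Avro-encoded batches and append them to a Hive table.
# Schema file, topic, group id, quorum and table names are placeholders.
import io
import avro.io
import avro.schema
from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

SCHEMA = avro.schema.parse(open("flow.avsc").read())   # assumed schema file ("Parse" in avro-python3)

def deserialize(raw_bytes):
    # Decode one Avro-encoded Kafka message into a Python dict.
    decoder = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return avro.io.DatumReader(SCHEMA).read(decoder)

spark = SparkSession.builder.appName("SPOT-INGEST").enableHiveSupport().getOrCreate()
ssc = StreamingContext(spark.sparkContext, batchDuration=30)   # -b / --batch-duration

# zkQuorum and group id would come from the configuration file described below.
stream = KafkaUtils.createStream(ssc, "cloudera01:2181", "spot-group",
                                 {"SPOT-INGEST-TEST-TOPIC": 1},
                                 valueDecoder=lambda v: v)     # keep the raw Avro bytes

def register(rdd):
    # Parse the batch and append it to the corresponding Hive table.
    if not rdd.isEmpty():
        rows = rdd.map(lambda kv: Row(**deserialize(kv[1])))
        spark.createDataFrame(rows).write.insertInto("spotdb.flow")

stream.foreachRDD(register)
ssc.start()
ssc.awaitTermination()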
Dependencies in Python:
- avro - Data serialization system.
Dependencies in Linux OS:
- pip - Python package manager.
- virtualenv - Tool to create isolated Python environments.
- zip - Package and compress (archive) files.
The first time you execute the run.sh script, it copies the .worker.json configuration file under your home directory. For this reason, it is recommended to set up your configuration file before you execute the script.
{
    "consumer": {
        "bootstrap_servers": ["kafka_server:kafka_port"],
        "group_id": ""
    },
    "database": "dbname",
    "kerberos": {
        "kinit": "/usr/bin/kinit",
        "principal": "user",
        "keytab": "/opt/security/user.keytab"
    },
    "zkQuorum": "zk_server:zk_port"
}
- "consumer": This section contains configuration parameters for the
KafkaConsumer
.- "bootstrap_servers": 'host[:port]' string (or list of 'host[:port]' strings) that the consumer should contact to bootstrap initial cluster metadata.
- _"group_id": The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.
- "database": Name of Hive database where all the ingested data will be stored.
- "kerberos": Kerberos is a network authentication protocol and in this section you can define its parameters. To enable this feature, you also need to set the environment variable
KRB_AUTH
. - "zkQuorum": The connection string for the zookeeper connection in the form host:port. Multiple URLs can be given to allow fail-over.
The section "consumer" is required only for the simple worker.
Example of Configuration file
An example of the configuration file for flow is given below:
{
    "database": "spotdb",
    "kerberos": {
        "kinit": "/usr/bin/kinit",
        "principal": "spotuser",
        "keytab": "/opt/security/spotuser.keytab"
    },
    "zkQuorum": "cloudera01:2181"
}
Before you start using the Streaming Worker, you should print the usage message and check the available options.
usage: worker [OPTIONS]... -t <pipeline> --topic <topic> -p <partition>
Streaming Worker reads from Kafka cluster and writes to Hive through the Spark2
streaming module.
Optional Arguments:
-h, --help show this help message and exit
-a STRING, --app-name STRING name of the Spark Job to display on the cluster web UI
-b INTEGER, --batch-duration INTEGER time interval (in seconds) at which
streaming data will be divided into
batches
-c FILE, --config-file FILE path of configuration file
-g STRING, --group-id STRING name of the consumer group to join for
dynamic partition assignment
-l STRING, --log-level STRING determine the level of the logger
-v, --version show program's version number and exit
Required Arguments:
-p INTEGER, --partition INTEGER partition number to consume
--topic STRING topic to listen for new messages
-t STRING, --type STRING type of the data that will be ingested
The only mandatory arguments for the Streaming Worker are the topic, the partition and the type of the pipeline (dns, flow, ids_event, ids_packet or proxy). The Streaming Worker does not create a new topic, so you have to pass an existing one. By default, it loads configuration parameters from the ~/.worker.json file, but you can override this with the -c FILE, --config-file FILE option.
To start Streaming Worker:
./run.sh -t "pipeline_configuration" --topic "my_topic" -p "num_of_partition"
Some examples are given below:
./run.sh -t flow --topic SPOT-INGEST-TEST-TOPIC -p 1
./run.sh -t flow --topic SPOT-INGEST-TEST-TOPIC -p 1 2>/tmp/spark2-submit.log
./run.sh -t dns --topic SPOT-INGEST-DNS-TEST-TOPIC -p 2 -a AppIngestDNS -b 10 -g DNS-GROUP
The Simple Worker is responsible for listening to a partition/topic of the Kafka cluster, consuming the incoming messages and storing them in HDFS.
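That loop can be pictured with the short kafka-python sketch below; the HDFS write through the hdfs dfs CLI, as well as the topic, partition and destination folder, are assumptions for illustration and not the tool's actual implementation.
# Minimal sketch of the consume-and-store loop, assuming kafka-python and the
# `hdfs dfs -put` CLI for the HDFS write; names below are placeholders.
import subprocess
import tempfile

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=["kafka_server:9092"], group_id="SPOT-GROUP")
consumer.assign([TopicPartition("SPOT-INGEST-TEST-TOPIC", 0)])   # --topic / -p

while True:
    # -i / --interval: milliseconds spent waiting in poll if no data is buffered
    batch = consumer.poll(timeout_ms=30)
    for records in batch.values():
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp.write(b"\n".join(record.value for record in records))
        # -d / --hdfs-directory: destination folder in HDFS
        subprocess.check_call(["hdfs", "dfs", "-put", tmp.name, "/user/spotuser/flow/stage"])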
Dependencies in Python:
- avro - Data serialization system.
- kafka-python - Python client for the Apache Kafka distributed stream processing system.
Dependencies in Linux OS:
- pip - Python package manager.
Installation of the Simple Worker requires a user with sudo privileges.
- Install packages from the given requirements file:
  sudo -H pip install -r requirements.txt
- Install package:
  sudo python setup.py install --record install-files.txt
To uninstall Simple Worker, just delete installation files:
cat install-files.txt | sudo xargs rm -rf
If you want to avoid granting sudo permissions to the user (or to keep the current installation isolated from the rest of the system), use the virtualenv package.
- Install the virtualenv package as the root user:
  sudo apt-get -y install python-virtualenv
- Switch to your user and create an isolated virtual environment:
  virtualenv --no-site-packages venv
- Activate the virtual environment and install the source distribution:
  source venv/bin/activate
  pip install -r requirements.txt
  python setup.py install
Simple Worker uses the same configuration file as the Streaming Worker. The required sections are the "consumer" and "kerberos" sections.
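For reference, a minimal configuration for the Simple Worker could therefore look as follows (the server address, group name and Kerberos identities are placeholders):
{
    "consumer": {
        "bootstrap_servers": ["kafka_server:kafka_port"],
        "group_id": "SPOT-GROUP"
    },
    "kerberos": {
        "kinit": "/usr/bin/kinit",
        "principal": "spotuser",
        "keytab": "/opt/security/spotuser.keytab"
    }
}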
Before you start using the Simple Worker, you should print the usage message and check the available options.
usage: s-worker [OPTIONS]... --topic <topic> -p <partition> -d <HDFS folder>
Simple Worker listens to a partition/topic of the Kafka cluster and stores incoming
records to the HDFS.
Optional Arguments:
-h, --help show this help message and exit
-c FILE, --config-file FILE path of configuration file
-i INTEGER, --interval INTEGER milliseconds spent waiting in poll if data is not
available in the buffer
-l STRING, --log-level STRING determine the level of the logger
--parallel-processes INTEGER number of the parallel processes
-v, --version show program's version number and exit
Required Arguments:
-d STRING, --hdfs-directory STRING destination folder in HDFS
-p INTEGER, --partition INTEGER partition number to consume
--topic STRING topic to listen for new messages
The only mandatory arguments for the Simple Worker are the destination folder in HDFS, the partition number and the Kafka topic. The Simple Worker does not create a new topic, so you have to pass an existing one. By default, it loads configuration parameters from the ~/.worker.json file, but you can override this with the -c FILE, --config-file FILE option.
To start Simple Worker:
s-worker --topic "my_topic" -p "num_of_partition" -d "path_to_HDFS"
Some examples are given below:
s-worker --topic SPOT-INGEST-TEST-TOPIC -p 0 -d /user/spotuser/flow/stage
s-worker --topic SPOT-INGEST-DNS-TEST-TOPIC -p 0 -d /user/spotuser/dns/stage --parallel-processes 8 -i 30
s-worker --topic SPOT-INGEST-TEST-TOPIC -p 2 -d /user/spotuser/flow/stage -l DEBUG
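Purely as an illustration of the --parallel-processes and -d options, the sketch below shows one way slow HDFS uploads could be fanned out to a pool of worker processes; it is not the tool's actual implementation, and the directory, file names and pool size are placeholders.
# Hypothetical sketch: fan out `hdfs dfs -put` uploads across a process pool so
# that slow HDFS writes do not block consumption.
import subprocess
from multiprocessing import Pool

def upload(local_path, hdfs_dir="/user/spotuser/dns/stage"):
    # Push one staged file to the destination folder in HDFS.
    subprocess.check_call(["hdfs", "dfs", "-put", local_path, hdfs_dir])

if __name__ == "__main__":
    pool = Pool(processes=8)                                   # --parallel-processes 8
    for path in ["/tmp/batch-0.avro", "/tmp/batch-1.avro"]:    # staged batches
        pool.apply_async(upload, (path,))
    pool.close()
    pool.join()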
The work for this contribution has received funding from the European Union's Horizon 2020 research and innovation programme under grant agreement No 700199.