- sbt 1.3.8
- Scala 2.12.7
- jdk 1.8
- Flink
- Kafka 2.4.0
Download the Kafka 2.4.0 release and un-tar it:
> tar -xzf kafka_2.12-2.4.0.tgz
> cd kafka_2.12-2.4.0
Kafka uses ZooKeeper, so you first need to start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance:
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
Let's implement a simple word count program. Go to kafka_flink_wordcount_example and follow the TODOs.
Create a Flink Kafka Consumer that consumes the data produced by our Kafka producer on the inputword topic.
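For orientation, here is a minimal sketch of what the finished job could look like. It assumes the universal Flink Kafka connector (flink-connector-kafka) and the local broker on localhost:9092; the object name and group id are placeholders, and the TODOs in the repository remain the authoritative guide.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object WordCountJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Connection settings for the local broker started above
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "wordcount") // placeholder group id

    // Consume raw text lines from the inputword topic
    val lines = env.addSource(
      new FlinkKafkaConsumer[String]("inputword", new SimpleStringSchema(), props))

    lines
      .flatMap(_.toLowerCase.split("\\W+")) // split each line into words
      .filter(_.nonEmpty)
      .map((_, 1))                          // pair each word with a count of 1
      .keyBy(_._1)                          // group by the word itself
      .sum(1)                               // running count per word
      .print()

    env.execute("Kafka WordCount")
  }
}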
Push data into the Kafka topic using:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic inputword
Let's create two topics named filtered_sample and sample_sensor, each with a single partition and only one replica:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic filtered_sample
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sample_sensor
Create a virtual environment, activate it, and install the requirements:
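One way to create and activate the environment, assuming a Python 3 interpreter on Linux/macOS:
python3 -m venv venv
source venv/bin/activate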
pip install kafka-python
pip install numpy
Let's generate some sensor data by starting the producer, and start the consumer that will later display the filtered messages (run each script in its own terminal):
cd data_generator
python kafka_producer.py
python kafka_consumer.py
Leave both programs running.
Go to kafka_flink_sensor_example. Create a Flink Kafka Consumer that consumes the data produced by our Kafka producer on the sample_sensor topic. Filter out all messages that have status: failed. After filtering the messages, create a new Flink Kafka Producer that will publish the filtered stream on the filtered_sample topic.
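As a rough sketch of the overall shape of this job (not the reference solution): the object name, group id, and the filter condition below are assumptions. In particular, the substring check is only a stand-in until you parse the JSON properly, as described in the hint at the end.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object SensorFilterJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "sensor-filter") // placeholder group id

    // Source: raw JSON strings produced by kafka_producer.py on sample_sensor
    val source = new FlinkKafkaConsumer[String]("sample_sensor", new SimpleStringSchema(), props)

    // Sink: the filtered stream is published back to Kafka on filtered_sample
    val sink = new FlinkKafkaProducer[String]("filtered_sample", new SimpleStringSchema(), props)

    env.addSource(source)
      // Crude stand-in: drop any message mentioning a failed status.
      // Replace this with proper parsing into a case class (see the hint below).
      .filter(raw => !raw.contains("failed"))
      .addSink(sink)

    env.execute("Sensor filter")
  }
}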
If you did everything correctly, you should see the filtered messages being printed by our Python Kafka consumer.
To run the program, use:
sbt run
Hint: We need to parse JSON strings. Create a case class corresponding to the format of the sensor data; then you can use Play JSON (from the Play framework) to parse the strings. While parsing, filter out the elements that failed to parse.
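For example, the parsing could look like the sketch below, assuming (hypothetically) that each message carries id, temperature, and status fields; adapt the case class to the actual fields emitted by kafka_producer.py.

import play.api.libs.json.{Json, OFormat}
import scala.util.Try

// Field names and types here are assumptions; match them to the real sensor data.
case class SensorReading(id: Int, temperature: Double, status: String)

object SensorJson {
  implicit val sensorFormat: OFormat[SensorReading] = Json.format[SensorReading]

  // Returns None for malformed JSON or JSON that doesn't match the case class,
  // so parse failures can be dropped with a flatMap.
  def parse(raw: String): Option[SensorReading] =
    Try(Json.parse(raw)).toOption.flatMap(_.validate[SensorReading].asOpt)

  // Turn a reading back into a JSON string for the Kafka sink.
  def stringify(reading: SensorReading): String =
    Json.stringify(Json.toJson(reading))
}

// In the Flink job this would replace the crude substring filter:
// stream.flatMap(SensorJson.parse(_))
//       .filter(_.status != "failed")
//       .map(SensorJson.stringify(_))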