The current implementation demonstrates the following usages of Kafka Streams, along with an HTTP-based interactive query service:
- Data ingestion
- Data transformation using a Kafka Streams Processor API based implementation
- Implementing a custom state store (based on a bloom filter)
- Managing local state with the custom state store
- Interactive query service with HTTP endpoints
The implementation is based on the ClarkNet dataset, which has to be downloaded into a local folder.
By default the application runs with an embedded local Kafka server. If you want to run separate Kafka and ZooKeeper servers instead, set kafka.localserver to false in application.conf.
To run the application, follow these steps.
This example application depends on kafka-streams-scala and kafka-streams-query. Ensure that you have the proper versions of these libraries on your classpath. Note that this example uses Scala 2.12.4 and Kafka 1.0.0.
This is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server. Note, however, that if you want to run the application in distributed mode (see below for details), you need to run separate Kafka and ZooKeeper servers.
Start ZooKeeper and Kafka, if not already running. You can download Kafka 1.0.0 for Scala 2.12 here, then follow the Quick Start instructions for running ZooKeeper and Kafka, steps 1 and 2.
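If you installed Kafka under $KAFKA_HOME and use the bundled default configuration files, the Quick Start boils down to commands like the following (run each server in its own terminal):
$ cd $KAFKA_HOME
$ ## start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ ## start the Kafka broker (in a second terminal)
$ bin/kafka-server-start.sh config/server.properties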
Download the ClarkNet dataset and put it in a convenient local folder.
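For example (the folder and file names below are just placeholders; use whatever you downloaded):
$ mkdir -p /tmp/clarknet-data
$ mv ~/Downloads/clarknet_access_log /tmp/clarknet-data/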
Copy src/main/resources/application-proc.conf.template to src/main/resources/application-proc.conf.
Edit src/main/resources/application-proc.conf and set the entry for directorytowatch to the folder where you installed the ClarkNet dataset (see the example below).
Note that you can run the application with a bundled local Kafka server by setting kafka.localserver to true in the application.conf file.
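A sketch of these two steps, assuming the dataset lives in /tmp/clarknet-data (the exact form of the directorytowatch entry may differ in your copy of the file):
$ cp src/main/resources/application-proc.conf.template src/main/resources/application-proc.conf
$ ## edit the new file and point directorytowatch at the dataset folder, e.g.
$ ##   directorytowatch = "/tmp/clarknet-data"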
This is only required if kafka.localserver is set to false in application.conf. If it is set to true, the application runs with an embedded local Kafka server and creates all necessary topics on its own. Note, however, that if you want to run the application in distributed mode (see below for details), you need to run separate Kafka and ZooKeeper servers.
Create the topics using the kafka-topics.sh command that comes with the Kafka distribution. We'll refer to the directory where you installed Kafka as $KAFKA_HOME. Run the following commands:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logerr-proc
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic server-log-proc
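You can verify that both topics exist with:
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181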
Now run the application as follows:
$ sbt
> clean
> compile
> proc
This will start the application. Now you can query the global state using curl:
$ ## We are querying against a bloom filter based store which checks membership.
$ ## Since world.std.com is a hostkey present in the dataset, we get true here.
$ curl http://localhost:7071/weblog/access/check/world.std.com
true
$
$ ## We are querying against a bloom filter based store which checks membership.
$ ## Since world.stx.co is not a valid hostkey in the dataset, we get false
$ ## here.
$ curl http://localhost:7071/weblog/access/check/world.stx.co
false
The HTTP query layer is designed to work even when your application runs in distributed mode. Running your Kafka Streams application in distributed mode means that all the instances must have the same application id.
In order to run the application in distributed mode, you need to run external Kafka and ZooKeeper servers. Set kafka.localserver to false in application.conf to enable this.
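As a quick sanity check before packaging, you can confirm the setting is present in your configuration (the exact file and key nesting depend on your setup):
$ grep -ri localserver src/main/resources/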
Here are the steps you need to follow to run the application in distributed mode. We assume here that you are running both instances on the same node with different port numbers; it's fairly easy to scale this out to different nodes.
$ sbt
> procPackage/universal:packageZipTarball
This creates a distribution under the folder <project home>/build.
$ pwd
<project home>
$ cd build/proc/target/universal
$ ls
procpackage-0.0.1.tgz
## unpack the distribution
$ tar xvfz procpackage-0.0.1.tgz
$ cd procpackage-0.0.1
$ ls
bin conf lib
$ cd conf
$ ls
application.conf logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/procpackage-0.0.1
Ensure the following (a quick verification sketch follows the list):
- ZooKeeper and Kafka are running
- All topics mentioned above are created
- The folder mentioned in directorytowatch in application.conf contains the data file
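A quick verification sketch (the data path is a placeholder for whatever you configured in directorytowatch):
$ ## topics should include logerr-proc and server-log-proc
$ $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
$ ## the watched folder should contain the dataset
$ ls /tmp/clarknet-data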
$ pwd
<...>/procpackage-0.0.1
$ bin/procpackage
This starts a single instance of the application. After some time you will see host access information from the data file printed to the console.
In the log file, created under <...>/procpackage-0.0.1/logs, check that the REST service has started and note the host and port details. It should be something like localhost:7070 (the default setting in application.conf).
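One way to spot the endpoint quickly (the log file name is hypothetical and depends on your logback.xml settings):
$ grep -i http logs/*.log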
If you decide to run multiple instances of the application, you may choose to split the dataset into two parts and keep them in different folders (see the sketch below). You also need to copy the distribution into another folder and start the second instance from there, since it has to run with changed settings in application.conf. Say we want to copy it into a folder named clarknet-2.
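One way to split the dataset, assuming GNU split and the placeholder paths used earlier:
$ cd /tmp/clarknet-data
$ ## split the log into two halves without breaking lines
$ split -n l/2 clarknet_access_log clarknet_part_
$ mkdir -p /tmp/clarknet-data-2
$ mv clarknet_part_ab /tmp/clarknet-data-2/
$ ## point the first instance's directorytowatch at a folder holding clarknet_part_aa
$ ## and the second instance's at /tmp/clarknet-data-2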
$ cp <project home>/build/proc/target/universal/procpackage-0.0.1.tgz clarknet-2
$ cd clarknet-2
$ tar xvfz procpackage-0.0.1.tgz
## unpack the distribution
$ cd procpackage-0.0.1
$ ls
bin conf lib
$ cd conf
$ ls
application.conf logback.xml
## change the above 2 files based on your requirements.
$ cd ..
$ pwd
<...>/procpackage-0.0.1
The following settings need to be changed in application.conf before you can run the second instance (see the example after the list):
- dcos.kafka.statestoredir - the folder where Kafka Streams persists the local state information. This has to be different for every instance you set up.
- dcos.kafka.loader.directorytowatch - the data folder, since we would like to ingest different data for the two instances.
- dcos.http.interface and dcos.http.port - the REST service endpoints. If both instances run on the same node, the interface can be localhost for both, but the ports must differ.
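For example, the second instance might be configured with values like these (paths and port are hypothetical; the exact nesting of the keys in application.conf may differ):
$ cd conf
$ ## edit application.conf, e.g.
$ ##   dcos.kafka.statestoredir           = "/tmp/kafka-streams-instance-2"
$ ##   dcos.kafka.loader.directorytowatch = "/tmp/clarknet-data-2"
$ ##   dcos.http.interface                = "localhost"
$ ##   dcos.http.port                     = 7071
$ cd ..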
$ pwd
<...>/procpackage-0.0.1
$ bin/procpackage
This will start the second instance. Check the log file to verify that the REST endpoints are properly started.
The idea of a distributed interactive query interface is to allow the user to query for any key using any of the endpoints where the REST service is running. Assume that the two instances are running at localhost:7070 and localhost:7071.
Here are a few examples:
## world.std.com was loaded by the first instance of the app
## Querying the endpoint corresponding to the first instance gives the correct result
$ curl localhost:7070/weblog/access/check/world.std.com
true
## We get the correct result even if we query using the endpoint of the second instance
$ curl localhost:7071/weblog/access/check/world.std.com
true
## ppp19.glas.apc.org was loaded by the second instance of the app
## Querying the endpoint corresponding to the first instance also gives the correct result
$ curl localhost:7070/weblog/access/check/ppp19.glas.apc.org
true