
Get Data from Kafka to Flink

grittaweisheit edited this page Nov 18, 2019 · 7 revisions

Introduction

This page follows the entry "Get Data from Database to Kafka" and uses the same example and constants.

You should already have the files trainData.avsc, TrainDataProducer.java (with a way to run its runProducer() method), TrainDataSerializer.java and TrainData.java.
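If you do not have the constants class from that entry at hand, the sketch below shows the shape that the consumer code on this page relies on. Only the three field names come from this page; the class body and every value are hypothetical placeholders (one local broker on localhost:9092, a made-up consumer group id, and the topic "test" mentioned in the consumer's comment). If you have the KafkaConstants class from the previous entry, use that one instead.

```java
// Hypothetical stand-in for KafkaConstants from "Get Data from Database to Kafka".
// Field names match those used by the Flink consumer; all values are placeholders.
final class KafkaConstants {
    // Comma-separated host:port list of Kafka brokers (assumed: a single local broker)
    static final String KAFKA_BROKERS = "localhost:9092";
    // Consumer group id used by the Flink consumer (hypothetical value)
    static final String GROUP_ID_CONFIG = "consumerGroup1";
    // Topic the producer writes to and the Flink consumer reads from
    static final String TOPIC_NAME = "test";

    private KafkaConstants() {
        // constants only, no instances
    }
}
```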

Dependencies

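You need the following additional Maven dependencies in your pom.xml.

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
	<scope>compile</scope>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<!-- provides AvroDeserializationSchema (package org.apache.flink.formats.avro) -->
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-avro</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<!-- Avro is versioned independently of Flink: define avro.version in <properties>,
	     e.g. the Avro version used to generate TrainData.java -->
	<groupId>org.apache.avro</groupId>
	<artifactId>avro</artifactId>
	<version>${avro.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-core</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>commons-io</groupId>
	<artifactId>commons-io</artifactId>
	<version>2.6</version>
</dependency>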

If some classes cannot be found when you run the job, you may need to remove or change the <scope> tag of some Flink dependencies.

Get Kafka topic as data source

The main method below registers the Kafka topic as the source of a Flink data stream and prints every record it receives.

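import java.util.Properties;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// TrainData and KafkaConstants come from the "Get Data from Database to Kafka" entry.
// StreamingJob is the class name of the Flink quickstart template; any name works.
public class StreamingJob {
	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// set properties of the consumer
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", KafkaConstants.KAFKA_BROKERS);
		properties.setProperty("group.id", KafkaConstants.GROUP_ID_CONFIG);

		// create a consumer that reads Avro-encoded TrainData objects
		// from the topic KafkaConstants.TOPIC_NAME ("test")
		FlinkKafkaConsumer011<TrainData> consumer = new FlinkKafkaConsumer011<>(
				KafkaConstants.TOPIC_NAME,
				AvroDeserializationSchema.forSpecific(TrainData.class),
				properties);

		// add the consumer as the source of the data stream
		DataStream<TrainData> inputStream = env.addSource(consumer);

		// print the stream to the console
		inputStream.print();

		// nothing runs until execute() is called
		env.execute("Flink Streaming Java API Skeleton");
	}
}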
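The second argument of the FlinkKafkaConsumer011 constructor decides how the raw bytes of each Kafka record value become a TrainData object. To illustrate that contract without Flink on the classpath, here is a simplified, stdlib-only sketch. RecordDeserializer, Utf8Deserializer and FakeSourceLoop are hypothetical names, not Flink classes; the interface only mirrors the deserialize(byte[]) and isEndOfStream(T) methods of Flink's DeserializationSchema, which additionally exposes the produced type through getProducedType(). In the real job, AvroDeserializationSchema.forSpecific(TrainData.class) plays the role of Utf8Deserializer and decodes Avro binary data instead of UTF-8 text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the role of Flink's DeserializationSchema.
interface RecordDeserializer<T> {
    // Turn the raw bytes of one Kafka record value into an object.
    T deserialize(byte[] message) throws IOException;

    // Returning true would end the stream; a Kafka topic is normally unbounded.
    default boolean isEndOfStream(T nextElement) {
        return false;
    }
}

// Hypothetical deserializer that reads every record value as UTF-8 text.
final class Utf8Deserializer implements RecordDeserializer<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Loosely mimics what a source does with each record: deserialize, check, emit.
final class FakeSourceLoop {
    static <T> List<T> consume(List<byte[]> recordValues, RecordDeserializer<T> deserializer) {
        List<T> emitted = new ArrayList<>();
        for (byte[] value : recordValues) {
            T element;
            try {
                element = deserializer.deserialize(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (deserializer.isEndOfStream(element)) {
                break; // stop reading once the deserializer signals end of stream
            }
            emitted.add(element);
        }
        return emitted;
    }

    private FakeSourceLoop() {
    }
}
```

For example, consuming the UTF-8 bytes of "a" and "b" with a Utf8Deserializer returns the list ["a", "b"]. Keeping the decoding in a separate object is what lets the same Kafka consumer class emit TrainData here and any other type elsewhere.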