Kafka HDFS Ingestion
Gobblin provides two abstract classes, `KafkaSource` and `KafkaExtractor`. `KafkaSource` creates a workunit for each Kafka topic partition to be pulled, then merges and groups the workunits based on the desired number of workunits specified by the property `mr.job.max.mappers` (this property is used in both standalone and MR mode). `KafkaExtractor` extracts the partitions assigned to a workunit, based on the specified low watermark and high watermark.

To use them in a Kafka-HDFS ingestion job, one should subclass `KafkaExtractor` and implement the method `decodeRecord(MessageAndOffset)`, which takes a `MessageAndOffset` object pulled from the Kafka broker and decodes it into the desired object. One should also subclass `KafkaSource` and implement `getExtractor(WorkUnitState)`, which should return an object of the `Extractor` class.
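As a rough illustration, the sketch below subclasses `KafkaExtractor` and `KafkaSource` to return each message payload as a raw byte array (essentially what `KafkaSimpleExtractor` does). The class names `MyKafkaExtractor`/`MyKafkaSource` are hypothetical, and details such as the protected `topicName` field are assumptions based on the Gobblin API of this era rather than guaranteed signatures.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import kafka.message.MessageAndOffset;

import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.extract.kafka.KafkaExtractor;
import gobblin.source.extractor.extract.kafka.KafkaSource;

// Hypothetical extractor that returns each Kafka message payload as a byte array.
public class MyKafkaExtractor extends KafkaExtractor<String, byte[]> {

  public MyKafkaExtractor(WorkUnitState state) {
    super(state);
  }

  @Override
  public String getSchema() {
    // No real schema is needed for raw bytes; return the topic name as a placeholder
    // (assumes the protected topicName field provided by KafkaExtractor).
    return this.topicName;
  }

  @Override
  protected byte[] decodeRecord(MessageAndOffset messageAndOffset) throws IOException {
    // Copy the message payload out of the ByteBuffer returned by the Kafka broker.
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] record = new byte[payload.remaining()];
    payload.get(record);
    return record;
  }
}

// Hypothetical source that hands each workunit to the extractor above.
class MyKafkaSource extends KafkaSource<String, byte[]> {

  @Override
  public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
    return new MyKafkaExtractor(state);
  }
}
```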
Gobblin currently provides two concrete implementations: `KafkaSimpleSource`/`KafkaSimpleExtractor`, and `KafkaAvroSource`/`KafkaAvroExtractor`.
`KafkaSimpleExtractor` simply returns the payload of the `MessageAndOffset` object as a byte array. A job that uses `KafkaSimpleExtractor` may use a `Converter` to convert the byte array to whatever format is desired. For example, if the desired output format is JSON, one may implement a `ByteArrayToJsonConverter` to convert the byte array to JSON. Alternatively, one may implement a `KafkaJsonExtractor`, which extends `KafkaExtractor` and converts the `MessageAndOffset` object into a JSON object in the `decodeRecord` method. Both approaches should work equally well.
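To make the first approach concrete, here is a minimal sketch of what a `ByteArrayToJsonConverter` might look like, assuming Gobblin's `Converter` API and the Gson library. `ByteArrayToJsonConverter` is the hypothetical class named above, not an existing Gobblin class.

```java
import java.nio.charset.StandardCharsets;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import gobblin.configuration.WorkUnitState;
import gobblin.converter.Converter;
import gobblin.converter.DataConversionException;
import gobblin.converter.SchemaConversionException;
import gobblin.converter.SingleRecordIterable;

// Hypothetical converter that parses the raw byte[] payload produced by
// KafkaSimpleExtractor into a Gson JsonObject.
public class ByteArrayToJsonConverter extends Converter<String, String, byte[], JsonObject> {

  private static final JsonParser PARSER = new JsonParser();

  @Override
  public String convertSchema(String inputSchema, WorkUnitState workUnit)
      throws SchemaConversionException {
    // The extractor's "schema" is just the topic name; pass it through unchanged.
    return inputSchema;
  }

  @Override
  public Iterable<JsonObject> convertRecord(String outputSchema, byte[] inputRecord,
      WorkUnitState workUnit) throws DataConversionException {
    // Each Kafka message payload is assumed to be a UTF-8 encoded JSON document.
    String json = new String(inputRecord, StandardCharsets.UTF_8);
    return new SingleRecordIterable<JsonObject>(PARSER.parse(json).getAsJsonObject());
  }
}
```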
`KafkaAvroExtractor` decodes the payload of the `MessageAndOffset` object into an Avro `GenericRecord` object. It requires that byte 0 of the payload be 0, bytes 1-16 of the payload be a 16-byte schema ID, and the remaining bytes be the encoded Avro record. It also requires the existence of a schema registry that returns the Avro schema given the schema ID, which is used to decode the byte array. Thus this class is mainly applicable to LinkedIn's internal Kafka clusters.
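For readers implementing a similar extractor against their own schema registry, the sketch below shows one way a payload with this layout can be decoded using the Avro library. The `SchemaRegistry` interface and its `getSchemaById` method are stand-ins for whatever registry client is available; they are not part of Gobblin's API.

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.codec.binary.Hex;

// Hypothetical helper that decodes a payload laid out as
// [magic byte 0x0 | 16-byte schema ID | Avro binary record].
public class SchemaIdAvroDecoder {

  private static final byte MAGIC_BYTE = 0x0;
  private static final int SCHEMA_ID_LENGTH = 16;

  // Stand-in for a real schema registry client.
  public interface SchemaRegistry {
    Schema getSchemaById(String hexSchemaId) throws IOException;
  }

  private final SchemaRegistry schemaRegistry;

  public SchemaIdAvroDecoder(SchemaRegistry schemaRegistry) {
    this.schemaRegistry = schemaRegistry;
  }

  public GenericRecord decode(byte[] payload) throws IOException {
    if (payload[0] != MAGIC_BYTE) {
      throw new IOException("Unexpected magic byte: " + payload[0]);
    }
    // Bytes 1-16 identify the writer schema in the schema registry.
    String schemaId = Hex.encodeHexString(Arrays.copyOfRange(payload, 1, 1 + SCHEMA_ID_LENGTH));
    Schema writerSchema = this.schemaRegistry.getSchemaById(schemaId);

    // The remaining bytes are the Avro binary encoding of the record.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
        payload, 1 + SCHEMA_ID_LENGTH, payload.length - 1 - SCHEMA_ID_LENGTH, null);
    return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
  }
}
```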
For the writer and publisher, one may use `AvroHdfsDataWriter` and `BaseDataPublisher`, as in the Wikipedia example job. They will publish the records pulled in each task to a different folder as Avro files. Gobblin also has an `AvroHdfsTimePartitionedWriter` and a `TimePartitionedDataPublisher`. They publish records based on the timestamp of each record, which means records pulled in the same task may be published to different folders, and records pulled in different tasks may be published to the same folder.
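For reference, the non-partitioned combination can be selected with properties along the following lines (the sample config at the end of this page shows the time-partitioned combination). The exact builder class name may differ between Gobblin versions, so treat this as a sketch rather than a verified config:

```
# Write each task's records to a single Avro output and publish with the
# basic publisher (no time-based partitioning).
writer.destination.type=HDFS
writer.output.format=AVRO
writer.builder.class=gobblin.writer.AvroDataWriterBuilder
data.publisher.type=gobblin.publisher.BaseDataPublisher
```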
These are some of the job config properties used by `KafkaSource` and `KafkaExtractor`:
| Property Name | Semantics |
| --- | --- |
| `topic.whitelist` (case insensitive regex) | Kafka topics to be pulled. Default value = `.*` |
| `topic.blacklist` (case insensitive regex) | Kafka topics not to be pulled. Default value = empty |
| `kafka.brokers` | Comma-separated Kafka brokers to ingest data from. |
| `mr.job.max.mappers` | Number of tasks to launch. In MR mode, this will be the number of mappers launched. If the number of topic partitions to be pulled is larger than the number of tasks, `KafkaSource` will assign partitions to tasks in a balanced manner. |
| `bootstrap.with.offset` | For new topics / partitions, this property controls whether they start at the earliest offset or the latest offset. Possible values: earliest, latest, skip. Default: latest |
| `reset.on.offset.out.of.range` | This property controls what to do if a partition's previously persisted offset is out of the range of the currently available offsets. Possible values: earliest (always move to the earliest available offset), latest (always move to the latest available offset), nearest (move to the earliest offset if the previously persisted offset is smaller than the earliest offset, otherwise move to the latest), skip (skip this partition). Default: nearest |
| `topics.move.to.latest.offset` (case insensitive, NO regex) | Topics in this list will always start from the latest offset (i.e., no records will be pulled). To move all topics to the latest offset, use "all". This property should rarely, if ever, be used. |
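Putting a few of these together, the source portion of a job config might look like the following. The topic patterns and broker addresses are placeholders, not values from any real deployment:

```
# Pull all topics starting with "Event", except those ending in "Test".
topic.whitelist=Event.*
topic.blacklist=.*Test
kafka.brokers=broker1.example.com:9092,broker2.example.com:9092
mr.job.max.mappers=20
bootstrap.with.offset=earliest
reset.on.offset.out.of.range=nearest
```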
It is also possible to set a time limit for each task. For example, to set the time limit to 15 minutes, set the following properties:
```
extract.limit.enabled=true
# other possible values for extract.limit.type: rate, count, pool
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
```
Below is a sample job config file.
```
job.name=PullFromKafka
job.group=Kafka
job.description=Kafka Extractor for Gobblin
job.lock.enabled=false

source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource
extract.namespace=gobblin.extract.kafka

writer.destination.type=HDFS
writer.output.format=AVRO
writer.fs.uri=file://localhost/
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

topic.whitelist=CaptchaEvent
bootstrap.with.offset=earliest
kafka.brokers=#Kafka brokers URI
kafka.schema.registry.url=#schema registry URI

writer.partition.level=hourly
writer.partition.pattern=YYYY/MM/dd/HH
writer.builder.class=gobblin.writer.AvroTimePartitionedWriterBuilder
writer.file.path.type=tablename
writer.partition.column.name=header.time

mr.job.max.mappers=20

extract.limit.enabled=true
# other possible values for extract.limit.type: rate, count, pool
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
```
Launching the job in standalone mode involves steps similar to those of the Wikipedia example job. The job can also be launched in MR mode. See [Deployment](Gobblin Deployment) for more details.