
# Kafka Connectors Shared Logic

Ahmed Elbahtemy edited this page May 29, 2019 · 24 revisions


## Configuration

The configuration properties below are shared across all Kafka connectors in Brooklin.

| Property | Description | Default |
|----------|-------------|---------|
| `defaultKeySerde` | Name of the SerDe (Serializer/Deserializer) to use with Kafka topic keys.<br>The name has to match one of the values specified in `brooklin.server.serdeNames`. | (None) |
| `defaultValueSerde` | Name of the SerDe (Serializer/Deserializer) to use with Kafka topic values.<br>The name has to match one of the values specified in `brooklin.server.serdeNames`. | (None) |
| `isGroupIdHashingEnabled` | A flag indicating whether the Kafka consumer group ID should be hashed.<br>If `true`, the consumer group ID for a datastream is set to `<clusterName>.<groupIdMD5Hash>`.<br>If `false`, the consumer group ID for a datastream is set to `<sourceConnectionString>-to-<destConnectionString>`. | `false` |
| `commitIntervalMs` | The time duration (in milliseconds) between successive offset commits to Kafka.<br>Must be in the range [0, `Long.MAX_VALUE`] (inclusive). | `60000` (=1 min) |
| `commitTimeout` | The timeout (in milliseconds) to spend waiting in `commitSync` calls on the `KafkaConsumer`.<br>Must be in the range [0, `Long.MAX_VALUE`] (inclusive). | `30000` (=30 sec) |
| `pollTimeoutMs` | The timeout (in milliseconds) to spend waiting in `poll` calls to Kafka if no data is available.<br>Specifying a value of `0` causes the `KafkaConsumer` to return immediately if no data is available.<br>Must be in the range [0, `Long.MAX_VALUE`] (inclusive). | `30000` (=30 sec) |
| `retryCount` | The maximum number of poll attempts to Kafka in case of failure. | `5` |
| `retrySleepDurationMs` | The time duration (in milliseconds) to wait between successive poll attempts to Kafka in case of failure. | `5000` (=5 sec) |
| `pausePartitionOnError` | A flag indicating whether to auto-pause a topic partition if dispatching its data for delivery to the destination system fails. | `false` |
| `pauseErrorPartitionDurationMs` | The time duration (in milliseconds) to keep a topic partition paused after encountering send errors, before attempting to auto-resume it. | `600000` (=10 min) |
| `daemonThreadIntervalInSeconds` | The time duration (in seconds) between successive attempts to restart unhealthy `DatastreamTask`s.<br>Also used as the initial delay before checking the `DatastreamTask`s' health status for the first time. | `300` (=5 min) |
| `nonGoodStateThresholdMs` | The maximum time duration (in milliseconds) to allow between successive polls from Kafka before a `DatastreamTask` is deemed unhealthy.<br>Must be in the range [60000, `Long.MAX_VALUE`] (inclusive). | `600000` (=10 min) |
| `processingDelayLogThreshold` | The maximum time duration (in milliseconds) to allow between consuming data from Kafka and dispatching it for delivery to the destination, before incrementing `numProcessingOverThreshold`. | `60000` (=1 min) |
| `consumerFactoryClassName` | Name of the factory class used to create Kafka consumer instances. | `KafkaConsumerFactoryImpl` |
| `enablePositionTracker` | If set to `true`, an instance of the `KafkaPositionTracker` class is used per connector task to keep track of the consumer's position when reading each topic partition.<br>If enabled, this data can be queried using the Diagnostics endpoints. | `true` |
| `enableBrokerOffsetFetcher` | Used only if `enablePositionTracker` is also set to `true`.<br>If set to `true`, the `KafkaPositionTracker` instances created for this connector will periodically query brokers whenever their offset data is stale. | `true` |
| `consumer.*` | Kafka consumer configuration properties. | (None) |
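As an illustration, a configuration snippet might look like the following. This is a sketch only: the connector name `kafkaConnector` is made up, and the `brooklin.server.connector.<connectorName>.` prefix is an assumption about how Brooklin scopes per-connector properties — consult your server configuration for the actual prefix.

```properties
# Hypothetical connector name; real names come from brooklin.server.connectorNames
brooklin.server.connector.kafkaConnector.commitIntervalMs=60000
brooklin.server.connector.kafkaConnector.pollTimeoutMs=30000
brooklin.server.connector.kafkaConnector.pausePartitionOnError=true
brooklin.server.connector.kafkaConnector.pauseErrorPartitionDurationMs=600000
# Properties under consumer.* are Kafka consumer configuration properties
brooklin.server.connector.kafkaConnector.consumer.security.protocol=SSL
```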

## Diagnostics

The diagnostic endpoints below are shared across all Kafka connectors in Brooklin.

**URL**

```
GET /diag?q=:query&type=:component&scope=:scope&content=:componentQuery
```

**URL Params**

Required:

- `query`
  - Possible values are: `status` or `allStatus`
  - `status` retrieves data for a single Brooklin instance
  - `allStatus` retrieves aggregated data for all Brooklin instances in the cluster
- `type`: the only supported value is `connector`
- `scope`: name of the connector to query, as specified in `brooklin.server.connectorNames`
- `content`
  - Represents a subquery to the component (connector) in question
  - Possible values are: `datastream_state` and `position`

### Subquery: `datastream_state`

**Subquery Params**

Required:

- `datastreamName`: name of the datastream to query

**Example**

```
datastream_state?datastream=:datastreamName
```

**Constraints**

- Datastream must exist

**Response**

- Content-Type: `application/json`
- Schema:

```json
{
  "elements": [],
  "paging": {
    "count": "[int]",
    "start": "[int]",
    "links": "[array]"
  }
}
```

### Subquery: `position`

**Response**

- Content-Type: `application/json`
- Schema: a paged list of one or more `ServerComponentHealth` objects. The `ServerComponentHealth` objects appear under an `elements` key, as in the example below:

```json
{
  "elements": [],
  "paging": {
    "count": "[int]",
    "start": "[int]",
    "links": "[array]"
  }
}
```

- If `query` was set to `status`, each `ServerComponentHealth` object contains a JSON-serialized string in its `status` field. This string is the serialization of a list of position data tuples:

```json
[
  {
    "key": {},
    "value": {}
  }
]
```

- If `query` was set to `allStatus`, each `ServerComponentHealth` object contains a JSON-serialized string in its `status` field. This string is a map of hostnames to lists of position data tuples:

```json
{
  "[string]": [
    {
      "key": {},
      "value": {}
    }
  ]
}
```

- A position data tuple is a map with two fields: `key` and `value`
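The steps above can be sketched in a short client snippet: build the `position` query string, then unpack the JSON-serialized `status` field of each `ServerComponentHealth` element. The connector name `kafka`, the hostname, and all offsets below are made-up illustration values; only the URL parameters and response shape come from the tables above.

```python
import json
from urllib.parse import urlencode

# Build the diagnostics URL for a hypothetical connector named "kafka"
# (actual connector names come from brooklin.server.connectorNames).
params = {"q": "allStatus", "type": "connector", "scope": "kafka", "content": "position"}
url = "/diag?" + urlencode(params)
# -> /diag?q=allStatus&type=connector&scope=kafka&content=position

# A made-up allStatus response. Note that the "status" field is itself a
# JSON-serialized string: for allStatus it maps hostnames to position tuples.
sample_response = {
    "elements": [
        {"status": json.dumps({
            "host-1.example.com": [
                {"key": {"topic": "orders", "partition": 0},
                 "value": {"brokerOffset": 120, "consumerOffset": 100}}
            ]
        })}
    ],
    "paging": {"count": 1, "start": 0, "links": []},
}

# Decode the nested JSON string to recover the per-host position data tuples.
for element in sample_response["elements"]:
    per_host = json.loads(element["status"])
    for host, tuples in per_host.items():
        for t in tuples:
            print(host, t["key"], t["value"])
```

The double decoding (response JSON, then the `status` string) is the part that tends to surprise first-time users of this endpoint.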

## Metrics

The metrics below are shared across all Kafka connectors in Brooklin.

### General Metrics

General metrics prefix: `<connectorName>.`

| Metric Name | Description |
|-------------|-------------|
| `numDatastreams` | The number of datastreams using the connector in the entire cluster |
| `numDatastreamTasks` | The number of datastream tasks that belong to datastreams using the connector in the entire cluster |

### Aggregate Metrics

- Aggregate metrics cover all datastreams in a single Brooklin instance.
- Aggregate metrics prefix: `<connectorName>.<connectorTask>.aggregate.`

| Metric Name | Description |
|-------------|-------------|
| `clientPollOverTimeout` | The number of times a call to `KafkaConsumer::poll(long)` exceeds `pollTimeoutMs` by more than 1 second |
| `errorRate` | The rate of errors encountered when data is dispatched for delivery to the destination system |
| `eventsByteProcessedRate` | The rate of bytes processed and dispatched for delivery to the destination |
| `eventsProcessedRate` | The rate of Kafka record consumption |
| `numAutoPausedPartitionsAwaitingDestTopic` | The number of topic partitions auto-paused while awaiting destination topic creation |
| `numAutoPausedPartitionsOnError` | The number of topic partitions auto-paused due to errors encountered during dispatch for delivery |
| `numAutoPausedPartitionsOnInFlightMessages` | The number of topic partitions auto-paused due to exceeding their maximum in-flight message thresholds |
| `numConfigPausedPartitions` | The number of topic partitions paused manually |
| `numPartitions` | The number of Kafka topic partitions |
| `numProcessingOverThreshold` | The number of times dispatching records to the destination exceeds `processingDelayLogThreshold` |
| `numTopics` | The number of Kafka topics |
| `pollIntervalOverSessionTimeout` | The number of polls exceeding the configured maximum session timeout for the `KafkaConsumer` |
| `rebalanceRate` | The rate of rebalances seen by the Kafka consumer |
| `stuckPartitions` | The number of stuck topic partitions |

### Datastream-Specific Metrics

Datastream-specific metrics prefix: `<connectorName>.<connectorTask>.<datastreamName>.`

| Metric Name | Description |
|-------------|-------------|
| `errorRate` | The rate of errors encountered when data is dispatched for delivery to the destination system |
| `eventCountsPerPoll` | The distribution (histogram) of the number of records retrieved from Kafka in every poll |
| `eventsByteProcessedRate` | The rate of bytes processed and dispatched for delivery to the destination |
| `eventsProcessedRate` | The rate of Kafka record consumption |
| `numAutoPausedPartitionsAwaitingDestTopic` | The number of topic partitions auto-paused while awaiting destination topic creation |
| `numAutoPausedPartitionsOnError` | The number of topic partitions auto-paused due to errors encountered during dispatch for delivery |
| `numAutoPausedPartitionsOnInFlightMessages` | The number of topic partitions auto-paused due to exceeding their maximum in-flight message thresholds |
| `numConfigPausedPartitions` | The number of topic partitions paused manually |
| `numPartitions` | The number of Kafka topic partitions |
| `numPolls` | The rate of polls performed using the Kafka consumer |
| `numProcessingOverThreshold` | The number of times dispatching records to the destination exceeds `processingDelayLogThreshold` |
| `numTopics` | The number of Kafka topics |
| `pollIntervalOverSessionTimeout` | The number of polls exceeding the configured maximum session timeout for the `KafkaConsumer` |
| `rebalanceRate` | The rate of rebalances seen by the Kafka consumer |
| `stuckPartitions` | The number of stuck topic partitions |
| `timeSinceLastEventReceivedMs` | The time duration (in milliseconds) since the last non-empty `ConsumerRecord` was fetched using the Kafka consumer |
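The two prefixes compose with the metric names in the tables above. A small sketch of how a monitoring setup might assemble fully-qualified metric names — the connector, task, and datastream names used here are hypothetical:

```python
def aggregate_metric(connector: str, task: str, metric: str) -> str:
    """Compose an aggregate metric name: <connectorName>.<connectorTask>.aggregate.<metric>"""
    return f"{connector}.{task}.aggregate.{metric}"

def datastream_metric(connector: str, task: str, datastream: str, metric: str) -> str:
    """Compose a datastream-specific metric name: <connectorName>.<connectorTask>.<datastreamName>.<metric>"""
    return f"{connector}.{task}.{datastream}.{metric}"

# Hypothetical names, for illustration only:
print(aggregate_metric("kafka", "KafkaConnectorTask", "errorRate"))
# kafka.KafkaConnectorTask.aggregate.errorRate
print(datastream_metric("kafka", "KafkaConnectorTask", "myDatastream", "numPolls"))
# kafka.KafkaConnectorTask.myDatastream.numPolls
```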

## REST API Data Models

### KafkaDatastreamStatesResponse

| Field Name | Type | Description |
|------------|------|-------------|
| `datastream` | string | Datastream name |
| `assignedTopicPartitions` | array | Assigned topic partitions |
| `autoPausedPartitions` | map | Associates each auto-paused topic partition with metadata about its pause |
| `manualPausedPartitions` | map | Associates each topic with a list of manually paused partitions |
| `inFlightMessageCounts` | map | Associates each topic partition with its number of in-flight messages |

### KafkaPositionKey

| Field Name | Type | Description |
|------------|------|-------------|
| `topic` | String | The Kafka topic being consumed from |
| `partition` | int | The Kafka partition being consumed from |
| `datastreamTaskPrefix` | String | The task prefix of the `DatastreamTask` the connector has been assigned (which is causing this topic partition to be consumed) |
| `datastreamTaskName` | String | The task name of the `DatastreamTask` the connector has been assigned (which is causing this topic partition to be consumed) |
| `connectorTaskStartTime` | Long | The time (in milliseconds since the Unix epoch) at which consumption of this topic partition started |

### KafkaPositionValue

| Field Name | Type | Description |
|------------|------|-------------|
| `brokerOffset` | Long | The latest offset (the offset of the last produced message) on the Kafka broker for this topic partition. If the consumer is also at this position, it is completely caught up and has no more messages to process |
| `consumerOffset` | Long | The current offset the Kafka consumer has for this topic partition. When the consumer receives new messages for this topic partition, they will have an offset greater than or equal to this value |
| `assignmentTime` | Long | The time (in milliseconds since the Unix epoch) at which this topic partition was assigned |
| `lastRecordReceivedTimestamp` | Long | The timestamp (in milliseconds since the Unix epoch) of the last record received from this topic partition |
| `lastBrokerQueriedTime` | Long | The last time (in milliseconds since the Unix epoch) a broker was queried for its latest offset data, either by reading metrics data provided by the consumer or by querying the broker directly |
| `lastNonEmptyPollTime` | Long | The last time the consumer received messages for this topic partition |
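Putting the two position models together, a consumer's lag on a topic partition can be derived from a `(KafkaPositionKey, KafkaPositionValue)` tuple. The field names below follow the tables above; all values are made up for illustration:

```python
# A made-up position data tuple, shaped like the key/value models documented above.
key = {
    "topic": "orders",
    "partition": 3,
    "datastreamTaskPrefix": "myDatastream",
    "datastreamTaskName": "myDatastream_task_0",
    "connectorTaskStartTime": 1559000000000,
}
value = {
    "brokerOffset": 1500,        # offset of the last message produced to the broker
    "consumerOffset": 1450,      # the consumer's current offset for this partition
    "assignmentTime": 1559000001000,
    "lastRecordReceivedTimestamp": 1559000500000,
    "lastBrokerQueriedTime": 1559000600000,
    "lastNonEmptyPollTime": 1559000500000,
}

# Lag is the gap between the broker's latest offset and the consumer's offset;
# a lag of 0 means the consumer is completely caught up.
lag = value["brokerOffset"] - value["consumerOffset"]
print(f"{key['topic']}-{key['partition']} lag: {lag}")  # orders-3 lag: 50
```

Note that `lastBrokerQueriedTime` matters here: if `enableBrokerOffsetFetcher` is disabled or the broker data is stale, `brokerOffset` may understate the true head of the partition, and the derived lag is a lower bound.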