diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 8a967289a0e..56227314c25 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -1086,13 +1086,6 @@ public static AutoCreateMode parse(String value, String defaultValue) { public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(PostgresSourceInfoStructMaker.class.getName()); - public static final Field TASK_ID = Field.create("task.id") - .withDisplayName("ID of the connector task") - .withType(Type.INT) - .withDefault(0) - .withImportance(Importance.LOW) - .withDescription("Internal use only"); - public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns") .withDisplayName("Comma separated primary key fields") .withType(Type.STRING) @@ -1245,10 +1238,6 @@ public boolean isFlushLsnOnSource() { return flushLsnOnSource; } - public int taskId() { - return getConfig().getInteger(TASK_ID); - } - public String primaryKeyHashColumns() { return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 18850d0ca58..4d028fa65bc 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.stream.Collectors; +import io.debezium.connector.postgresql.metrics.YugabyteDBMetricsFactory; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; @@ -123,9 +124,10 @@ public ChangeEventSourceCoordinator st final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry); schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter); - this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId()); + this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.getTaskId()); + final PostgresPartition.Provider partitionProvider = new PostgresPartition.Provider(connectorConfig, config); final Offsets previousOffsets = getPreviousOffsets( - new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig)); + partitionProvider, new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); @@ -255,7 +257,7 @@ public ChangeEventSourceCoordinator st replicationConnection, slotCreatedInfo, slotInfo), - new DefaultChangeEventSourceMetricsFactory<>(), + new YugabyteDBMetricsFactory(partitionProvider.getPartitions()), dispatcher, schema, snapshotter, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index d1ee3609de1..9b9fd39b238 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -21,9 +21,9 @@ public class PostgresPartition extends AbstractPartition implements Partition { private static final String SERVER_PARTITION_KEY = "server"; private final String serverName; - private final int taskId; + private final String taskId; - public PostgresPartition(String serverName, String databaseName, int taskId) { + public PostgresPartition(String serverName, String databaseName, String taskId) { super(databaseName); this.serverName = serverName; this.taskId = taskId; @@ -57,7 +57,7 @@ public String toString() { } public String getPartitionIdentificationKey() { - return String.format("%s_%d", serverName, taskId); + return String.format("%s_%s", serverName, taskId); } static class Provider implements Partition.Provider { @@ -73,7 +73,7 @@ static class Provider implements Partition.Provider { public Set getPartitions() { return Collections.singleton(new PostgresPartition( connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), - connectorConfig.taskId())); + connectorConfig.getTaskId())); } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index 721c408681d..82728f09d6f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -52,8 +52,8 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch this.schema = schema; } - protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy, int taskId) { - super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet); + protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy, String taskId) { + super(config.getContextName(), config.getLogicalName(), taskId, config.getCustomMetricTags(), Collections::emptySet); this.config = config; if (config.xminFetchInterval().toMillis() > 0) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 0c875c11161..35f26721299 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -112,7 +112,7 @@ protected List> getConfigForParallelSnapshotConsumption(int for (int i = 0; i < maxTasks; ++i) { Map taskProps = new HashMap<>(this.props); - taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i)); + taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); long lowerBound = i * rangeSize; long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBPartitionMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBPartitionMetrics.java new file mode 100644 index 00000000000..6cc548a9d06 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBPartitionMetrics.java @@ -0,0 +1,110 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.data.Envelope; +import io.debezium.metrics.Metrics; +import io.debezium.pipeline.ConnectorEvent; +import io.debezium.pipeline.meters.CommonEventMeter; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.spi.schema.DataCollectionId; +import org.apache.kafka.connect.data.Struct; + +import java.util.Map; + +abstract class AbstractYugabyteDBPartitionMetrics extends YugabyteDBMetrics implements YugabyteDBPartitionMetricsMXBean { + private final CommonEventMeter commonEventMeter; + + AbstractYugabyteDBPartitionMetrics(CdcSourceTaskContext taskContext, Map tags, + EventMetadataProvider metadataProvider) { + super(taskContext, tags); + this.commonEventMeter = new CommonEventMeter(taskContext.getClock(), metadataProvider); + } + + @Override + public String getLastEvent() { + return commonEventMeter.getLastEvent(); + } + + @Override + public long getMilliSecondsSinceLastEvent() { + return commonEventMeter.getMilliSecondsSinceLastEvent(); + } + + @Override + public long getTotalNumberOfEventsSeen() { + return commonEventMeter.getTotalNumberOfEventsSeen(); + } + + @Override + public long getTotalNumberOfCreateEventsSeen() { + return commonEventMeter.getTotalNumberOfCreateEventsSeen(); + } + + @Override + public long getTotalNumberOfUpdateEventsSeen() { + return commonEventMeter.getTotalNumberOfUpdateEventsSeen(); + } + + @Override + public long getTotalNumberOfDeleteEventsSeen() { + return commonEventMeter.getTotalNumberOfDeleteEventsSeen(); + } + + @Override + public long getNumberOfEventsFiltered() { + return commonEventMeter.getNumberOfEventsFiltered(); + } + + @Override + public long getNumberOfErroneousEvents() { + return commonEventMeter.getNumberOfErroneousEvents(); + } + + /** + * Invoked if an event is processed for a captured table. + */ + void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) { + commonEventMeter.onEvent(source, offset, key, value, operation); + } + + /** + * Invoked for events pertaining to non-captured tables. + */ + void onFilteredEvent(String event) { + commonEventMeter.onFilteredEvent(); + } + + /** + * Invoked for events pertaining to non-captured tables. + */ + void onFilteredEvent(String event, Envelope.Operation operation) { + commonEventMeter.onFilteredEvent(operation); + } + + /** + * Invoked for events that cannot be processed. + */ + void onErroneousEvent(String event) { + commonEventMeter.onErroneousEvent(); + } + + /** + * Invoked for events that cannot be processed. + */ + void onErroneousEvent(String event, Envelope.Operation operation) { + commonEventMeter.onErroneousEvent(operation); + } + + /** + * Invoked for events that represent a connector event. + */ + void onConnectorEvent(ConnectorEvent event) { + } + + @Override + public void reset() { + commonEventMeter.reset(); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java new file mode 100644 index 00000000000..8cf03d36e52 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java @@ -0,0 +1,118 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.data.Envelope; +import io.debezium.metrics.Metrics; +import io.debezium.pipeline.ConnectorEvent; +import io.debezium.pipeline.metrics.ChangeEventSourceMetrics; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.spi.schema.DataCollectionId; +import io.debezium.util.Collect; +import org.apache.kafka.connect.data.Struct; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; + +abstract class AbstractYugabyteDBTaskMetrics extends YugabyteDBMetrics + implements ChangeEventSourceMetrics, YugabyteDBTaskMetricsMXBean { + + private final ChangeEventQueueMetrics changeEventQueueMetrics; + private final Map beans = new HashMap<>(); + + AbstractYugabyteDBTaskMetrics(CdcSourceTaskContext taskContext, + String contextName, + ChangeEventQueueMetrics changeEventQueueMetrics, + Collection partitions, + Function beanFactory) { + super(taskContext, Collect.linkMapOf( + "server", taskContext.getConnectorName(), + "task", taskContext.getTaskId(), + "context", contextName)); + this.changeEventQueueMetrics = changeEventQueueMetrics; + + for (PostgresPartition partition : partitions) { + beans.put(partition, beanFactory.apply(partition)); + } + } + + @Override + public synchronized void register() { + super.register(); + beans.values().forEach(YugabyteDBMetrics::register); + } + + @Override + public synchronized void unregister() { + beans.values().forEach(YugabyteDBMetrics::unregister); + super.unregister(); + } + + @Override + public void reset() { + beans.values().forEach(B::reset); + } + + @Override + public void onEvent(PostgresPartition partition, DataCollectionId source, OffsetContext offset, Object key, + Struct value, Envelope.Operation operation) { + onPartitionEvent(partition, bean -> bean.onEvent(source, offset, key, value, operation)); + } + + @Override + public void onFilteredEvent(PostgresPartition partition, String event) { + onPartitionEvent(partition, bean -> bean.onFilteredEvent(event)); + } + + @Override + public void onFilteredEvent(PostgresPartition partition, String event, Envelope.Operation operation) { + onPartitionEvent(partition, bean -> bean.onFilteredEvent(event, operation)); + } + + @Override + public void onErroneousEvent(PostgresPartition partition, String event) { + onPartitionEvent(partition, bean -> bean.onErroneousEvent(event)); + } + + @Override + public void onErroneousEvent(PostgresPartition partition, String event, Envelope.Operation operation) { + onPartitionEvent(partition, bean -> bean.onErroneousEvent(event, operation)); + } + + @Override + public void onConnectorEvent(PostgresPartition partition, ConnectorEvent event) { + onPartitionEvent(partition, bean -> bean.onConnectorEvent(event)); + } + + @Override + public int getQueueTotalCapacity() { + return changeEventQueueMetrics.totalCapacity(); + } + + @Override + public int getQueueRemainingCapacity() { + return changeEventQueueMetrics.remainingCapacity(); + } + + @Override + public long getMaxQueueSizeInBytes() { + return changeEventQueueMetrics.maxQueueSizeInBytes(); + } + + @Override + public long getCurrentQueueSizeInBytes() { + return changeEventQueueMetrics.currentQueueSizeInBytes(); + } + + protected void onPartitionEvent(PostgresPartition partition, Consumer handler) { + B bean = beans.get(partition); + if (bean == null) { + throw new IllegalArgumentException("MBean for partition " + partition + " are not registered"); + } + handler.accept(bean); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetrics.java new file mode 100644 index 00000000000..e4454abf59f --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetrics.java @@ -0,0 +1,92 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.metrics.Metrics; +import io.debezium.pipeline.JmxUtils; +import io.debezium.util.Collect; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Metrics class to instantiate metrics for the {@link io.debezium.connector.postgresql.YugabyteDBConnector} + */ +public class YugabyteDBMetrics { + private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBMetrics.class); + private final ObjectName name; + private volatile boolean registered = false; + + protected YugabyteDBMetrics(CdcSourceTaskContext taskContext, Map tags) { + tags.putAll(taskContext.getCustomMetricTags()); + this.name = metricName(taskContext.getConnectorType(), tags); + } + + protected YugabyteDBMetrics(CommonConnectorConfig connectorConfig, String contextName, boolean multiPartitionMode) { + String connectorType = connectorConfig.getContextName(); + String connectorName = connectorConfig.getLogicalName(); + if (multiPartitionMode) { + Map tags = Collect.linkMapOf( + "server", connectorName, + "task", connectorConfig.getTaskId(), + "context", contextName); + tags.putAll(connectorConfig.getCustomMetricTags()); + this.name = metricName(connectorType, tags); + } + else { + this.name = metricName(connectorType, connectorName, contextName, connectorConfig.getCustomMetricTags()); + } + } + + /** + * Registers a metrics MBean into the platform MBean server. + * The method is intentionally synchronized to prevent preemption between registration and unregistration. + */ + public synchronized void register() { + + JmxUtils.registerMXBean(name, this); + // If the old metrics MBean is present then the connector will try to unregister it + // upon shutdown. + registered = true; + } + + /** + * Unregisters a metrics MBean from the platform MBean server. + * The method is intentionally synchronized to prevent preemption between registration and unregistration. + */ + public synchronized void unregister() { + if (this.name != null && registered) { + JmxUtils.unregisterMXBean(name); + registered = false; + } + } + + protected ObjectName metricName(String connectorType, String connectorName, String contextName, Map customTags) { + Map tags = Collect.linkMapOf("context", contextName, "server", connectorName); + tags.putAll(customTags); + return metricName(connectorType, tags); + } + + /** + * Create a JMX metric name for the given metric. + * @return the JMX metric name + */ + protected ObjectName metricName(String connectorType, Map tags) { + final String metricName = "debezium." + connectorType.toLowerCase() + ":type=connector-metrics," + + tags.entrySet().stream() + .map(e -> e.getKey() + "=" + Sanitizer.jmxSanitize(e.getValue())) + .collect(Collectors.joining(",")); + try { + return new ObjectName(metricName); + } + catch (MalformedObjectNameException e) { + throw new ConnectException("Invalid metric name '" + metricName + "'"); + } + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetricsFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetricsFactory.java new file mode 100644 index 00000000000..8d3173d580a --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBMetricsFactory.java @@ -0,0 +1,34 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory; +import io.debezium.pipeline.source.spi.EventMetadataProvider; + +import java.util.Collection; + +public class YugabyteDBMetricsFactory implements ChangeEventSourceMetricsFactory { + + private final Collection partitions; + + public YugabyteDBMetricsFactory(Collection partitions) { + this.partitions = partitions; + } + + @Override + public SnapshotChangeEventSourceMetrics getSnapshotMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new YugabyteDBSnapshotTaskMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider, partitions); + } + + @Override + public StreamingChangeEventSourceMetrics getStreamingMetrics(T taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider eventMetadataProvider) { + return new YugabyteDBStreamingTaskMetrics(taskContext, changeEventQueueMetrics, eventMetadataProvider, partitions); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBPartitionMetricsMXBean.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBPartitionMetricsMXBean.java new file mode 100644 index 00000000000..76367abf3a3 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBPartitionMetricsMXBean.java @@ -0,0 +1,8 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.pipeline.metrics.traits.CommonEventMetricsMXBean; +import io.debezium.pipeline.metrics.traits.SchemaMetricsMXBean; + +public interface YugabyteDBPartitionMetricsMXBean extends CommonEventMetricsMXBean, SchemaMetricsMXBean { + void reset(); +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetrics.java new file mode 100644 index 00000000000..b25598c4bac --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetrics.java @@ -0,0 +1,142 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.pipeline.meters.SnapshotMeter; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.relational.TableId; +import io.debezium.spi.schema.DataCollectionId; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +class YugabyteDBSnapshotPartitionMetrics extends AbstractYugabyteDBPartitionMetrics + implements YugabyteDBSnapshotPartitionMetricsMXBean { + private final SnapshotMeter snapshotMeter; + + YugabyteDBSnapshotPartitionMetrics(CdcSourceTaskContext taskContext, Map tags, + EventMetadataProvider metadataProvider) { + super(taskContext, tags, metadataProvider); + snapshotMeter = new SnapshotMeter(taskContext.getClock()); + } + + @Override + public int getTotalTableCount() { + return snapshotMeter.getTotalTableCount(); + } + + @Override + public int getRemainingTableCount() { + return snapshotMeter.getRemainingTableCount(); + } + + @Override + public boolean getSnapshotRunning() { + return snapshotMeter.getSnapshotRunning(); + } + + @Override + public boolean getSnapshotPaused() { + return snapshotMeter.getSnapshotPaused(); + } + + @Override + public boolean getSnapshotCompleted() { + return snapshotMeter.getSnapshotCompleted(); + } + + @Override + public boolean getSnapshotAborted() { + return snapshotMeter.getSnapshotAborted(); + } + + @Override + public long getSnapshotDurationInSeconds() { + return snapshotMeter.getSnapshotDurationInSeconds(); + } + + @Override + public long getSnapshotPausedDurationInSeconds() { + return snapshotMeter.getSnapshotPausedDurationInSeconds(); + } + + @Override + public String[] getCapturedTables() { + return snapshotMeter.getCapturedTables(); + } + + void monitoredDataCollectionsDetermined(Iterable dataCollectionIds) { + snapshotMeter.monitoredDataCollectionsDetermined(dataCollectionIds); + } + + void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, long numRows) { + snapshotMeter.dataCollectionSnapshotCompleted(dataCollectionId, numRows); + } + + void snapshotStarted() { + snapshotMeter.snapshotStarted(); + } + + void snapshotPaused() { + snapshotMeter.snapshotPaused(); + } + + void snapshotResumed() { + snapshotMeter.snapshotResumed(); + } + + void snapshotCompleted() { + snapshotMeter.snapshotCompleted(); + } + + void snapshotAborted() { + snapshotMeter.snapshotAborted(); + } + + void rowsScanned(TableId tableId, long numRows) { + snapshotMeter.rowsScanned(tableId, numRows); + } + + @Override + public ConcurrentMap getRowsScanned() { + return snapshotMeter.getRowsScanned(); + } + + void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo) { + snapshotMeter.currentChunk(chunkId, chunkFrom, chunkTo); + } + + void currentChunk(String chunkId, Object[] chunkFrom, Object[] chunkTo, Object tableTo[]) { + snapshotMeter.currentChunk(chunkId, chunkFrom, chunkTo, tableTo); + } + + @Override + public String getChunkId() { + return snapshotMeter.getChunkId(); + } + + @Override + public String getChunkFrom() { + return snapshotMeter.getChunkFrom(); + } + + @Override + public String getChunkTo() { + return snapshotMeter.getChunkTo(); + } + + @Override + public String getTableFrom() { + return snapshotMeter.getTableFrom(); + } + + @Override + public String getTableTo() { + return snapshotMeter.getTableTo(); + } + + @Override + public void reset() { + snapshotMeter.reset(); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetricsMXBean.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetricsMXBean.java new file mode 100644 index 00000000000..ff5da4a7471 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotPartitionMetricsMXBean.java @@ -0,0 +1,7 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.pipeline.metrics.traits.SnapshotMetricsMXBean; + +public interface YugabyteDBSnapshotPartitionMetricsMXBean extends SnapshotMetricsMXBean, + YugabyteDBPartitionMetricsMXBean { +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotTaskMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotTaskMetrics.java new file mode 100644 index 00000000000..9fb9399aacd --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBSnapshotTaskMetrics.java @@ -0,0 +1,81 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.relational.TableId; +import io.debezium.spi.schema.DataCollectionId; +import io.debezium.util.Collect; + +import java.util.Collection; + +class YugabyteDBSnapshotTaskMetrics extends AbstractYugabyteDBTaskMetrics + implements SnapshotChangeEventSourceMetrics { + + YugabyteDBSnapshotTaskMetrics(CdcSourceTaskContext taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider metadataProvider, + Collection partitions) { + super(taskContext, "snapshot", changeEventQueueMetrics, partitions, + (PostgresPartition partition) -> new YugabyteDBSnapshotPartitionMetrics(taskContext, + Collect.linkMapOf( + "server", taskContext.getConnectorName(), + "task", taskContext.getTaskId(), + "context", "snapshot", + "partition", partition.getPartitionIdentificationKey()), + metadataProvider)); + } + + @Override + public void snapshotStarted(PostgresPartition partition) { + onPartitionEvent(partition, YugabyteDBSnapshotPartitionMetrics::snapshotStarted); + } + + @Override + public void snapshotPaused(PostgresPartition partition) { + onPartitionEvent(partition, YugabyteDBSnapshotPartitionMetrics::snapshotPaused); + } + + @Override + public void snapshotResumed(PostgresPartition partition) { + onPartitionEvent(partition, YugabyteDBSnapshotPartitionMetrics::snapshotResumed); + } + + @Override + public void monitoredDataCollectionsDetermined(PostgresPartition partition, Iterable dataCollectionIds) { + onPartitionEvent(partition, bean -> bean.monitoredDataCollectionsDetermined(dataCollectionIds)); + } + + @Override + public void snapshotCompleted(PostgresPartition partition) { + onPartitionEvent(partition, YugabyteDBSnapshotPartitionMetrics::snapshotCompleted); + } + + @Override + public void snapshotAborted(PostgresPartition partition) { + onPartitionEvent(partition, YugabyteDBSnapshotPartitionMetrics::snapshotAborted); + } + + @Override + public void dataCollectionSnapshotCompleted(PostgresPartition partition, DataCollectionId dataCollectionId, long numRows) { + onPartitionEvent(partition, bean -> bean.dataCollectionSnapshotCompleted(dataCollectionId, numRows)); + } + + @Override + public void rowsScanned(PostgresPartition partition, TableId tableId, long numRows) { + onPartitionEvent(partition, bean -> bean.rowsScanned(tableId, numRows)); + } + + @Override + public void currentChunk(PostgresPartition partition, String chunkId, Object[] chunkFrom, Object[] chunkTo) { + onPartitionEvent(partition, bean -> bean.currentChunk(chunkId, chunkFrom, chunkTo)); + } + + @Override + public void currentChunk(PostgresPartition partition, String chunkId, Object[] chunkFrom, Object[] chunkTo, Object[] tableTo) { + onPartitionEvent(partition, bean -> bean.currentChunk(chunkId, chunkFrom, chunkTo, tableTo)); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetrics.java new file mode 100644 index 00000000000..ac304feca3b --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetrics.java @@ -0,0 +1,62 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.data.Envelope; +import io.debezium.pipeline.meters.StreamingMeter; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.spi.schema.DataCollectionId; +import org.apache.kafka.connect.data.Struct; + +import java.util.Map; + +class YugabyteDBStreamingPartitionMetrics extends AbstractYugabyteDBPartitionMetrics + implements YugabyteDBStreamingPartitionMetricsMXBean { + + private final StreamingMeter streamingMeter; + + YugabyteDBStreamingPartitionMetrics(CdcSourceTaskContext taskContext, + Map tags, + EventMetadataProvider metadataProvider) { + super(taskContext, tags, metadataProvider); + streamingMeter = new StreamingMeter(taskContext, metadataProvider); + } + + @Override + void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value, Envelope.Operation operation) { + super.onEvent(source, offset, key, value, operation); + streamingMeter.onEvent(source, offset, key, value); + } + + @Override + public String[] getCapturedTables() { + return streamingMeter.getCapturedTables(); + } + + @Override + public long getMilliSecondsBehindSource() { + return streamingMeter.getMilliSecondsBehindSource(); + } + + @Override + public long getNumberOfCommittedTransactions() { + return streamingMeter.getNumberOfCommittedTransactions(); + } + + @Override + public Map getSourceEventPosition() { + return streamingMeter.getSourceEventPosition(); + } + + @Override + public String getLastTransactionId() { + return streamingMeter.getLastTransactionId(); + } + + @Override + public void reset() { + super.reset(); + streamingMeter.reset(); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetricsMXBean.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetricsMXBean.java new file mode 100644 index 00000000000..c0b242a0e0c --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingPartitionMetricsMXBean.java @@ -0,0 +1,7 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.pipeline.metrics.traits.StreamingMetricsMXBean; + +public interface YugabyteDBStreamingPartitionMetricsMXBean extends StreamingMetricsMXBean, + YugabyteDBPartitionMetricsMXBean { +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetrics.java new file mode 100644 index 00000000000..e9344454117 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetrics.java @@ -0,0 +1,43 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.connector.base.ChangeEventQueueMetrics; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.pipeline.meters.ConnectionMeter; +import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.util.Collect; + +import java.util.Collection; + +public class YugabyteDBStreamingTaskMetrics extends AbstractYugabyteDBTaskMetrics + implements StreamingChangeEventSourceMetrics, YugabyteDBStreamingTaskMetricsMXBean { + + private final ConnectionMeter connectionMeter; + + YugabyteDBStreamingTaskMetrics(CdcSourceTaskContext taskContext, + ChangeEventQueueMetrics changeEventQueueMetrics, + EventMetadataProvider metadataProvider, + Collection partitions) { + super(taskContext, "streaming", changeEventQueueMetrics, partitions, + (PostgresPartition partition) -> new YugabyteDBStreamingPartitionMetrics(taskContext, + Collect.linkMapOf( + "server", taskContext.getConnectorName(), + "task", taskContext.getTaskId(), + "context", "streaming", + "partition", partition.getPartitionIdentificationKey()), + metadataProvider)); + connectionMeter = new ConnectionMeter(); + } + + @Override + public boolean isConnected() { + return connectionMeter.isConnected(); + } + + @Override + public void connected(boolean connected) { + connectionMeter.connected(connected); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetricsMXBean.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetricsMXBean.java new file mode 100644 index 00000000000..b83eb7f31f9 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBStreamingTaskMetricsMXBean.java @@ -0,0 +1,6 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.pipeline.metrics.traits.ConnectionMetricsMXBean; + +public interface YugabyteDBStreamingTaskMetricsMXBean extends ConnectionMetricsMXBean, YugabyteDBTaskMetricsMXBean { +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBTaskMetricsMXBean.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBTaskMetricsMXBean.java new file mode 100644 index 00000000000..51653077f16 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/YugabyteDBTaskMetricsMXBean.java @@ -0,0 +1,7 @@ +package io.debezium.connector.postgresql.metrics; + +import io.debezium.pipeline.metrics.traits.QueueMetricsMXBean; + +public interface YugabyteDBTaskMetricsMXBean extends QueueMetricsMXBean { + void reset(); +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java index e4fdabf23a4..57b197e75c9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java @@ -20,7 +20,7 @@ public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotSta super.init(config, sourceInfo, slotState); this.sourceInfo = sourceInfo; - LOGGER.info("Initialised ParallelSnapshotter for task {}", config.taskId()); + LOGGER.info("Initialised ParallelSnapshotter for task {}", config.getTaskId()); } @Override diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index 201f32792ef..f66797152ad 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest]+)>([^:]+)" + - pattern: "debezium.([^:]+)]+), partition=([^>]+)>([^:]+)" name: "debezium_metrics_$6" labels: plugin: "$1" name: "$2" task: "$3" context: "$4" - database: "$5" + partition: "$5" - pattern: "debezium.([^:]+)]+)>([^:]+)" name: "debezium_metrics_$5" labels: