From e411cccd0470e7f354030a6a47528c5d4d469575 Mon Sep 17 00:00:00 2001 From: dielhennr Date: Tue, 20 Apr 2021 16:41:12 -0700 Subject: [PATCH 1/6] global-topic-count-metric-and-test --- .../kafka/controller/ControllerMetrics.java | 6 +++ .../kafka/controller/QuorumController.java | 2 +- .../controller/QuorumControllerMetrics.java | 27 +++++++++++ .../controller/ReplicationControlManager.java | 12 ++++- .../controller/MockControllerMetrics.java | 18 ++++++- .../ReplicationControlManagerTest.java | 48 ++++++++++++++++++- 6 files changed, 108 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index fd4f3befb805f..5cc7d4d0a9fe2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -26,4 +26,10 @@ public interface ControllerMetrics { void updateEventQueueTime(long durationMs); void updateEventQueueProcessingTime(long durationMs); + + int topicCount(); + + void incTopicCount(); + + void decTopicCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 86faa5ede8e6c..42bc308c24160 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -928,7 +928,7 @@ private QuorumController(LogContext logContext, this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, - configurationControl, clusterControl); + configurationControl, clusterControl, controllerMetrics); this.logManager = logManager; this.metaLogListener = new QuorumMetaLogListener(); this.curClaimEpoch = -1L; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index ad56faf3da99c..2c442f2084238 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -30,11 +30,16 @@ public final class QuorumControllerMetrics implements ControllerMetrics { "kafka.controller", "ControllerEventManager", "EventQueueTimeMs", null); private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new MetricName( "kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null); + private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName( + "kafka.controller", "ReplicationControlManager", "GlobalTopicCount", null); + private volatile boolean active; private final Gauge activeControllerCount; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; + private int topicCount; + private final Gauge globalTopicCount; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; @@ -46,6 +51,13 @@ public Integer value() { }); this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true); this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true); + this.topicCount = 0; + this.globalTopicCount = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { + @Override + public Integer value() { + return topicCount; + } + }); } @Override @@ -67,4 +79,19 @@ public void updateEventQueueTime(long durationMs) { public void updateEventQueueProcessingTime(long durationMs) { eventQueueTime.update(durationMs); } + + @Override + public int topicCount() { + return topicCount; + } + + @Override + public void incTopicCount() { + topicCount++; + } + + @Override + public void decTopicCount() { + topicCount--; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index f169d1fb4aafc..109df162ba414 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -274,6 +274,11 @@ public String toString() { */ private final ClusterControlManager clusterControl; + /** + * A reference to the controller's metrics registry. + */ + private final ControllerMetrics controllerMetrics; + /** * Maps topic names to topic UUIDs. */ @@ -294,12 +299,14 @@ public String toString() { short defaultReplicationFactor, int defaultNumPartitions, ConfigurationControlManager configurationControl, - ClusterControlManager clusterControl) { + ClusterControlManager clusterControl, + ControllerMetrics controllerMetrics) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(ReplicationControlManager.class); this.defaultReplicationFactor = defaultReplicationFactor; this.defaultNumPartitions = defaultNumPartitions; this.configurationControl = configurationControl; + this.controllerMetrics = controllerMetrics; this.clusterControl = clusterControl; this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0); this.topics = new TimelineHashMap<>(snapshotRegistry, 0); @@ -310,6 +317,7 @@ public void replay(TopicRecord record) { topicsByName.put(record.name(), record.topicId()); topics.put(record.topicId(), new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); + controllerMetrics.incTopicCount(); log.info("Created topic {} with ID {}.", record.name(), record.topicId()); } @@ -378,7 +386,7 @@ public void replay(RemoveTopicRecord record) { } } brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); - + controllerMetrics.decTopicCount(); log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index 4e6523e37f286..2c6e37f8fb800 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -17,12 +17,13 @@ package org.apache.kafka.controller; - public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; + private int topicCount; public MockControllerMetrics() { this.active = false; + this.topicCount = 0; } @Override @@ -44,4 +45,19 @@ public void updateEventQueueTime(long durationMs) { public void updateEventQueueProcessingTime(long durationMs) { // nothing to do } + + @Override + public int topicCount() { + return this.topicCount; + } + + @Override + public void incTopicCount() { + this.topicCount++; + } + + @Override + public void decTopicCount() { + this.topicCount--; + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index e524581f32e36..85d85ba91d710 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -87,6 +87,7 @@ private static class ReplicationControlTestContext { final ClusterControlManager clusterControl = new ClusterControlManager( logContext, time, snapshotRegistry, 1000, new SimpleReplicaPlacementPolicy(random)); + final ControllerMetrics metrics = new MockControllerMetrics(); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( new LogContext(), snapshotRegistry, Collections.emptyMap()); final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry, @@ -94,7 +95,8 @@ private static class ReplicationControlTestContext { (short) 3, 1, configurationControl, - clusterControl); + clusterControl, + metrics); void replay(List records) throws Exception { ControllerTestUtils.replayAll(clusterControl, records); @@ -181,6 +183,50 @@ public void testCreateTopics() throws Exception { ctx.replicationControl.iterator(Long.MAX_VALUE)); } + @Test + public void testGlobalTopicMetric() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(-1).setReplicationFactor((short) -1)); + + registerBroker(0, ctx); + unfenceBroker(0, ctx); + registerBroker(1, ctx); + unfenceBroker(1, ctx); + registerBroker(2, ctx); + unfenceBroker(2, ctx); + + List topicsToDelete = new ArrayList<>(); + + ControllerResult result = + replicationControl.createTopics(request); + topicsToDelete.add(result.response().topics().find("foo").topicId()); + + ControllerTestUtils.replayAll(replicationControl, result.records()); + assertEquals(1, ctx.metrics.topicCount()); + + request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("bar"). + setNumPartitions(-1).setReplicationFactor((short) -1)); + request.topics().add(new CreatableTopic().setName("baz"). + setNumPartitions(-1).setReplicationFactor((short) -1)); + result = replicationControl.createTopics(request); + ControllerTestUtils.replayAll(replicationControl, result.records()); + assertEquals(3, ctx.metrics.topicCount()); + + topicsToDelete.add(result.response().topics().find("baz").topicId()); + ControllerResult> deleteResult = replicationControl.deleteTopics(topicsToDelete); + ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); + assertEquals(1, ctx.metrics.topicCount()); + + Uuid topicToDelete = result.response().topics().find("bar").topicId(); + deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete)); + ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); + assertEquals(0, ctx.metrics.topicCount()); + } + @Test public void testValidateNewTopicNames() { Map topicErrors = new HashMap<>(); From d1b502b7bf69a3b44c7f07867026d56b5c5eb9e3 Mon Sep 17 00:00:00 2001 From: dielhennr Date: Tue, 20 Apr 2021 17:45:41 -0700 Subject: [PATCH 2/6] add-global-partition-and-topic-metrics --- .../kafka/controller/ControllerMetrics.java | 6 +++++ .../controller/QuorumControllerMetrics.java | 25 +++++++++++++++++++ .../controller/ReplicationControlManager.java | 2 ++ .../controller/MockControllerMetrics.java | 17 +++++++++++++ .../ReplicationControlManagerTest.java | 11 +++++--- 5 files changed, 57 insertions(+), 4 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index 5cc7d4d0a9fe2..35e23fb6385bb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -32,4 +32,10 @@ public interface ControllerMetrics { void incTopicCount(); void decTopicCount(); + + int partitionCount(); + + void incPartitionCount(); + + void decPartitionCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index 2c442f2084238..07c5ec8e47e7d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -32,6 +32,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics { "kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null); private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName( "kafka.controller", "ReplicationControlManager", "GlobalTopicCount", null); + private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName( + "kafka.controller", "ReplicationControlManager", "GlobalPartitionCount", null); private volatile boolean active; @@ -40,6 +42,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics { private final Histogram eventQueueProcessingTime; private int topicCount; private final Gauge globalTopicCount; + private int partitionCount; + private final Gauge globalPartitionCount; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; @@ -58,6 +62,12 @@ public Integer value() { return topicCount; } }); + this.globalPartitionCount = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { + @Override + public Integer value() { + return partitionCount; + } + }); } @Override @@ -94,4 +104,19 @@ public void incTopicCount() { public void decTopicCount() { topicCount--; } + + @Override + public int partitionCount() { + return partitionCount; + } + + @Override + public void incPartitionCount() { + partitionCount++; + } + + @Override + public void decPartitionCount() { + partitionCount--; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 109df162ba414..a6e83c07d77eb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -346,6 +346,7 @@ public void replay(PartitionRecord record) { newPartInfo.leader); } } + controllerMetrics.incPartitionCount(); } public void replay(PartitionChangeRecord record) { @@ -384,6 +385,7 @@ public void replay(RemoveTopicRecord record) { for (int i = 0; i < partition.isr.length; i++) { brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]); } + controllerMetrics.decPartitionCount(); } brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); controllerMetrics.decTopicCount(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index 2c6e37f8fb800..360df2f3b7c63 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -20,10 +20,12 @@ public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; private int topicCount; + private int partitionCount; public MockControllerMetrics() { this.active = false; this.topicCount = 0; + this.partitionCount = 0; } @Override @@ -60,4 +62,19 @@ public void incTopicCount() { public void decTopicCount() { this.topicCount--; } + + @Override + public int partitionCount() { + return this.partitionCount; + } + + @Override + public void incPartitionCount() { + partitionCount++; + } + + @Override + public void decPartitionCount() { + partitionCount--; + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 85d85ba91d710..ce756f8b38158 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -184,12 +184,12 @@ public void testCreateTopics() throws Exception { } @Test - public void testGlobalTopicMetric() throws Exception { + public void testGlobalTopicAndPartitionMetrics() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext(); ReplicationControlManager replicationControl = ctx.replicationControl; CreateTopicsRequestData request = new CreateTopicsRequestData(); request.topics().add(new CreatableTopic().setName("foo"). - setNumPartitions(-1).setReplicationFactor((short) -1)); + setNumPartitions(1).setReplicationFactor((short) -1)); registerBroker(0, ctx); unfenceBroker(0, ctx); @@ -209,22 +209,25 @@ public void testGlobalTopicMetric() throws Exception { request = new CreateTopicsRequestData(); request.topics().add(new CreatableTopic().setName("bar"). - setNumPartitions(-1).setReplicationFactor((short) -1)); + setNumPartitions(1).setReplicationFactor((short) -1)); request.topics().add(new CreatableTopic().setName("baz"). - setNumPartitions(-1).setReplicationFactor((short) -1)); + setNumPartitions(2).setReplicationFactor((short) -1)); result = replicationControl.createTopics(request); ControllerTestUtils.replayAll(replicationControl, result.records()); assertEquals(3, ctx.metrics.topicCount()); + assertEquals(4, ctx.metrics.partitionCount()); topicsToDelete.add(result.response().topics().find("baz").topicId()); ControllerResult> deleteResult = replicationControl.deleteTopics(topicsToDelete); ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); assertEquals(1, ctx.metrics.topicCount()); + assertEquals(1, ctx.metrics.partitionCount()); Uuid topicToDelete = result.response().topics().find("bar").topicId(); deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete)); ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); assertEquals(0, ctx.metrics.topicCount()); + assertEquals(0, ctx.metrics.partitionCount()); } @Test From d65e8e56a6f54d78baa108b59ffbeb90c6e7c709 Mon Sep 17 00:00:00 2001 From: dielhennr Date: Wed, 21 Apr 2021 10:15:17 -0700 Subject: [PATCH 3/6] comments-addressed --- .../kafka/controller/ControllerMetrics.java | 12 +++---- .../controller/QuorumControllerMetrics.java | 36 +++++++------------ .../controller/ReplicationControlManager.java | 13 ++++--- .../controller/MockControllerMetrics.java | 28 +++++---------- .../ReplicationControlManagerTest.java | 14 ++++---- 5 files changed, 42 insertions(+), 61 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index 35e23fb6385bb..406a533e7d7e4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -27,15 +27,11 @@ public interface ControllerMetrics { void updateEventQueueProcessingTime(long durationMs); - int topicCount(); + void setGlobalTopicsCount(int topicCount); - void incTopicCount(); + int globalTopicsCount(); - void decTopicCount(); + void setGlobalPartitionCount(int partitionCount); - int partitionCount(); - - void incPartitionCount(); - - void decPartitionCount(); + int globalPartitionCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index 07c5ec8e47e7d..3934ba3272aa6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -37,13 +37,13 @@ public final class QuorumControllerMetrics implements ControllerMetrics { private volatile boolean active; + private volatile int topicCount; + private volatile int partitionCount; private final Gauge activeControllerCount; + private final Gauge globalPartitionCount; + private final Gauge globalTopicCount; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; - private int topicCount; - private final Gauge globalTopicCount; - private int partitionCount; - private final Gauge globalPartitionCount; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; @@ -62,7 +62,7 @@ public Integer value() { return topicCount; } }); - this.globalPartitionCount = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { + this.globalPartitionCount = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge() { @Override public Integer value() { return partitionCount; @@ -91,32 +91,22 @@ public void updateEventQueueProcessingTime(long durationMs) { } @Override - public int topicCount() { - return topicCount; - } - - @Override - public void incTopicCount() { - topicCount++; - } - - @Override - public void decTopicCount() { - topicCount--; + public void setGlobalTopicsCount(int topicCount) { + this.topicCount = topicCount; } @Override - public int partitionCount() { - return partitionCount; + public int globalTopicsCount() { + return this.topicCount; } @Override - public void incPartitionCount() { - partitionCount++; + public void setGlobalPartitionCount(int partitionCount) { + this.partitionCount = partitionCount; } @Override - public void decPartitionCount() { - partitionCount--; + public int globalPartitionCount() { + return this.partitionCount; } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index a6e83c07d77eb..339e2c1f4c225 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -264,6 +264,8 @@ public String toString() { */ private final int defaultNumPartitions; + private int globalPartitionCount; + /** * A reference to the controller's configuration control manager. */ @@ -308,6 +310,7 @@ public String toString() { this.configurationControl = configurationControl; this.controllerMetrics = controllerMetrics; this.clusterControl = clusterControl; + this.globalPartitionCount = 0; this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0); this.topics = new TimelineHashMap<>(snapshotRegistry, 0); this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry); @@ -317,7 +320,7 @@ public void replay(TopicRecord record) { topicsByName.put(record.name(), record.topicId()); topics.put(record.topicId(), new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); - controllerMetrics.incTopicCount(); + controllerMetrics.setGlobalTopicsCount(topics.size()); log.info("Created topic {} with ID {}.", record.name(), record.topicId()); } @@ -335,6 +338,7 @@ public void replay(PartitionRecord record) { topicInfo.parts.put(record.partitionId(), newPartInfo); brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, NO_LEADER, newPartInfo.leader); + globalPartitionCount++; } else { String diff = newPartInfo.diff(prevPartInfo); if (!diff.isEmpty()) { @@ -346,7 +350,7 @@ public void replay(PartitionRecord record) { newPartInfo.leader); } } - controllerMetrics.incPartitionCount(); + controllerMetrics.setGlobalPartitionCount(globalPartitionCount); } public void replay(PartitionChangeRecord record) { @@ -385,10 +389,11 @@ public void replay(RemoveTopicRecord record) { for (int i = 0; i < partition.isr.length; i++) { brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]); } - controllerMetrics.decPartitionCount(); + globalPartitionCount--; } brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); - controllerMetrics.decTopicCount(); + controllerMetrics.setGlobalTopicsCount(topics.size()); + controllerMetrics.setGlobalPartitionCount(globalPartitionCount); log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index 360df2f3b7c63..10cf217ebb087 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -19,8 +19,8 @@ public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; - private int topicCount; - private int partitionCount; + private volatile int topicCount; + private volatile int partitionCount; public MockControllerMetrics() { this.active = false; @@ -49,32 +49,22 @@ public void updateEventQueueProcessingTime(long durationMs) { } @Override - public int topicCount() { - return this.topicCount; + public void setGlobalTopicsCount(int topicCount) { + this.topicCount = topicCount; } @Override - public void incTopicCount() { - this.topicCount++; + public int globalTopicsCount() { + return this.topicCount; } @Override - public void decTopicCount() { - this.topicCount--; + public void setGlobalPartitionCount(int partitionCount) { + this.partitionCount = partitionCount; } @Override - public int partitionCount() { + public int globalPartitionCount() { return this.partitionCount; } - - @Override - public void incPartitionCount() { - partitionCount++; - } - - @Override - public void decPartitionCount() { - partitionCount--; - } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index ce756f8b38158..be75ed5e1c48b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -205,7 +205,7 @@ public void testGlobalTopicAndPartitionMetrics() throws Exception { topicsToDelete.add(result.response().topics().find("foo").topicId()); ControllerTestUtils.replayAll(replicationControl, result.records()); - assertEquals(1, ctx.metrics.topicCount()); + assertEquals(1, ctx.metrics.globalTopicsCount()); request = new CreateTopicsRequestData(); request.topics().add(new CreatableTopic().setName("bar"). @@ -214,20 +214,20 @@ public void testGlobalTopicAndPartitionMetrics() throws Exception { setNumPartitions(2).setReplicationFactor((short) -1)); result = replicationControl.createTopics(request); ControllerTestUtils.replayAll(replicationControl, result.records()); - assertEquals(3, ctx.metrics.topicCount()); - assertEquals(4, ctx.metrics.partitionCount()); + assertEquals(3, ctx.metrics.globalTopicsCount()); + assertEquals(4, ctx.metrics.globalPartitionCount()); topicsToDelete.add(result.response().topics().find("baz").topicId()); ControllerResult> deleteResult = replicationControl.deleteTopics(topicsToDelete); ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); - assertEquals(1, ctx.metrics.topicCount()); - assertEquals(1, ctx.metrics.partitionCount()); + assertEquals(1, ctx.metrics.globalTopicsCount()); + assertEquals(1, ctx.metrics.globalPartitionCount()); Uuid topicToDelete = result.response().topics().find("bar").topicId(); deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete)); ControllerTestUtils.replayAll(replicationControl, deleteResult.records()); - assertEquals(0, ctx.metrics.topicCount()); - assertEquals(0, ctx.metrics.partitionCount()); + assertEquals(0, ctx.metrics.globalTopicsCount()); + assertEquals(0, ctx.metrics.globalPartitionCount()); } @Test From b21974c528bcea2552d3c8026245191ce27ea11a Mon Sep 17 00:00:00 2001 From: dielhennr Date: Mon, 10 May 2021 16:58:53 -0700 Subject: [PATCH 4/6] adding-offline-partition-and-replica-imbalance-metrics --- .../kafka/controller/BrokersToIsrs.java | 10 +++ .../kafka/controller/ControllerMetrics.java | 8 +++ .../controller/QuorumControllerMetrics.java | 61 ++++++++++++++++--- .../controller/ReplicationControlManager.java | 20 +++++- .../controller/MockControllerMetrics.java | 40 +++++++++--- .../ReplicationControlManagerTest.java | 54 ++++++++++++++++ 6 files changed, 175 insertions(+), 18 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java index d8e0319f94f7e..441ee32eb8c92 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java @@ -326,4 +326,14 @@ PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) { boolean hasLeaderships(int brokerId) { return iterator(brokerId, true).hasNext(); } + + int offlinePartitionCount() { + PartitionsOnReplicaIterator noLeaderIterator = partitionsWithNoLeader(); + int offlinePartitionCount = 0; + while (noLeaderIterator.hasNext()) { + noLeaderIterator.next(); + offlinePartitionCount++; + } + return offlinePartitionCount; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index 406a533e7d7e4..7c862bc33c059 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -34,4 +34,12 @@ public interface ControllerMetrics { void setGlobalPartitionCount(int partitionCount); int globalPartitionCount(); + + void setOfflinePartitionCount(int offlinePartitions); + + int offlinePartitionCount(); + + void setPreferredReplicaImbalanceCount(int replicaImbalances); + + int preferredReplicaImbalanceCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index 3934ba3272aa6..c33c383ffc972 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -34,19 +34,31 @@ public final class QuorumControllerMetrics implements ControllerMetrics { "kafka.controller", "ReplicationControlManager", "GlobalTopicCount", null); private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName( "kafka.controller", "ReplicationControlManager", "GlobalPartitionCount", null); + private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName( + "kafka.controller", "ReplicationControlManager", "OfflinePartitionCount", null); + private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new MetricName( + "kafka.controller", "ReplicationControlManager", "PreferredReplicaImbalanceCount", null); private volatile boolean active; - private volatile int topicCount; - private volatile int partitionCount; + private volatile int topics; + private volatile int partitions; + private volatile int offlinePartitions; + private volatile int preferredReplicaImbalances; private final Gauge activeControllerCount; private final Gauge globalPartitionCount; private final Gauge globalTopicCount; + private final Gauge offlinePartitionCount; + private final Gauge preferredReplicaImbalanceCount; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; + this.topics = 0; + this.partitions = 0; + this.offlinePartitions = 0; + this.preferredReplicaImbalances = 0; this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -55,17 +67,28 @@ public Integer value() { }); this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true); this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true); - this.topicCount = 0; this.globalTopicCount = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { @Override public Integer value() { - return topicCount; + return topics; } }); this.globalPartitionCount = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge() { @Override public Integer value() { - return partitionCount; + return partitions; + } + }); + this.offlinePartitionCount = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge() { + @Override + public Integer value() { + return offlinePartitions; + } + }); + this.preferredReplicaImbalanceCount = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge() { + @Override + public Integer value() { + return preferredReplicaImbalances; } }); } @@ -92,21 +115,41 @@ public void updateEventQueueProcessingTime(long durationMs) { @Override public void setGlobalTopicsCount(int topicCount) { - this.topicCount = topicCount; + this.topics = topicCount; } @Override public int globalTopicsCount() { - return this.topicCount; + return this.topics; } @Override public void setGlobalPartitionCount(int partitionCount) { - this.partitionCount = partitionCount; + this.partitions = partitionCount; } @Override public int globalPartitionCount() { - return this.partitionCount; + return this.partitions; + } + + @Override + public void setOfflinePartitionCount(int offlinePartitions) { + this.offlinePartitions = offlinePartitions; + } + + @Override + public int offlinePartitionCount() { + return this.offlinePartitions; + } + + @Override + public void setPreferredReplicaImbalanceCount(int replicaImbalances) { + this.preferredReplicaImbalances = replicaImbalances; + } + + @Override + public int preferredReplicaImbalanceCount() { + return this.preferredReplicaImbalances; } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 113bcf397fc31..ed7994dd41f23 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -354,14 +354,16 @@ public void replay(PartitionRecord record) { topicInfo.parts.put(record.partitionId(), newPartInfo); brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, NO_LEADER, newPartInfo.leader); - } else if (!newPartInfo.equals(prevPartInfo)) { globalPartitionCount++; + } else if (!newPartInfo.equals(prevPartInfo)) { newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo); topicInfo.parts.put(record.partitionId(), newPartInfo); brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader); } controllerMetrics.setGlobalPartitionCount(globalPartitionCount); + controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); + controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); } public void replay(PartitionChangeRecord record) { @@ -383,6 +385,8 @@ public void replay(PartitionChangeRecord record) { String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + record.topicId(); newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo); + controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); + controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); } public void replay(RemoveTopicRecord record) { @@ -407,6 +411,8 @@ public void replay(RemoveTopicRecord record) { brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); controllerMetrics.setGlobalTopicsCount(topics.size()); controllerMetrics.setGlobalPartitionCount(globalPartitionCount); + controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); + controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); } @@ -471,6 +477,18 @@ public void replay(RemoveTopicRecord record) { return ControllerResult.atomicOf(records, data); } + private int preferredReplicaImbalanceCount() { + int count = 0; + for (TopicControlInfo topic : topics.values()) { + for (PartitionControlInfo part : topic.parts.values()) { + if (part.leader != part.preferredReplica()) { + count++; + } + } + } + return count; + } + private ApiError createTopic(CreatableTopic topic, List records, Map successes) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index 10cf217ebb087..f13b24865a637 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -19,13 +19,17 @@ public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; - private volatile int topicCount; - private volatile int partitionCount; + private volatile int topics; + private volatile int partitions; + private volatile int offlinePartitions; + private volatile int preferredReplicaImbalances; public MockControllerMetrics() { this.active = false; - this.topicCount = 0; - this.partitionCount = 0; + this.topics = 0; + this.partitions = 0; + this.offlinePartitions = 0; + this.preferredReplicaImbalances = 0; } @Override @@ -50,21 +54,41 @@ public void updateEventQueueProcessingTime(long durationMs) { @Override public void setGlobalTopicsCount(int topicCount) { - this.topicCount = topicCount; + this.topics = topicCount; } @Override public int globalTopicsCount() { - return this.topicCount; + return this.topics; } @Override public void setGlobalPartitionCount(int partitionCount) { - this.partitionCount = partitionCount; + this.partitions = partitionCount; } @Override public int globalPartitionCount() { - return this.partitionCount; + return this.partitions; + } + + @Override + public void setOfflinePartitionCount(int offlinePartitions) { + this.offlinePartitions = offlinePartitions; + } + + @Override + public int offlinePartitionCount() { + return this.offlinePartitions; + } + + @Override + public void setPreferredReplicaImbalanceCount(int replicaImbalances) { + this.preferredReplicaImbalances = replicaImbalances; + } + + @Override + public int preferredReplicaImbalanceCount() { + return this.preferredReplicaImbalances; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 38d2cd9cd5fcf..badd1f86dd73e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -252,6 +252,60 @@ public void testGlobalTopicAndPartitionMetrics() throws Exception { assertEquals(0, ctx.metrics.globalPartitionCount()); } + @Test + public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replicationControl = ctx.replicationControl; + + for (int i = 0; i < 4; i++) { + registerBroker(i, ctx); + unfenceBroker(i, ctx); + } + + ctx.createTestTopic("foo", + new int[][] { + new int[] {0, 2}, + new int[] {0, 1} + }); + + ctx.createTestTopic("zar", + new int[][] { + new int[] {0, 1, 2}, + new int[] {1, 2, 3}, + new int[] {1, 2, 0} + }); + + ControllerResult result = replicationControl.unregisterBroker(0); + ctx.replay(result.records()); + + // All partitions should still be online after unregistering broker 0 + assertEquals(0, ctx.metrics.offlinePartitionCount()); + // Three partitions should not have their preferred (first) replica 0 + assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount()); + + result = replicationControl.unregisterBroker(1); + ctx.replay(result.records()); + + // After unregistering broker 1, 1 partition for topic foo should go offline + assertEquals(1, ctx.metrics.offlinePartitionCount()); + // All five partitions should not have their preferred (first) replica at this point + assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount()); + + result = replicationControl.unregisterBroker(2); + ctx.replay(result.records()); + + // After unregistering broker 2, the last partition for topic foo should go offline + // and 2 partitions for topic zar should go offline + assertEquals(4, ctx.metrics.offlinePartitionCount()); + + result = replicationControl.unregisterBroker(3); + ctx.replay(result.records()); + + // After unregistering broker 3 the last partition for topic zar should go offline + assertEquals(5, ctx.metrics.offlinePartitionCount()); + } + + @Test public void testValidateNewTopicNames() { Map topicErrors = new HashMap<>(); From 7b75ac9743bf8435ea9e969a8c8af5d8a444fa59 Mon Sep 17 00:00:00 2001 From: dielhennr Date: Wed, 12 May 2021 10:50:30 -0700 Subject: [PATCH 5/6] global-topic-partition-counts --- .../kafka/controller/BrokersToIsrs.java | 10 ---- .../kafka/controller/ControllerMetrics.java | 8 --- .../controller/QuorumControllerMetrics.java | 43 --------------- .../controller/ReplicationControlManager.java | 18 ------- .../controller/MockControllerMetrics.java | 24 --------- .../ReplicationControlManagerTest.java | 54 ------------------- 6 files changed, 157 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java index 441ee32eb8c92..d8e0319f94f7e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java @@ -326,14 +326,4 @@ PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) { boolean hasLeaderships(int brokerId) { return iterator(brokerId, true).hasNext(); } - - int offlinePartitionCount() { - PartitionsOnReplicaIterator noLeaderIterator = partitionsWithNoLeader(); - int offlinePartitionCount = 0; - while (noLeaderIterator.hasNext()) { - noLeaderIterator.next(); - offlinePartitionCount++; - } - return offlinePartitionCount; - } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java index 7c862bc33c059..406a533e7d7e4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java @@ -34,12 +34,4 @@ public interface ControllerMetrics { void setGlobalPartitionCount(int partitionCount); int globalPartitionCount(); - - void setOfflinePartitionCount(int offlinePartitions); - - int offlinePartitionCount(); - - void setPreferredReplicaImbalanceCount(int replicaImbalances); - - int preferredReplicaImbalanceCount(); } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index c33c383ffc972..90ceba3515057 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -34,22 +34,13 @@ public final class QuorumControllerMetrics implements ControllerMetrics { "kafka.controller", "ReplicationControlManager", "GlobalTopicCount", null); private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName( "kafka.controller", "ReplicationControlManager", "GlobalPartitionCount", null); - private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName( - "kafka.controller", "ReplicationControlManager", "OfflinePartitionCount", null); - private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new MetricName( - "kafka.controller", "ReplicationControlManager", "PreferredReplicaImbalanceCount", null); - private volatile boolean active; private volatile int topics; private volatile int partitions; - private volatile int offlinePartitions; - private volatile int preferredReplicaImbalances; private final Gauge activeControllerCount; private final Gauge globalPartitionCount; private final Gauge globalTopicCount; - private final Gauge offlinePartitionCount; - private final Gauge preferredReplicaImbalanceCount; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; @@ -57,8 +48,6 @@ public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; this.topics = 0; this.partitions = 0; - this.offlinePartitions = 0; - this.preferredReplicaImbalances = 0; this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -79,18 +68,6 @@ public Integer value() { return partitions; } }); - this.offlinePartitionCount = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge() { - @Override - public Integer value() { - return offlinePartitions; - } - }); - this.preferredReplicaImbalanceCount = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge() { - @Override - public Integer value() { - return preferredReplicaImbalances; - } - }); } @Override @@ -132,24 +109,4 @@ public void setGlobalPartitionCount(int partitionCount) { public int globalPartitionCount() { return this.partitions; } - - @Override - public void setOfflinePartitionCount(int offlinePartitions) { - this.offlinePartitions = offlinePartitions; - } - - @Override - public int offlinePartitionCount() { - return this.offlinePartitions; - } - - @Override - public void setPreferredReplicaImbalanceCount(int replicaImbalances) { - this.preferredReplicaImbalances = replicaImbalances; - } - - @Override - public int preferredReplicaImbalanceCount() { - return this.preferredReplicaImbalances; - } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ed7994dd41f23..c66251cef573c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -362,8 +362,6 @@ public void replay(PartitionRecord record) { newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader); } controllerMetrics.setGlobalPartitionCount(globalPartitionCount); - controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); - controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); } public void replay(PartitionChangeRecord record) { @@ -385,8 +383,6 @@ public void replay(PartitionChangeRecord record) { String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + record.topicId(); newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo); - controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); - controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); } public void replay(RemoveTopicRecord record) { @@ -411,8 +407,6 @@ public void replay(RemoveTopicRecord record) { brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER); controllerMetrics.setGlobalTopicsCount(topics.size()); controllerMetrics.setGlobalPartitionCount(globalPartitionCount); - controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount()); - controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount()); log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); } @@ -477,18 +471,6 @@ public void replay(RemoveTopicRecord record) { return ControllerResult.atomicOf(records, data); } - private int preferredReplicaImbalanceCount() { - int count = 0; - for (TopicControlInfo topic : topics.values()) { - for (PartitionControlInfo part : topic.parts.values()) { - if (part.leader != part.preferredReplica()) { - count++; - } - } - } - return count; - } - private ApiError createTopic(CreatableTopic topic, List records, Map successes) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java index f13b24865a637..45a69d74c5573 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java @@ -21,15 +21,11 @@ public final class MockControllerMetrics implements ControllerMetrics { private volatile boolean active; private volatile int topics; private volatile int partitions; - private volatile int offlinePartitions; - private volatile int preferredReplicaImbalances; public MockControllerMetrics() { this.active = false; this.topics = 0; this.partitions = 0; - this.offlinePartitions = 0; - this.preferredReplicaImbalances = 0; } @Override @@ -71,24 +67,4 @@ public void setGlobalPartitionCount(int partitionCount) { public int globalPartitionCount() { return this.partitions; } - - @Override - public void setOfflinePartitionCount(int offlinePartitions) { - this.offlinePartitions = offlinePartitions; - } - - @Override - public int offlinePartitionCount() { - return this.offlinePartitions; - } - - @Override - public void setPreferredReplicaImbalanceCount(int replicaImbalances) { - this.preferredReplicaImbalances = replicaImbalances; - } - - @Override - public int preferredReplicaImbalanceCount() { - return this.preferredReplicaImbalances; - } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index badd1f86dd73e..38d2cd9cd5fcf 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -252,60 +252,6 @@ public void testGlobalTopicAndPartitionMetrics() throws Exception { assertEquals(0, ctx.metrics.globalPartitionCount()); } - @Test - public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception { - ReplicationControlTestContext ctx = new ReplicationControlTestContext(); - ReplicationControlManager replicationControl = ctx.replicationControl; - - for (int i = 0; i < 4; i++) { - registerBroker(i, ctx); - unfenceBroker(i, ctx); - } - - ctx.createTestTopic("foo", - new int[][] { - new int[] {0, 2}, - new int[] {0, 1} - }); - - ctx.createTestTopic("zar", - new int[][] { - new int[] {0, 1, 2}, - new int[] {1, 2, 3}, - new int[] {1, 2, 0} - }); - - ControllerResult result = replicationControl.unregisterBroker(0); - ctx.replay(result.records()); - - // All partitions should still be online after unregistering broker 0 - assertEquals(0, ctx.metrics.offlinePartitionCount()); - // Three partitions should not have their preferred (first) replica 0 - assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount()); - - result = replicationControl.unregisterBroker(1); - ctx.replay(result.records()); - - // After unregistering broker 1, 1 partition for topic foo should go offline - assertEquals(1, ctx.metrics.offlinePartitionCount()); - // All five partitions should not have their preferred (first) replica at this point - assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount()); - - result = replicationControl.unregisterBroker(2); - ctx.replay(result.records()); - - // After unregistering broker 2, the last partition for topic foo should go offline - // and 2 partitions for topic zar should go offline - assertEquals(4, ctx.metrics.offlinePartitionCount()); - - result = replicationControl.unregisterBroker(3); - ctx.replay(result.records()); - - // After unregistering broker 3 the last partition for topic zar should go offline - assertEquals(5, ctx.metrics.offlinePartitionCount()); - } - - @Test public void testValidateNewTopicNames() { Map topicErrors = new HashMap<>(); From 2e2513f77bfdbd3d5f3a25c5c13d758d2ce948f8 Mon Sep 17 00:00:00 2001 From: dielhennr Date: Thu, 13 May 2021 10:25:28 -0700 Subject: [PATCH 6/6] address-comments --- .../controller/QuorumControllerMetrics.java | 32 +++++++++---------- .../controller/ReplicationControlManager.java | 5 ++- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java index 90ceba3515057..a9de1ff0a6836 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java @@ -31,23 +31,23 @@ public final class QuorumControllerMetrics implements ControllerMetrics { private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = new MetricName( "kafka.controller", "ControllerEventManager", "EventQueueProcessingTimeMs", null); private final static MetricName GLOBAL_TOPIC_COUNT = new MetricName( - "kafka.controller", "ReplicationControlManager", "GlobalTopicCount", null); + "kafka.controller", "KafkaController", "GlobalTopicCount", null); private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName( - "kafka.controller", "ReplicationControlManager", "GlobalPartitionCount", null); + "kafka.controller", "KafkaController", "GlobalPartitionCount", null); private volatile boolean active; - private volatile int topics; - private volatile int partitions; + private volatile int globalTopicCount; + private volatile int globalPartitionCount; private final Gauge activeControllerCount; - private final Gauge globalPartitionCount; - private final Gauge globalTopicCount; + private final Gauge globalPartitionCountGauge; + private final Gauge globalTopicCountGauge; private final Histogram eventQueueTime; private final Histogram eventQueueProcessingTime; public QuorumControllerMetrics(MetricsRegistry registry) { this.active = false; - this.topics = 0; - this.partitions = 0; + this.globalTopicCount = 0; + this.globalPartitionCount = 0; this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge() { @Override public Integer value() { @@ -56,16 +56,16 @@ public Integer value() { }); this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true); this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true); - this.globalTopicCount = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { + this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge() { @Override public Integer value() { - return topics; + return globalTopicCount; } }); - this.globalPartitionCount = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge() { + this.globalPartitionCountGauge = registry.newGauge(GLOBAL_PARTITION_COUNT, new Gauge() { @Override public Integer value() { - return partitions; + return globalPartitionCount; } }); } @@ -92,21 +92,21 @@ public void updateEventQueueProcessingTime(long durationMs) { @Override public void setGlobalTopicsCount(int topicCount) { - this.topics = topicCount; + this.globalTopicCount = topicCount; } @Override public int globalTopicsCount() { - return this.topics; + return this.globalTopicCount; } @Override public void setGlobalPartitionCount(int partitionCount) { - this.partitions = partitionCount; + this.globalPartitionCount = partitionCount; } @Override public int globalPartitionCount() { - return this.partitions; + return this.globalPartitionCount; } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index c66251cef573c..d039c2c7c51aa 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -279,6 +279,9 @@ public String toString() { */ private final int defaultNumPartitions; + /** + * A count of the total number of partitions in the cluster. + */ private int globalPartitionCount; /** @@ -355,13 +358,13 @@ public void replay(PartitionRecord record) { brokersToIsrs.update(record.topicId(), record.partitionId(), null, newPartInfo.isr, NO_LEADER, newPartInfo.leader); globalPartitionCount++; + controllerMetrics.setGlobalPartitionCount(globalPartitionCount); } else if (!newPartInfo.equals(prevPartInfo)) { newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo); topicInfo.parts.put(record.partitionId(), newPartInfo); brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader); } - controllerMetrics.setGlobalPartitionCount(globalPartitionCount); } public void replay(PartitionChangeRecord record) {