diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
index 09c7c3fe93cf..4824787f90bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
@@ -83,7 +83,7 @@ public double getAccumulatedBucketSize(int endIndex) {
   }
 
   @Test
-  public void testConvert_successfulyConvertCounters() {
+  public void testConvert_successfullyConvertCounters() {
     String step = "testStepName";
     Map emptyHistograms = new HashMap<>();
     Map counters = new HashMap();
@@ -374,7 +374,7 @@ public void testConvert_convertCountersAndHistograms() {
   }
 
   @Test
-  public void testConvert_successfulyConvertGauges() {
+  public void testConvert_successfullyConvertGauges() {
     String step = "testStepName";
     Map emptyHistograms = new HashMap<>();
     Map counters = new HashMap();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
index 4bb0a01ec96a..c19d736aeb04 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java
@@ -19,9 +19,9 @@
 
 import com.google.auto.value.AutoValue;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.Histogram;
@@ -74,21 +74,19 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);
 
-    static ConcurrentHashMap<String, Histogram> latencyHistograms =
-        new ConcurrentHashMap<String, Histogram>();
+    private static final Map<String, Histogram> LATENCY_HISTOGRAMS =
+        new HashMap<String, Histogram>();
 
-    abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
+    abstract HashMap<String, ArrayList<Duration>> perTopicRpcLatencies();
 
-    static ConcurrentHashMap backlogGauges = new ConcurrentHashMap();
-
-    abstract ConcurrentHashMap perTopicPartitionBacklogs();
+    abstract HashMap perTopicPartitionBacklogs();
 
     abstract AtomicBoolean isWritable();
 
     public static KafkaMetricsImpl create() {
       return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
-          new ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>(),
-          new ConcurrentHashMap(),
+          new HashMap<String, ArrayList<Duration>>(),
+          new HashMap(),
           new AtomicBoolean(true));
     }
 
@@ -96,9 +94,9 @@ public static KafkaMetricsImpl create() {
     @Override
     public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
       if (isWritable().get()) {
-        ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies().get(topic);
+        ArrayList<Duration> latencies = perTopicRpcLatencies().get(topic);
         if (latencies == null) {
-          latencies = new ConcurrentLinkedQueue<Duration>();
+          latencies = new ArrayList<Duration>();
           latencies.add(elapsedTime);
           perTopicRpcLatencies().putIfAbsent(topic, latencies);
         } else {
@@ -122,18 +120,18 @@ public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
 
     /** Record rpc latency histogram metrics for all recorded topics. */
     private void recordRpcLatencyMetrics() {
-      for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies :
+      for (Map.Entry<String, ArrayList<Duration>> topicLatencies :
          perTopicRpcLatencies().entrySet()) {
         Histogram topicHistogram;
-        if (latencyHistograms.containsKey(topicLatencies.getKey())) {
-          topicHistogram = latencyHistograms.get(topicLatencies.getKey());
+        if (LATENCY_HISTOGRAMS.containsKey(topicLatencies.getKey())) {
+          topicHistogram = LATENCY_HISTOGRAMS.get(topicLatencies.getKey());
         } else {
           topicHistogram =
              KafkaSinkMetrics.createRPCLatencyHistogram(
                   KafkaSinkMetrics.RpcMethod.POLL, topicLatencies.getKey());
-          latencyHistograms.put(topicLatencies.getKey(), topicHistogram);
+          LATENCY_HISTOGRAMS.put(topicLatencies.getKey(), topicHistogram);
         }
-        // update all the latencies
+        // Update all the latencies
         for (Duration d : topicLatencies.getValue()) {
          Preconditions.checkArgumentNotNull(topicHistogram);
          topicHistogram.update(d.toMillis());