Skip to content

Commit

Permalink
PC example with Prometheus integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
nachomdo committed Dec 14, 2022
1 parent 6852574 commit 6a3df9d
Show file tree
Hide file tree
Showing 28 changed files with 4,724 additions and 51 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM amazoncorretto:17-alpine3.16
COPY ./parallel-consumer-examples/parallel-consumer-example-core/target/pc-core-metrics-jar-with-dependencies.jar /tmp
ENTRYPOINT ["java", "-jar", "/tmp/pc-core-metrics-jar-with-dependencies.jar"]
16 changes: 10 additions & 6 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -433,26 +433,30 @@ Where `${project.version}` is the version to be used:
[source,java,indent=0]
----
Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
Producer<String, String> kafkaProducer = getKafkaProducer();
// Producer<String, String> kafkaProducer = getKafkaProducer();
Metrics.addRegistry(metricsRegistry);
new KafkaClientMetrics(kafkaConsumer).bindTo(Metrics.globalRegistry);
new JvmMemoryMetrics().bindTo(Metrics.globalRegistry);
new JvmHeapPressureMetrics().bindTo(Metrics.globalRegistry);
new JvmGcMetrics().bindTo(Metrics.globalRegistry);
new JvmThreadMetrics().bindTo(Metrics.globalRegistry);
new KafkaClientMetrics(kafkaConsumer).bindTo(metricsRegistry);
new KafkaClientMetrics(kafkaProducer).bindTo(metricsRegistry);
// new KafkaClientMetrics(kafkaProducer).bindTo(Metrics.globalRegistry);
var options = ParallelConsumerOptions.<String, String>builder()
.ordering(KEY) // <2>
.maxConcurrency(1000) // <3>
.consumer(kafkaConsumer)
.producer(kafkaProducer)
//.producer(kafkaProducer)
.build();
ParallelStreamProcessor<String, String> eosStreamProcessor =
ParallelStreamProcessor.createEosStreamProcessor(options);
eosStreamProcessor.subscribe(of(inputTopic)); // <4>
pcMetricsTracker = new PCMetricsTracker(((AbstractParallelEoSStreamProcessor)eosStreamProcessor)::calculateMetricsWithIncompletes,
UniLists.of(Tag.of("region", "eu-west-1"), Tag.of("instance", "pc1")));
pcMetricsTracker.bindTo(metricsRegistry);
pcMetricsTracker.bindTo(Metrics.globalRegistry);
return eosStreamProcessor;
----

Expand Down
Loading

0 comments on commit 6a3df9d

Please sign in to comment.