Skip to content

Commit

Permalink
Addressed PR comments, implemented using JmxReporter by default
Browse files Browse the repository at this point in the history
Signed-off-by: Marko Strukelj <[email protected]>
  • Loading branch information
mstruk committed Jul 4, 2023
1 parent db294ff commit 611c52c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 24 deletions.
45 changes: 40 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1235,24 +1235,59 @@ If `OAUTH_ENABLE_METRICS` env variable is set or if `oauth.enable.metrics` syste

The OAuth metrics ignores the Kafka `metric.reporters` option in order to prevent automatically instantiating double instances of reporters. Most reporters may expect that they are singleton object and may not function properly in multiple copies.
Instead, there is `strimzi.oauth.metric.reporters` option where the reporters that support multiple copies can be specified for the purpose of metrics integration:
- `strimzi.oauth.metric.reporters` (e.g.: "org.apache.kafka.common.metrics.JmxReporter", use ',' as a separator to enable multiple reporters)
- `strimzi.oauth.metric.reporters` (e.g.: "org.apache.kafka.common.metrics.JmxReporter,org.some.package.SomeMetricReporter", use ',' as a separator to enable multiple reporters.)

The configuration option `strimzi.oauth.metric.reporters` has to be configured as an env variable or a system property. Using it on the Kafka broker inside `server.properties` does not work reliably due to multiple pluggability mechanisms that can be used (authorizer, authentication callback handler, client).
If this configuration option is not set and OAuth metrics is enabled for some component, then a new instance of `org.apache.kafka.common.metrics.JmxReporter` will automatically be instantiated to provide JMX integration for OAuth metrics.
However, if `strimzi.oauth.metric.reporters` is set, then only the reporters specified in the list will be instantiated and integrated. Setting the option to an empty string will result in no reporters instantiated.

The configuration option `strimzi.oauth.metric.reporters` on the Kafka broker has to be configured as an env variable or a system property. Using it on the Kafka broker inside `server.properties` does not work reliably due to multiple pluggability mechanisms that can be used (authorizer, authentication callback handler, inter-broker client).
Some of these mechanisms get the `server.properties` filtered so only configuration recognised by Kafka makes it through. However, this is a global OAuth Metrics configuration, and it is initialized on first use by any of the components, using the configuration provided to that component.
Specifically, the inter-broker client using OAUTHBEARER might be the first to trigger OAuth Metrics initialisation on the broker, and does not see this config option.

In order to reliably configure `strimzi.oauth.metric.reporters` one of the following options should be used when starting a Kafka broker:
- `STRIMZI_OAUTH_METRIC_REPORTERS` env variable
- `strimzi.oauth.metric.reporters` env variable or system property

When OAuth metrics are enabled the OAuth layer can not reuse existing metric reporter instances, and have to create its own copies of metric reporters. Noteably, it does not create its copy of `org.apache.kafka.common.metrics.JmxReporter` automatically. Instead, it has to be explicitly specified in `strimzi.oauth.metric.reporters` configuration as item in the list of reporters to instantiate.
At the moment there is no way to integrate with the existing Kafka metrics / reporters objects already instantiated in different Kafka runtimes (producer, consumer, broker, ...).
When OAuth metrics is enabled the OAuth layer has to create its own copies of metric reporters.

NOTE: This breaks compatibility with OAuth versions preceding 0.13.0 where `metric.reporters` was used to configure reporters which were consequently automatically instantiated twice.

NOTE: In OAuth versions preceding 0.13.0 the configuration option `metric.reporters` was used to configure reporters which were consequently automatically instantiated twice.
The configuration option `metric.reporters` is no longer used.

The Kafka options that control sampling are honored: `metrics.num.samples`, `metrics.recording.level` and `metrics.sample.window.ms`.

### Example for Kafka broker:
```
# Enable OAuth metrics for all listeners, Keycloak authorizer, and inter-broker clients:
export OAUTH_ENABLE_METRICS=true
# Use a custom metric reporter rather than the default JmxReporter
export STRIMZI_OAUTH_METRIC_REPORTERS=org.some.package.SomeMetricReporter
bin/kafka-server-start.sh config/server.properties
```

### Example for Kafka client:
```
# Show the content of client properties file
cat ~/client.properties
...
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="https://server/token-endpoint" oauth.client.id="clientId" oauth.client.secret="client-secret" oauth.enable.metrics="true";
strimzi.oauth.metric.reporters=org.some.package.SomeMetricReporter
...
# Start the client
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic my-topic --producer.config=$HOME/client.properties
```

### Simplest example for Kafka broker:
```
# Enable OAuth metrics for all listeners, Keycloak authorizer, and inter-broker clients:
# With no 'strimzi.oauth.metric.reporters' specified 'org.apache.kafka.common.metrics.JmxReporter' will be used automatically
export OAUTH_ENABLE_METRICS=true
bin/kafka-server-start.sh config/server.properties
```

A common use-case is for metrics to be exposed through JMX managed beans. They can then also be exposed as Prometheus metrics by using the Prometheus JMX Exporter agent, and mapping the JMX metrics names to prometheus metrics names.
If `oauth.config.id` is specified in JAAS configuration of the listener or the client, it will be available in MBean / metric name as `contextId` attribute. If not specified, it will be calculated from JAAS configuration for the validator or default to `client` in client JAAS config, or `keycloak-authorizer` for KeycloakAuthorizer metrics.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -43,17 +44,25 @@
* There is a one-to-one mapping between a <code>SensorKey</code> and a <code>Sensor</code>, and one-to-one mapping between a <code>Sensor</code> and an <code>MBean</code> name.
* <p>
* MBeans are registered as requested by <code>JmxReporter</code> attached to the <code>Metrics</code> object.
* The <code>JmxReporter</code> has to be explicitly configured using a config option <code>strimzi.oauth.metric.reporters</code>, which is
* analogous to the Kafka <code>metric.reporters</code> configuration option. It is not configured by default.
* The <code>JmxReporter</code> either has to be explicitly configured using a config option <code>strimzi.oauth.metric.reporters</code>,
* or if that config option si not set, a new instance is configured by default.
* <p>
* Since OAuth instantiates its own <code>Metrics</code> object it also has to instantiate reporters to attach them to this <code>Metrics</code> object.
* To prevent double instantiation of <code>MetricReporter</code> objects that require to be singleton, all <code>MetricReporter</code> objects
* to be integrated with <code>OAuthMetrics</code> have to be separately instantiated.
* <p>
* Example:
* Example 1:
* <pre>
* strimzi.oauth.metric.reporters=org.apache.kafka.common.metrics.JmxReporter
* strimzi.oauth.metric.reporters=org.apache.kafka.common.metrics.JmxReporter,org.some.package.SomeMetricReporter
* </pre>
* The above will instantiate and integrate with OAuth metrics the JmxReporter instance, and a SomeMetricReporter instance.
* <p>
* Example 2:
* <pre>
* strimzi.oauth.metric.reporters=
* </pre>
* The above will not instantiate and integrate any metric reporters with OAuth metrics, not even JmxReporter.
* <p>
* Note: On the Kafka broker it is best to use <code>STRIMZI_OAUTH_METRIC_REPORTERS</code> env variable or <code>strimzi.oauth.metric.reporters</code> system property,
* rather than a `server.properties` global configuration option.
*/
Expand Down Expand Up @@ -112,10 +121,10 @@ private List<MetricsReporter> initReporters() {
AbstractConfig kafkaConfig = initKafkaConfig();
if (configMap.get(STRIMZI_OAUTH_METRIC_REPORTERS) != null) {
return kafkaConfig.getConfiguredInstances(STRIMZI_OAUTH_METRIC_REPORTERS, MetricsReporter.class);
} else {
log.warn("Configuration option '{}' is not set, OAuth metrics will not be exported to JMX", STRIMZI_OAUTH_METRIC_REPORTERS);
}
return Collections.emptyList();
JmxReporter reporter = new JmxReporter();
reporter.configure(configMap);
return Collections.singletonList(reporter);
}

private AbstractConfig initKafkaConfig() {
Expand Down Expand Up @@ -143,7 +152,7 @@ private Map<String, String> toMapOfStringValues(Map<String, ?> configMap) {
result.put(ent.getKey(), ((Class<?>) val).getCanonicalName());
} else if (val instanceof List) {
String stringVal = ((List<?>) val).stream().map(String::valueOf).collect(Collectors.joining(","));
if (!"".equals(stringVal)) {
if (!stringVal.isEmpty()) {
result.put(ent.getKey(), stringVal);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ private void testKafkaClientConfig() throws Exception {
}

List<String> lines = logReader.readNext();
Assert.assertTrue("Contains log warning", TestUtil.checkLogForRegex(lines, "OAuth metrics will not be exported to JMX"));
Assert.assertTrue("Instantiated JMX Reporter", TestUtil.checkLogForRegex(lines, "reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter"));

producerProps.put(GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS, "org.apache.kafka.common.metrics.JmxReporter");

producerProps.put(GlobalConfig.STRIMZI_OAUTH_METRIC_REPORTERS, "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter");

// Clear the configured metrics in order to trigger reinitialisation
Services.close();
Expand All @@ -106,8 +107,7 @@ private void testKafkaClientConfig() throws Exception {
}

lines = logReader.readNext();
Assert.assertFalse("Contains log warning", TestUtil.checkLogForRegex(lines, "OAuth metrics will not be exported to JMX"));
Assert.assertTrue("Instantiated JMX Reporter", TestUtil.checkLogForRegex(lines, "reporters: \\[org.apache.kafka.common.metrics.JmxReporter"));
Assert.assertTrue("Instantiated TestMetricsReporter", TestUtil.checkLogForRegex(lines, "reporters: \\[io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter[^,]+\\]"));
}

private void initJaas(Map<String, String> oauthConfig, Properties additionalProps) throws Exception {
Expand Down Expand Up @@ -248,18 +248,15 @@ private void testOAuthMetricReporters() throws IOException, InterruptedException
reinitServices(configs);
Services.getInstance().getMetrics();
LOG.info("Waiting for: reporters: TestMetricsReporter"); // Make sure to not repeat the below condition in the string here
logReader.waitFor("reporters: \\[io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter");
logReader.waitFor("reporters: \\[io\\.strimzi\\.testsuite\\.oauth\\.common\\.metrics\\.TestMetricsReporter[^,]+\\]");

configs.remove("strimzi.oauth.metric.reporters");
configs.put("metric.reporters", "io.strimzi.testsuite.oauth.common.metrics.TestMetricsReporter");
reinitServices(configs);
// No reporter will be instantiated
// JmxReporter will be instantiated, 'metric.reporters' setting is ignored
Services.getInstance().getMetrics();
LOG.info("Waiting for reporters warning"); // Make sure to not repeat the below condition in the string here
logReader.waitFor("OAuth metrics will not be exported to JMX");

LOG.info("Waiting for: reporters: <empty>"); // Make sure to not repeat the below condition in the string here
logReader.waitFor("reporters: \\[\\]");
LOG.info("Waiting for: reporters: JmxReporter"); // Make sure to not repeat the below condition in the string here
logReader.waitFor("reporters: \\[org\\.apache\\.kafka\\.common\\.metrics\\.JmxReporter");

configs.put("strimzi.oauth.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");
reinitServices(configs);
Expand Down

0 comments on commit 611c52c

Please sign in to comment.