diff --git a/src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
index 5020b493..93aa34ac 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
@@ -71,10 +71,13 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
 
   def buildPoint(m: MetricValue): Point = {
     val point = Point.measurement(m.definition.name)
+    for(globalLabels <- clusterGlobalLabels.get(m.clusterName);
+        (tagName, tagValue) <- globalLabels)
+      point.tag(tagName, tagValue)
     val fields = m.definition.labels zip m.labels
     fields.foreach { field => point.tag(field._1, field._2) }
     point.addField("value", m.value)
-    return point.build()
+    point.build()
   }
 
   override def remove(m: RemoveMetric): Unit = {
@@ -92,8 +95,8 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
 
   def connect(): InfluxDB = {
     val url = sinkConfig.endpoint + ":" + sinkConfig.port
-    if (!sinkConfig.username.isEmpty) return InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
-    else return InfluxDBFactory.connect(url)
+    if (!sinkConfig.username.isEmpty) InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
+    else InfluxDBFactory.connect(url)
   }
 
   def createDatabase() =
@@ -103,7 +106,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
 
 
   def successQueryHandler(): Consumer[QueryResult] = {
-    return new Consumer[QueryResult] {
+    new Consumer[QueryResult] {
       override def accept(result:QueryResult): Unit = {
         logger.info(result.toString())
       }
@@ -112,7 +115,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
 
 
   def failQueryHandler(): Consumer[Throwable] = {
-    return new Consumer[Throwable] {
+    new Consumer[Throwable] {
       override def accept(throwable:Throwable): Unit = {
         handlingFailure(throwable)
       }
@@ -121,7 +124,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
 
 
   def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] = {
-    return new BiConsumer[Iterable[Point], Throwable] {
+    new BiConsumer[Iterable[Point], Throwable] {
       override def accept(failedPoints:Iterable[Point], throwable:Throwable): Unit = {
         handlingFailure(throwable)
       }
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkTest.scala b/src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala
similarity index 100%
rename from src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkTest.scala
rename to src/test/scala/com/lightbend/kafkalagexporter/GraphiteEndpointSinkSpec.scala
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkTest.scala b/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala
similarity index 78%
rename from src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkTest.scala
rename to src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala
index ed66205c..b166d981 100644
--- a/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkTest.scala
+++ b/src/test/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSinkSpec.scala
@@ -17,7 +17,7 @@ import org.testcontainers.utility.DockerImageName
 
 import scala.jdk.CollectionConverters._
 
-class InfluxDBPusherSinkTest extends FixtureAnyFreeSpec
+class InfluxDBPusherSinkSpec extends FixtureAnyFreeSpec
   with Matchers
   with BeforeAndAfterAll
   with TryValues
@@ -87,5 +87,18 @@ class InfluxDBPusherSinkTest extends FixtureAnyFreeSpec
       eventually { doQuery(whitelist_query) should (include("cluster_test") and include("group_test") and include("100")) }
       eventually { doQuery(blacklist_query) should (not include("cluster_test") and not include("group_test") and not include("101")) }
     }
+
+    "should get the correct global label names and values as tags" in { fixture =>
+      val clustersGlobalValuesMap = Map(
+        "cluster_test" -> Map("environment" -> "integration")
+      )
+
+      val sink = InfluxDBPusherSink(new InfluxDBPusherSinkConfig("InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))), clustersGlobalValuesMap)
+      sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster_test", "group_test", 100))
+
+      val tagQuery = "SELECT * FROM kafka_consumergroup_group_max_lag where environment='integration'"
+
+      eventually { doQuery(tagQuery) should (include("cluster_test") and include("group_test") and include("100") and include("environment") and include("integration"))}
+    }
   }
 }
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala
similarity index 99%
rename from src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala
rename to src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala
index a519228a..f4abab96 100644
--- a/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkTest.scala
+++ b/src/test/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSinkSpec.scala
@@ -16,7 +16,7 @@ import org.scalatest.Outcome
 import org.scalatest.freespec.FixtureAnyFreeSpec
 import org.scalatest.matchers.should.Matchers
 
-class PrometheusEndpointSinkTest extends FixtureAnyFreeSpec with Matchers {
+class PrometheusEndpointSinkSpec extends FixtureAnyFreeSpec with Matchers {
 
   case class Fixture(server: HTTPServer, registry: CollectorRegistry, config: Config)
   type FixtureParam = Fixture