Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Seng globalClusterLabels as tags in InfluxDBSink
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszkrawiec committed Mar 18, 2021
1 parent 0b29942 commit 1770c57
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl

def buildPoint(m: MetricValue): Point = {
val point = Point.measurement(m.definition.name)
clusterGlobalLabels.get(m.clusterName)
.foreach(globalLabels => globalLabels.foreach(
tag => { point.tag(tag._1, tag._2) }
))
val fields = m.definition.labels zip m.labels
fields.foreach { field => point.tag(field._1, field._2) }
point.addField("value", m.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,21 @@ class InfluxDBPusherSinkTest extends FixtureAnyFreeSpec
eventually { doQuery(whitelist_query) should (include("cluster_test") and include("group_test") and include("100")) }
eventually { doQuery(blacklist_query) should (not include("cluster_test") and not include("group_test") and not include("101")) }
}

"should get the correct global label names and values as tags" in { fixture =>
val clustersGlobalValuesMap = Map(
"cluster_test" -> Map("environment" -> "integration")
)

val sink = InfluxDBPusherSink(new InfluxDBPusherSinkConfig("InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))), clustersGlobalValuesMap)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster_test", "group_test", 100))

val port = fixture.port
val url = s"http://localhost:$port"

val tagQuery = "SELECT * FROM kafka_consumergroup_group_max_lag where environment='integration'"

eventually { doQuery(tagQuery) should (include("cluster_test") and include("group_test") and include("100") and include("environment") and include("integration"))}
}
}
}

0 comments on commit 1770c57

Please sign in to comment.