Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Send globalClusterLabels as tags in InfluxDBSink (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaszkrawiec authored Mar 22, 2021
1 parent 0b29942 commit fa29087
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl

def buildPoint(m: MetricValue): Point = {
val point = Point.measurement(m.definition.name)
for(globalLabels <- clusterGlobalLabels.get(m.clusterName);
(tagName, tagValue) <- globalLabels)
point.tag(tagName, tagValue)
val fields = m.definition.labels zip m.labels
fields.foreach { field => point.tag(field._1, field._2) }
point.addField("value", m.value)
return point.build()
point.build()
}

override def remove(m: RemoveMetric): Unit = {
Expand All @@ -92,8 +95,8 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl
def connect(): InfluxDB =
{
val url = sinkConfig.endpoint + ":" + sinkConfig.port
if (!sinkConfig.username.isEmpty) return InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
else return InfluxDBFactory.connect(url)
if (!sinkConfig.username.isEmpty) InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
else InfluxDBFactory.connect(url)
}

def createDatabase() =
Expand All @@ -103,7 +106,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl

def successQueryHandler(): Consumer[QueryResult] =
{
return new Consumer[QueryResult] {
new Consumer[QueryResult] {
override def accept(result:QueryResult): Unit = {
logger.info(result.toString())
}
Expand All @@ -112,7 +115,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl

def failQueryHandler(): Consumer[Throwable] =
{
return new Consumer[Throwable] {
new Consumer[Throwable] {
override def accept(throwable:Throwable): Unit = {
handlingFailure(throwable)
}
Expand All @@ -121,7 +124,7 @@ class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGl

def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] =
{
return new BiConsumer[Iterable[Point], Throwable] {
new BiConsumer[Iterable[Point], Throwable] {
override def accept(failedPoints:Iterable[Point], throwable:Throwable): Unit = {
handlingFailure(throwable)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.testcontainers.utility.DockerImageName

import scala.jdk.CollectionConverters._

class InfluxDBPusherSinkTest extends FixtureAnyFreeSpec
class InfluxDBPusherSinkSpec extends FixtureAnyFreeSpec
with Matchers
with BeforeAndAfterAll
with TryValues
Expand Down Expand Up @@ -87,5 +87,18 @@ class InfluxDBPusherSinkTest extends FixtureAnyFreeSpec
eventually { doQuery(whitelist_query) should (include("cluster_test") and include("group_test") and include("100")) }
eventually { doQuery(blacklist_query) should (not include("cluster_test") and not include("group_test") and not include("101")) }
}

"should get the correct global label names and values as tags" in { fixture =>
val clustersGlobalValuesMap = Map(
"cluster_test" -> Map("environment" -> "integration")
)

val sink = InfluxDBPusherSink(new InfluxDBPusherSinkConfig("InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))), clustersGlobalValuesMap)
sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster_test", "group_test", 100))

val tagQuery = "SELECT * FROM kafka_consumergroup_group_max_lag where environment='integration'"

eventually { doQuery(tagQuery) should (include("cluster_test") and include("group_test") and include("100") and include("environment") and include("integration"))}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.scalatest.Outcome
import org.scalatest.freespec.FixtureAnyFreeSpec
import org.scalatest.matchers.should.Matchers

class PrometheusEndpointSinkTest extends FixtureAnyFreeSpec with Matchers {
class PrometheusEndpointSinkSpec extends FixtureAnyFreeSpec with Matchers {

case class Fixture(server: HTTPServer, registry: CollectorRegistry, config: Config)
type FixtureParam = Fixture
Expand Down

0 comments on commit fa29087

Please sign in to comment.