Skip to content

Commit 98f44ae

Browse files
sunng87dsmith3197
andauthored
feat(new sink): Adding greptimedb metrics sink (vectordotdev#17198)
This patch adds [greptimedb](https://github.com/greptimeteam/greptimedb) sink. For now, we use greptimedb's git repository for adding its client. We will eventually split it from repo and publish it to crates.io, I hope this is not a road blocker at the moment. TODOs: - [x] website docs - [x] spellcheck cleanup - [x] integration test verification --------- Co-authored-by: Doug Smith <[email protected]>
1 parent 3989791 commit 98f44ae

File tree

22 files changed

+1465
-1
lines changed

22 files changed

+1465
-1
lines changed

.github/actions/spelling/allow.txt

+3
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,9 @@ gpg
296296
gql
297297
grafana
298298
graphiql
299+
greptime
300+
greptimecloud
301+
greptimedb
299302
gvisor
300303
gws
301304
hadoop

.github/workflows/changes.yml

+3
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ on:
7070
value: ${{ jobs.int_tests.outputs.fluent }}
7171
gcp:
7272
value: ${{ jobs.int_tests.outputs.gcp }}
73+
greptimedb:
74+
value: ${{ jobs.int_tests.outputs.greptimedb }}
7375
humio:
7476
value: ${{ jobs.int_tests.outputs.humio }}
7577
http-client:
@@ -194,6 +196,7 @@ jobs:
194196
eventstoredb: ${{ steps.filter.outputs.eventstoredb }}
195197
fluent: ${{ steps.filter.outputs.fluent }}
196198
gcp: ${{ steps.filter.outputs.gcp }}
199+
greptimedb: ${{ steps.filter.outputs.greptimedb }}
197200
humio: ${{ steps.filter.outputs.humio }}
198201
http-client: ${{ steps.filter.outputs.http-client }}
199202
influxdb: ${{ steps.filter.outputs.influxdb }}

.github/workflows/integration-comment.yml

+8
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@ jobs:
221221
max_attempts: 3
222222
command: bash scripts/ci-integration-test.sh gcp
223223

224+
- name: greptimedb
225+
if: ${{ contains(github.event.comment.body, '/ci-run-integration-greptimedb') || contains(github.event.comment.body, '/ci-run-all') }}
226+
uses: nick-fields/retry@v2
227+
with:
228+
timeout_minutes: 30
229+
max_attempts: 3
230+
command: bash scripts/ci-integration-test.sh greptimedb
231+
224232
- name: humio
225233
if: ${{ contains(github.event.comment.body, '/ci-run-integration-humio') || contains(github.event.comment.body, '/ci-run-all') }}
226234
uses: nick-fields/retry@v2

.github/workflows/integration.yml

+9
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ jobs:
6767
|| needs.changes.outputs.eventstoredb == 'true'
6868
|| needs.changes.outputs.fluent == 'true'
6969
|| needs.changes.outputs.gcp == 'true'
70+
|| needs.changes.outputs.greptimedb == 'true'
7071
|| needs.changes.outputs.humio == 'true'
7172
|| needs.changes.outputs.http-client == 'true'
7273
|| needs.changes.outputs.influxdb == 'true'
@@ -251,6 +252,14 @@ jobs:
251252
max_attempts: 3
252253
command: bash scripts/ci-integration-test.sh gcp
253254

255+
- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.greptimedb == 'true' }}
256+
name: greptimedb
257+
uses: nick-fields/retry@v2
258+
with:
259+
timeout_minutes: 30
260+
max_attempts: 3
261+
command: bash scripts/ci-integration-test.sh greptimedb
262+
254263
- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.humio == 'true' }}
255264
name: humio
256265
uses: nick-fields/retry@v2

Cargo.lock

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ tui = { version = "0.19.0", optional = true, default-features = false, features
233233
hex = { version = "0.4.3", default-features = false, optional = true }
234234
sha2 = { version = "0.10.7", default-features = false, optional = true }
235235

236+
# GreptimeDB
237+
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }
238+
236239
# External libs
237240
arc-swap = { version = "1.6", default-features = false, optional = true }
238241
async-compression = { version = "0.4.1", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
@@ -646,6 +649,7 @@ sinks-metrics = [
646649
"sinks-blackhole",
647650
"sinks-console",
648651
"sinks-datadog_metrics",
652+
"sinks-greptimedb",
649653
"sinks-humio",
650654
"sinks-influxdb",
651655
"sinks-kafka",
@@ -679,6 +683,7 @@ sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serd
679683
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
680684
sinks-file = ["dep:async-compression"]
681685
sinks-gcp = ["dep:base64", "gcp"]
686+
sinks-greptimedb = ["dep:greptimedb-client"]
682687
sinks-honeycomb = []
683688
sinks-http = []
684689
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
@@ -739,6 +744,7 @@ all-integration-tests = [
739744
"gcp-cloud-storage-integration-tests",
740745
"gcp-integration-tests",
741746
"gcp-pubsub-integration-tests",
747+
"greptimedb-integration-tests",
742748
"http-client-integration-tests",
743749
"humio-integration-tests",
744750
"influxdb-integration-tests",
@@ -800,6 +806,7 @@ fluent-integration-tests = ["docker", "sources-fluent"]
800806
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
801807
gcp-integration-tests = ["sinks-gcp"]
802808
gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"]
809+
greptimedb-integration-tests = ["sinks-greptimedb"]
803810
humio-integration-tests = ["sinks-humio"]
804811
http-client-integration-tests = ["sources-http_client"]
805812
influxdb-integration-tests = ["sinks-influxdb"]

LICENSE-3rdparty.csv

+2
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ graphql-parser,https://github.com/graphql-rust/graphql-parser,MIT OR Apache-2.0,
224224
graphql_client,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <[email protected]>
225225
graphql_client_codegen,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <[email protected]>
226226
graphql_query_derive,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <[email protected]>
227+
greptime-proto,https://github.com/GreptimeTeam/greptime-proto,Apache-2.0,The greptime-proto Authors
228+
greptimedb-client,https://github.com/GreptimeTeam/greptimedb-client-rust,Apache-2.0,The greptimedb-client Authors
227229
grok,https://github.com/daschl/grok,Apache-2.0,Michael Nitschinger <[email protected]>
228230
h2,https://github.com/hyperium/h2,MIT,"Carl Lerche <[email protected]>, Sean McArthur <[email protected]>"
229231
hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison <[email protected]>

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ test-behavior: test-behavior-transforms test-behavior-formats test-behavior-conf
332332
test-integration: ## Runs all integration tests
333333
test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse
334334
test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch
335-
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-client test-integration-influxdb
335+
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb
336336
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
337337
test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar
338338
test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
version: '3'
2+
3+
services:
4+
greptimedb:
5+
image: docker.io/greptime/greptimedb:${CONFIG_VERSION}
6+
command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001"
7+
healthcheck:
8+
test: "curl -f localhost:4000/health || exit 1"
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
features:
2+
- greptimedb-integration-tests
3+
4+
test_filter: '::greptimedb::'
5+
6+
runner:
7+
env:
8+
GREPTIMEDB_ENDPOINT: greptimedb:4001
9+
GREPTIMEDB_HTTP: http://greptimedb:4000
10+
11+
matrix:
12+
version: ['latest']

src/sinks/greptimedb/batch.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use vector_core::{
2+
event::{Metric, MetricValue},
3+
stream::batcher::limiter::ItemBatchSize,
4+
};
5+
6+
use super::request_builder::{
7+
DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT,
8+
};
9+
10+
const F64_BYTE_SIZE: usize = 8;
11+
const I64_BYTE_SIZE: usize = 8;
12+
13+
#[derive(Default)]
14+
pub(super) struct GreptimeDBBatchSizer;
15+
16+
impl GreptimeDBBatchSizer {
17+
pub(super) fn estimated_size_of(&self, item: &Metric) -> usize {
18+
// Metric name.
19+
item.series().name().name().len()
20+
// Metric namespace, with an additional 1 to account for the namespace separator.
21+
+ item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0)
22+
// Metric tags, with an additional 1 per tag to account for the tag key/value separator.
23+
+ item.series().tags().map(|t| {
24+
t.iter_all().map(|(k, v)| {
25+
k.len() + 1 + v.map(|v| v.len()).unwrap_or(0)
26+
})
27+
.sum()
28+
})
29+
.unwrap_or(0)
30+
// timestamp
31+
+ I64_BYTE_SIZE
32+
+
33+
// value size
34+
match item.value() {
35+
MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => F64_BYTE_SIZE,
36+
MetricValue::Distribution { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
37+
MetricValue::AggregatedHistogram { buckets, .. } => F64_BYTE_SIZE * (buckets.len() + SUMMARY_STAT_FIELD_COUNT),
38+
MetricValue::AggregatedSummary { quantiles, .. } => F64_BYTE_SIZE * (quantiles.len() + SUMMARY_STAT_FIELD_COUNT),
39+
MetricValue::Sketch { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
40+
}
41+
}
42+
}
43+
44+
impl ItemBatchSize<Metric> for GreptimeDBBatchSizer {
45+
fn size(&self, item: &Metric) -> usize {
46+
self.estimated_size_of(item)
47+
}
48+
}
+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use chrono::{DateTime, Duration, Utc};
2+
use futures::stream;
3+
use vector_core::event::{Event, Metric, MetricKind, MetricValue};
4+
use vector_core::metric_tags;
5+
6+
use crate::sinks::util::test::load_sink;
7+
use crate::{
8+
config::{SinkConfig, SinkContext},
9+
test_util::{
10+
components::{run_and_assert_sink_compliance, SINK_TAGS},
11+
trace_init,
12+
},
13+
};
14+
15+
use super::GreptimeDBConfig;
16+
17+
#[tokio::test]
18+
async fn test_greptimedb_sink() {
19+
trace_init();
20+
let cfg = format!(
21+
r#"endpoint= "{}"
22+
"#,
23+
std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned())
24+
);
25+
26+
let (config, _) = load_sink::<GreptimeDBConfig>(&cfg).unwrap();
27+
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
28+
29+
let query_client = query_client();
30+
31+
// Drop the table and data inside
32+
let _ = query_client
33+
.get(&format!(
34+
"{}/v1/sql",
35+
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
36+
))
37+
.query(&[("sql", "DROP TABLE ns_my_counter")])
38+
.send()
39+
.await
40+
.unwrap();
41+
42+
let base_time = Utc::now();
43+
let events: Vec<_> = (0..10).map(|idx| create_event(idx, base_time)).collect();
44+
run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;
45+
46+
let query_response = query_client
47+
.get(&format!(
48+
"{}/v1/sql",
49+
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
50+
))
51+
.query(&[("sql", "SELECT region, val FROM ns_my_counter")])
52+
.send()
53+
.await
54+
.unwrap()
55+
.text()
56+
.await
57+
.expect("Fetch json from greptimedb failed");
58+
let result: serde_json::Value =
59+
serde_json::from_str(&query_response).expect("Invalid json returned from greptimedb query");
60+
assert_eq!(
61+
result
62+
.pointer("/output/0/records/rows")
63+
.and_then(|v| v.as_array())
64+
.expect("Error getting greptimedb response array")
65+
.len(),
66+
10
67+
)
68+
}
69+
70+
fn query_client() -> reqwest::Client {
71+
reqwest::Client::builder().build().unwrap()
72+
}
73+
74+
fn create_event(i: i32, base_time: DateTime<Utc>) -> Event {
75+
Event::Metric(
76+
Metric::new(
77+
"my_counter".to_owned(),
78+
MetricKind::Incremental,
79+
MetricValue::Counter { value: i as f64 },
80+
)
81+
.with_namespace(Some("ns"))
82+
.with_tags(Some(metric_tags!(
83+
"region" => "us-west-1",
84+
"production" => "true",
85+
)))
86+
.with_timestamp(Some(base_time + Duration::seconds(i as i64))),
87+
)
88+
}

0 commit comments

Comments
 (0)