Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flags to control telemetry behaviors #25849

Merged
merged 4 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3";
/// The default bind address for the HTTP API.
pub const DEFAULT_HTTP_BIND_ADDR: &str = "0.0.0.0:8181";

pub const DEFAULT_TELMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com";
pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -325,6 +325,26 @@ pub struct Config {
action
)]
pub force_snapshot_mem_threshold: MemorySize,

/// Disable sending telemetry data to telemetry.v3.influxdata.com.
#[clap(
long = "disable-telemetry-upload",
env = "INFLUXDB3_TELEMETRY_DISABLE_UPLOAD",
default_value_t = false,
hide = true,
action
)]
pub disable_telemetry_upload: bool,

/// Send telemetry data to the specified endpoint.
#[clap(
long = "telemetry-endpoint",
env = "INFLUXDB3_TELEMETRY_ENDPOINT",
default_value = DEFAULT_TELEMETRY_ENDPOINT,
hide = true,
action
)]
pub telemetry_endpoint: String,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -538,7 +558,8 @@ pub async fn command(config: Config) -> Result<()> {
catalog.instance_id(),
num_cpus,
Some(Arc::clone(&write_buffer_impl.persisted_files())),
DEFAULT_TELMETRY_ENDPOINT,
config.telemetry_endpoint.as_str(),
config.disable_telemetry_upload,
)
.await;

Expand Down Expand Up @@ -593,7 +614,8 @@ async fn setup_telemetry_store(
instance_id: Arc<str>,
num_cpus: usize,
persisted_files: Option<Arc<PersistedFiles>>,
telemetry_endpoint: &'static str,
telemetry_endpoint: &str,
disable_upload: bool,
) -> Arc<TelemetryStore> {
let os = std::env::consts::OS;
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
Expand All @@ -605,16 +627,22 @@ async fn setup_telemetry_store(
.unwrap_or(ObjectStoreType::Memory);
let storage_type = obj_store_type.as_str();

TelemetryStore::new(
instance_id,
Arc::from(os),
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
persisted_files.map(|p| p as _),
telemetry_endpoint,
)
.await
if disable_upload {
debug!("Initializing TelemetryStore with upload disabled.");
TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _))
} else {
debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}.");
TelemetryStore::new(
instance_id,
Arc::from(os),
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
persisted_files.map(|p| p as _),
telemetry_endpoint.to_string(),
)
.await
}
}

async fn background_buffer_checker(
Expand Down
118 changes: 117 additions & 1 deletion influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::{

use crate::{ConfigProvider, TestServer};
use assert_cmd::cargo::CommandCargoExt;
use assert_cmd::Command as AssertCmd;
use observability_deps::tracing::debug;
use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use test_helpers::assert_contains;
use test_helpers::tempfile::NamedTempFile;
#[cfg(feature = "system-py")]
use test_helpers::tempfile::TempDir;
use test_helpers::{assert_contains, assert_not_contains};

const WRITE_REPORTS_PLUGIN_CODE: &str = r#"
def process_writes(influxdb3_local, table_batches, args=None):
Expand Down Expand Up @@ -107,6 +108,121 @@ fn create_plugin_file(code: &str) -> NamedTempFile {
file
}

#[test_log::test(tokio::test)]
async fn test_telemetry_disabled_with_debug_msg() {
let serve_args = &[
"serve",
"--writer-id",
"the-best-writer",
"--object-store",
"memory",
];

let expected_disabled: &str = "Initializing TelemetryStore with upload disabled.";

// validate we get a debug message indicating upload disabled
let output = AssertCmd::cargo_bin("influxdb3")
.unwrap()
.args(serve_args)
.arg("-vv")
.arg("--disable-telemetry-upload")
.timeout(std::time::Duration::from_millis(500))
.assert()
.failure()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it failure() because process starts and hangs around till 500 millis and then shuts down? Just wondered if it could've used the TestServer to spin up the server and got hold of the stdout for your assertions. If you'd already looked into it and it didn't work for you that's fine, but it has the mechanism built in to kill the process after the test without timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it failure() because process starts and hangs around till 500 millis and then shuts down?

I added the failure() call just to be clear in this test case that we expect a non-zero exit code when the process is killed using the timeout method. It's not strictly necessary for this test case (we only really care about checking the stdout), just me trying to be explicit.

Just wondered if it could've used the TestServer to spin up the server and got hold of the stdout for your assertions.

I did look through the TestSever code a little, and it looked to me like it was intended more for API-level integration testing than what I'm doing here, which is asserting on the CLI's stdout. Looking at it again, it's still not obvious how to get the stdout from it without modifying it non-trivially to preserve stdout.

it has the mechanism built in to kill the process after the test without timeout.

Yeah, I see the it has the mechanism built in to kill the process after the test without timeout using the kill method, but that would require additional boilerplate to have the test case sleep 500 ms before killing it (assuming we also added more code to preserve and retrieve the TestServer stdout).

To be clear, the timeout method isn't used to kill the process after the test -- the test happens after we've killed the process. We kill it after a constant 500 ms because that's what I thought would reliably be enough time to reach the stdout debug messages that are being asserted on (a too small timeout could cause this test to be flaky since we would be less likely to reach that debug message). I know it's not ideal asserting on stdout to test whether the telemetry store is running in but I think the only other option would be to actually set up a listening http server and point it at that, and test that something reached it from the TelemetryStore (or not as the test case might call for), but that seemed like it might no be worth the additional effort...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did think the test happens after killing the process as well with the timeout - just wasn't sure if there was easy wins in extending TestServer.

.get_output()
.stdout
.clone();
let output = String::from_utf8(output).expect("must be able to convert output to String");
assert_contains!(output, expected_disabled);
}

#[test_log::test(tokio::test)]
async fn test_telemetry_disabled() {
let serve_args = &[
"serve",
"--writer-id",
"the-best-writer",
"--object-store",
"memory",
];

let expected_disabled: &str = "Initializing TelemetryStore with upload disabled.";
// validate no message when debug output disabled
let output = AssertCmd::cargo_bin("influxdb3")
.unwrap()
.args(serve_args)
.arg("-v")
.arg("--disable-telemetry-upload")
.timeout(std::time::Duration::from_millis(500))
.assert()
.failure()
.get_output()
.stdout
.clone();
let output = String::from_utf8(output).expect("must be able to convert output to String");
assert_not_contains!(output, expected_disabled);
}

#[test_log::test(tokio::test)]
async fn test_telemetry_enabled_with_debug_msg() {
let serve_args = &[
"serve",
"--writer-id",
"the-best-writer",
"--object-store",
"memory",
];

let expected_enabled: &str =
"Initializing TelemetryStore with upload enabled for http://localhost:9999.";

// validate debug output shows which endpoint we are hitting when telemetry enabled
let output = AssertCmd::cargo_bin("influxdb3")
.unwrap()
.args(serve_args)
.arg("-vv")
.arg("--telemetry-endpoint")
.arg("http://localhost:9999")
.timeout(std::time::Duration::from_millis(500))
.assert()
.failure()
.get_output()
.stdout
.clone();
let output = String::from_utf8(output).expect("must be able to convert output to String");
assert_contains!(output, expected_enabled);
}

#[test_log::test(tokio::test)]
async fn test_telementry_enabled() {
let serve_args = &[
"serve",
"--writer-id",
"the-best-writer",
"--object-store",
"memory",
];

let expected_enabled: &str =
"Initializing TelemetryStore with upload enabled for http://localhost:9999.";

// validate no telemetry endpoint reported when debug output not enabled
let output = AssertCmd::cargo_bin("influxdb3")
.unwrap()
.args(serve_args)
.arg("-v")
.arg("--telemetry-endpoint")
.arg("http://localhost:9999")
.timeout(std::time::Duration::from_millis(500))
.assert()
.failure()
.get_output()
.stdout
.clone();
let output = String::from_utf8(output).expect("must be able to convert output to String");
assert_not_contains!(output, expected_enabled);
}

#[test_log::test(tokio::test)]
async fn test_show_databases() {
let server = TestServer::spawn().await;
Expand Down
1 change: 1 addition & 0 deletions influxdb3/tests/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl TestServer {
let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command");
let command = command
.arg("serve")
.arg("--disable-telemetry-upload")
// bind to port 0 to get a random port assigned:
.args(["--http-bind", "0.0.0.0:0"])
.args(["--wal-flush-interval", "10ms"])
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_telemetry/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) struct TelemetryPayload {
/// This function runs in the background and if any call fails
/// there is no retrying mechanism and it is ok to lose a few samples
pub(crate) async fn send_telemetry_in_background(
full_url: &'static str,
full_url: String,
store: Arc<TelemetryStore>,
duration_secs: Duration,
) -> tokio::task::JoinHandle<()> {
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_telemetry/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl TelemetryStore {
storage_type: Arc<str>,
cores: usize,
persisted_files: Option<Arc<dyn ParquetMetrics>>,
telemetry_endpoint: &'static str,
telemetry_endpoint: String,
) -> Arc<Self> {
debug!(
instance_id = ?instance_id,
Expand Down Expand Up @@ -322,7 +322,7 @@ mod tests {
Arc::from("Memory"),
10,
Some(parqet_file_metrics),
"http://localhost/telemetry",
"http://localhost/telemetry".to_string(),
)
.await;
tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
Loading