-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ENH] Setup GC tool and service (#3451)
## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Parse config and setup tracing ## Test plan None ## Documentation Changes None
- Loading branch information
1 parent
e4be53a
commit bf522a5
Showing
10 changed files
with
265 additions
and
4 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
[package] | ||
name = "garbage_collector" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
[lib] | ||
name = "garbage_collector_library" | ||
path = "src/lib.rs" | ||
|
||
[[bin]] | ||
name = "garbage_collector_service" | ||
path = "src/bin/garbage_collector_service.rs" | ||
|
||
[[bin]] | ||
name = "garbage_collector_tool" | ||
path = "src/bin/garbage_collector_tool.rs" | ||
|
||
[dependencies] | ||
async-trait = { workspace = true } | ||
chrono = { workspace = true } | ||
clap = { workspace = true } | ||
figment = { workspace = true } | ||
serde = { workspace = true } | ||
serde_json = { workspace = true } | ||
tokio = { workspace = true } | ||
uuid = { workspace = true } | ||
|
||
tracing = { workspace = true } | ||
tracing-bunyan-formatter = { workspace = true } | ||
tracing-opentelemetry = { workspace = true } | ||
tracing-subscriber = { workspace = true } | ||
opentelemetry = { workspace = true } | ||
opentelemetry-otlp = { workspace = true } | ||
opentelemetry-http = { workspace = true } | ||
opentelemetry_sdk = { workspace = true } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
service_name: "garbage-collector" | ||
otel_endpoint: "http://otel-collector:4317" | ||
cutoff_time_hours: 12 # GC all versions created at time < now() - cutoff_time_hours | ||
max_collections_to_gc: 1000 # Maximum number of collections to GC in one run | ||
gc_interval_mins: 120 # Run GC every x mins | ||
disallow_collection_names: [] | ||
sysdb_connection: | ||
host: "sysdb.chroma" | ||
port: 50051 | ||
connect_timeout_ms: 60000 | ||
request_timeout_ms: 60000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
use garbage_collector_library::garbage_collector_service_entrypoint; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
garbage_collector_service_entrypoint().await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#[tokio::main] | ||
async fn main() { | ||
// TODO: Parse the arguments and define an entry point. | ||
todo!() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
use figment::providers::{Env, Format, Yaml}; | ||
|
||
const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml"; | ||
|
||
#[derive(Debug, serde::Deserialize)] | ||
pub(super) struct GarbageCollectorConfig { | ||
pub(super) service_name: String, | ||
pub(super) otel_endpoint: String, | ||
cutoff_time_hours: u32, | ||
max_collections_to_gc: u32, | ||
gc_interval_mins: u32, | ||
disallow_collection_names: Vec<String>, | ||
sysdb_connection: SysdbConnectionConfig, | ||
} | ||
|
||
#[derive(Debug, serde::Deserialize)] | ||
pub(super) struct SysdbConnectionConfig { | ||
host: String, | ||
port: u32, | ||
connect_timeout_ms: u32, | ||
request_timeout_ms: u32, | ||
} | ||
|
||
impl GarbageCollectorConfig { | ||
pub(super) fn load() -> Self { | ||
Self::load_from_path(DEFAULT_CONFIG_PATH) | ||
} | ||
|
||
pub(super) fn load_from_path(path: &str) -> Self { | ||
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them. | ||
// Excluding our own environment variables, which are prefixed with CHROMA_. | ||
let mut f = figment::Figment::from( | ||
Env::prefixed("CHROMA_GC_").map(|k| k.as_str().replace("__", ".").into()), | ||
); | ||
if std::path::Path::new(path).exists() { | ||
f = figment::Figment::from(Yaml::file(path)).merge(f); | ||
} | ||
let res = f.extract(); | ||
match res { | ||
Ok(config) => config, | ||
Err(e) => panic!("Error loading config: {}", e), | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use super::*;

    /// Loads the default configuration and checks every field against the
    /// values shipped in the default config file.
    ///
    /// The default path is relative, so this test depends on the working
    /// directory the test binary is run from.
    #[test]
    fn test_load_config() {
        let loaded = GarbageCollectorConfig::load();

        // Top-level settings.
        assert_eq!(loaded.service_name, "garbage-collector");
        assert_eq!(loaded.otel_endpoint, "http://otel-collector:4317");
        assert_eq!(loaded.cutoff_time_hours, 12);
        assert_eq!(loaded.max_collections_to_gc, 1000);
        assert_eq!(loaded.gc_interval_mins, 120);
        assert_eq!(loaded.disallow_collection_names, Vec::<String>::new());

        // Nested sysdb connection settings.
        let sysdb = &loaded.sysdb_connection;
        assert_eq!(sysdb.host, "sysdb.chroma");
        assert_eq!(sysdb.port, 50051);
        assert_eq!(sysdb.connect_timeout_ms, 60000);
        assert_eq!(sysdb.request_timeout_ms, 60000);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
use config::GarbageCollectorConfig; | ||
use opentelemetry_config::init_otel_tracing; | ||
|
||
mod config; | ||
mod opentelemetry_config; | ||
|
||
pub async fn garbage_collector_service_entrypoint() { | ||
// Parse configuration. Configuration includes sysdb connection details, and | ||
// otel details. | ||
let config = GarbageCollectorConfig::load(); | ||
// Enable OTEL tracing. | ||
init_otel_tracing(&config.service_name, &config.otel_endpoint); | ||
|
||
// Start a background task to periodically check for garbage. | ||
todo!() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
// NOTE: This file is a copy of worker/src/tracing/opentelemetry_config.rs.
//
// Keep the two copies in sync manually.
|
||
use opentelemetry::global; | ||
use opentelemetry::trace::TracerProvider; | ||
use opentelemetry_otlp::WithExportConfig; | ||
use opentelemetry_sdk::propagation::TraceContextPropagator; | ||
use tracing_bunyan_formatter::BunyanFormattingLayer; | ||
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer}; | ||
|
||
pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { | ||
println!( | ||
"Registering jaeger subscriber for {} at endpoint {}", | ||
service_name, otel_endpoint | ||
); | ||
let resource = opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new( | ||
"service.name", | ||
service_name.clone(), | ||
)]); | ||
// Prepare trace config. | ||
let trace_config = opentelemetry_sdk::trace::Config::default() | ||
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn) | ||
.with_resource(resource.clone()); | ||
|
||
// Prepare tracer. | ||
let tracing_span_exporter = opentelemetry_otlp::SpanExporter::builder() | ||
.with_tonic() | ||
.with_endpoint(otel_endpoint) | ||
.build() | ||
.expect("could not build span exporter for tracing"); | ||
let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder() | ||
.with_batch_exporter(tracing_span_exporter, opentelemetry_sdk::runtime::Tokio) | ||
.with_config(trace_config) | ||
.build(); | ||
let tracer = tracer_provider.tracer(service_name.clone()); | ||
|
||
// Prepare meter. | ||
let metric_exporter = opentelemetry_otlp::MetricExporter::builder() | ||
.with_tonic() | ||
.with_endpoint(otel_endpoint) | ||
.build() | ||
.expect("could not build metric exporter"); | ||
|
||
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder( | ||
metric_exporter, | ||
opentelemetry_sdk::runtime::Tokio, | ||
) | ||
.build(); | ||
let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() | ||
.with_reader(reader) | ||
.with_resource(resource.clone()) | ||
.build(); | ||
global::set_meter_provider(meter_provider); | ||
|
||
// Layer for adding our configured tracer. | ||
// Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end. | ||
let exporter_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer) | ||
.with_filter(tracing_subscriber::filter::LevelFilter::TRACE); | ||
// Layer for printing spans to stdout. Only print INFO logs by default. | ||
let stdout_layer = | ||
BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout) | ||
.with_filter(tracing_subscriber::filter::LevelFilter::INFO); | ||
// global filter layer. Don't filter anything at above trace at the global layer for gc. | ||
let global_layer = EnvFilter::new("none,garbage_collector_library=trace"); | ||
|
||
// Create subscriber. | ||
let subscriber = tracing_subscriber::registry() | ||
.with(global_layer) | ||
.with(stdout_layer) | ||
.with(exporter_layer); | ||
global::set_text_map_propagator(TraceContextPropagator::new()); | ||
tracing::subscriber::set_global_default(subscriber) | ||
.expect("Set global default subscriber failed"); | ||
println!("Set global subscriber for {}", service_name); | ||
|
||
// Add panics to tracing | ||
let prev_hook = std::panic::take_hook(); | ||
std::panic::set_hook(Box::new(move |panic_info| { | ||
let payload = panic_info.payload(); | ||
|
||
#[allow(clippy::manual_map)] | ||
let payload = if let Some(s) = payload.downcast_ref::<&str>() { | ||
Some(&**s) | ||
} else if let Some(s) = payload.downcast_ref::<String>() { | ||
Some(s.as_str()) | ||
} else { | ||
None | ||
}; | ||
|
||
tracing::error!( | ||
panic.payload = payload, | ||
panic.location = panic_info.location().map(|l| l.to_string()), | ||
panic.backtrace = tracing::field::display(std::backtrace::Backtrace::capture()), | ||
"A panic occurred" | ||
); | ||
|
||
prev_hook(panic_info); | ||
})); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters