diff --git a/Cargo.lock b/Cargo.lock index a5cdfc8a31511..7b8d730ffe388 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2853,6 +2853,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.9" @@ -3622,6 +3633,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matchers" version = "0.1.0" @@ -4244,6 +4261,16 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + [[package]] name = "opentelemetry-http" version = "0.6.0" @@ -4254,7 +4281,7 @@ dependencies = [ "bytes", "http", "isahc", - "opentelemetry", + "opentelemetry 0.17.0", ] [[package]] @@ -4267,21 +4294,99 @@ dependencies = [ "http", "isahc", "lazy_static", - "opentelemetry", + "opentelemetry 0.17.0", "opentelemetry-http", - "opentelemetry-semantic-conventions", + "opentelemetry-semantic-conventions 0.9.0", "thiserror", "thrift", "tokio", ] +[[package]] +name = "opentelemetry-otlp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca" +dependencies = [ + "async-trait", + "futures", + "futures-util", + "http", + "opentelemetry 0.19.0", + "opentelemetry-proto", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry 0.19.0", + "prost", + "tonic", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" dependencies = [ - "opentelemetry", + "opentelemetry 0.17.0", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5" +dependencies = [ + "opentelemetry 0.19.0", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", 
+ "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", ] [[package]] @@ -5627,7 +5732,7 @@ dependencies = [ "libc", "madsim-tokio", "nix 0.25.1", - "opentelemetry", + "opentelemetry 0.17.0", "opentelemetry-jaeger", "parking_lot 0.12.1", "prometheus", @@ -5638,7 +5743,7 @@ dependencies = [ "tokio-stream", "toml 0.7.3", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.17.4", "tracing-subscriber", "workspace-hack", ] @@ -5737,6 +5842,7 @@ dependencies = [ "num-integer", "num-traits", "number_prefix", + "opentelemetry 0.19.0", "parking_lot 0.12.1", "parse-display", "paste", @@ -5768,6 +5874,7 @@ dependencies = [ "tinyvec", "toml 0.7.3", "tracing", + "tracing-opentelemetry 0.19.0", "tracing-subscriber", "twox-hash", "url", @@ -6400,12 +6507,17 @@ dependencies = [ "console", "console-subscriber", "futures", + "hostname", "madsim-tokio", + "opentelemetry 0.19.0", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions 0.11.0", "parking_lot 0.12.1", "pprof", "prometheus", "risingwave_common", "tracing", + "tracing-opentelemetry 0.19.0", "tracing-subscriber", "workspace-hack", ] @@ -6600,6 +6712,7 @@ dependencies = [ "thiserror", "tokio-retry", "tracing", + "tracing-futures", "uuid", "workspace-hack", "xorf", @@ -8176,6 +8289,8 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ + "futures", + "futures-task", "pin-project", "tracing", ] @@ -8198,7 +8313,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" dependencies = [ "once_cell", - "opentelemetry", + "opentelemetry 0.17.0", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600" +dependencies = [ + "once_cell", + "opentelemetry 0.19.0", "tracing", "tracing-core", "tracing-log", @@ -8852,6 +8981,7 @@ dependencies = [ "futures", "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -8913,6 +9043,7 @@ dependencies = [ "tower", "tracing", "tracing-core", + "tracing-futures", "tracing-subscriber", "url", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 8580af3422e66..53a162be775ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,8 +106,6 @@ opt-level = 3 opt-level = 3 [profile.ci-dev.package."indextree"] opt-level = 3 -[profile.ci-dev.package."task_stats_alloc"] -opt-level = 3 # The profile used for deterministic simulation tests in CI. 
# The simulator can only run single-threaded, so optimization is required to make the running time diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 8eb55f354c0a8..6150a0eb1c336 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -79,7 +79,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 12 + timeout_in_minutes: 15 - label: "end-to-end test for opendal (parallel)" command: "ci/scripts/e2e-test-parallel-for-opendal.sh -p ci-dev" diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 4ca2348ac4070..b2a2fbc899c15 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -93,7 +93,7 @@ message Barrier { ResumeMutation resume = 8; } // Used for tracing. - bytes span = 2; + map tracing_context = 2; // Whether this barrier do checkpoint bool checkpoint = 9; diff --git a/proto/stream_service.proto b/proto/stream_service.proto index d7dceec70c2ad..dd964a6fa5908 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -68,6 +68,7 @@ message InjectBarrierResponse { message BarrierCompleteRequest { string request_id = 1; uint64 prev_epoch = 2; + map tracing_context = 3; } message BarrierCompleteResponse { message CreateMviewProgress { diff --git a/risedev.yml b/risedev.yml index 84b7c7dcb0e72..6f6f28cee98f3 100644 --- a/risedev.yml +++ b/risedev.yml @@ -24,7 +24,7 @@ profile: # If you want to use aws-s3, configure AK and SK in env var and enable the following lines: # - use: aws-s3 - # bucket: test-bucket + # bucket: test-bucket # If you want to create CDC source table, uncomment the following line # - use: connector-node @@ -32,6 +32,10 @@ profile: # if you want to enable etcd backend, uncomment the following lines. # - use: etcd # unsafe-no-fsync: true + + # If you want to enable tracing, uncomment the following line. + # - use: jaeger + - use: meta-node - use: compute-node - use: frontend @@ -43,9 +47,6 @@ profile: # - use: prometheus # - use: grafana - # If you want to enable tracing, uncomment the following line. - # - use: jaeger - # If you want to create source from Kafka, uncomment the following lines # Note that kafka depends on zookeeper, so zookeeper must be started beforehand. # - use: zookeeper @@ -898,6 +899,9 @@ template: # AWS s3 bucket used by the cluster provide-aws-s3: "aws-s3*" + # Jaeger used by this meta node + provide-jaeger: "jaeger*" + # Whether to enable in-memory pure KV state backend enable-in-memory-kv-state-backend: false @@ -969,6 +973,9 @@ template: # Meta-nodes used by this frontend instance provide-meta-node: "meta-node*" + # Jaeger used by this frontend instance + provide-jaeger: "jaeger*" + # If `user-managed` is true, this service will be started by user with the above config user-managed: false @@ -991,9 +998,12 @@ template: # Minio instances used by this compactor provide-minio: "minio*" - # Meta-nodes used by this compute node + # Meta-nodes used by this compactor provide-meta-node: "meta-node*" + # Jaeger used by this compator + provide-jaeger: "jaeger*" + # If `user-managed` is true, this service will be started by user with the above config user-managed: false @@ -1039,6 +1049,12 @@ template: # Dashboard listen port of Jaeger dashboard-port: 16680 + # gRPC listen address of the OTLP collector + otlp-address: "127.0.0.1" + + # gRPC listen port of the OTLP collector + otlp-port: 4317 + # Jaeger has a lot of ports open, and we don't want to make this config more complex. 
# So we keep the default value of jaeger instead of making it part of RiseDev config. @@ -1064,7 +1080,6 @@ template: # access key, secret key and region should be set in aws config (either by env var or .aws/config) - # Apache Kafka service kafka: # Id to be picked-up by services diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index bf3035e1cd8a8..d0fa1303782b8 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -17,6 +17,7 @@ use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; use risingwave_frontend::FrontendOpts; use risingwave_meta::MetaNodeOpts; +use risingwave_rt::{init_risingwave_logger, main_okk, LoggerSettings}; /// Define the `main` function for a component. #[macro_export] @@ -40,30 +41,30 @@ macro_rules! main { // Entry point functions. pub fn compute(opts: ComputeNodeOpts, registry: prometheus::Registry) { - risingwave_rt::init_risingwave_logger( - risingwave_rt::LoggerSettings::new().enable_tokio_console(false), + init_risingwave_logger( + LoggerSettings::new("compute").enable_tokio_console(false), registry.clone(), ); - risingwave_rt::main_okk(risingwave_compute::start(opts, registry)); + main_okk(risingwave_compute::start(opts, registry)); } pub fn meta(opts: MetaNodeOpts, registry: prometheus::Registry) { - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry); - risingwave_rt::main_okk(risingwave_meta::start(opts)); + init_risingwave_logger(LoggerSettings::new("meta"), registry); + main_okk(risingwave_meta::start(opts)); } pub fn frontend(opts: FrontendOpts, registry: prometheus::Registry) { - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry); - risingwave_rt::main_okk(risingwave_frontend::start(opts)); + init_risingwave_logger(LoggerSettings::new("frontend"), registry); + main_okk(risingwave_frontend::start(opts)); } pub fn compactor(opts: CompactorOpts, registry: prometheus::Registry) { - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry); - risingwave_rt::main_okk(risingwave_compactor::start(opts)); + init_risingwave_logger(LoggerSettings::new("compactor"), registry); + main_okk(risingwave_compactor::start(opts)); } pub fn ctl(opts: CtlOpts, registry: prometheus::Registry) { - risingwave_rt::init_risingwave_logger(risingwave_rt::LoggerSettings::new(), registry); + init_risingwave_logger(LoggerSettings::new("ctl"), registry); // Note: Use a simple current thread runtime for ctl. // When there's a heavy workload, multiple thread runtime seems to respond slowly. 
May need diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index c1dec0b3ea727..b2a531a624ab9 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -189,7 +189,7 @@ fn main() -> Result<()> { } fn playground(opts: PlaygroundOpts, registry: prometheus::Registry) { - let settings = risingwave_rt::LoggerSettings::new() + let settings = risingwave_rt::LoggerSettings::new("playground") .enable_tokio_console(false) .with_target("risingwave_storage", Level::WARN); risingwave_rt::init_risingwave_logger(settings, registry); diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index b28929937887c..e588c5227f5e3 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -50,6 +50,7 @@ memcomparable = { version = "0.2", features = ["decimal"] } num-integer = "0.1" num-traits = "0.2" number_prefix = "0.4.0" +opentelemetry = { version = "0.19", default-features = false } parking_lot = "0.12" parse-display = "0.6" paste = "1" @@ -90,6 +91,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ toml = "0.7" tonic = { version = "0.2", package = "madsim-tonic" } tracing = "0.1" +tracing-opentelemetry = "0.19" tracing-subscriber = "0.3.16" twox-hash = "1" url = "2" diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f6cf68165f192..14aed7e61ad68 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -305,10 +305,6 @@ pub struct StreamingConfig { #[serde(default)] pub actor_runtime_worker_threads_num: Option, - /// Enable reporting tracing information to jaeger. - #[serde(default = "default::streaming::enable_jaegar_tracing")] - pub enable_jaeger_tracing: bool, - /// Enable async stack tracing through `await-tree` for risectl. #[serde(default = "default::streaming::async_stack_trace")] pub async_stack_trace: AsyncStackTraceOption, @@ -739,10 +735,6 @@ mod default { 10000 } - pub fn enable_jaegar_tracing() -> bool { - false - } - pub fn async_stack_trace() -> AsyncStackTraceOption { AsyncStackTraceOption::default() } diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a78daa01e8de2..ca2fb6e068481 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -44,7 +44,8 @@ impl Epoch { pub fn next(self) -> Self { let physical_now = Epoch::physical_now(); let prev_physical_time = self.physical_time(); - match physical_now.cmp(&prev_physical_time) { + + let next_epoch = match physical_now.cmp(&prev_physical_time) { Ordering::Greater => Self::from_physical_time(physical_now), Ordering::Equal => { tracing::warn!("New generate epoch is too close to the previous one."); @@ -58,7 +59,10 @@ impl Epoch { ); Epoch(self.0 + 1) } - } + }; + + assert!(next_epoch.0 > self.0); + next_epoch } pub fn physical_time(&self) -> u64 { diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 45cf8a93cf05b..32c51207ab7f6 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -41,6 +41,7 @@ pub mod schema_check; pub mod sort_util; pub mod stream_cancel; pub mod stream_graph_visitor; +pub mod tracing; pub mod value_encoding; pub mod worker_util; diff --git a/src/common/src/util/tracing.rs b/src/common/src/util/tracing.rs new file mode 100644 index 0000000000000..fb889da9c5b40 --- /dev/null +++ b/src/common/src/util/tracing.rs @@ -0,0 +1,56 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::sdk::propagation::TraceContextPropagator; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// Context for tracing used for propagating tracing information in a distributed system. +/// +/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information. +#[derive(Debug, Clone)] +pub struct TracingContext(opentelemetry::Context); + +type Propagator = TraceContextPropagator; + +impl TracingContext { + /// Create a new tracing context from a tracing span. + pub fn from_span(span: &tracing::Span) -> Self { + Self(span.context()) + } + + /// Create a no-op tracing context. + pub fn none() -> Self { + Self(opentelemetry::Context::new()) + } + + /// Attach the given span as a child of the context. Returns the attached span. + pub fn attach(&self, span: tracing::Span) -> tracing::Span { + span.set_parent(self.0.clone()); + span + } + + pub fn to_protobuf(&self) -> HashMap { + let mut fields = HashMap::new(); + Propagator::new().inject_context(&self.0, &mut fields); + fields + } + + pub fn from_protobuf(fields: &HashMap) -> Self { + let context = Propagator::new().extract(fields); + Self(context) + } +} diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index e265875af6dee..4089f1bd37dc2 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -113,11 +113,6 @@ struct OverrideConfigOpts { #[override_opts(path = storage.file_cache.dir)] pub file_cache_dir: Option, - /// Enable reporting tracing information to jaeger. - #[clap(long, env = "RW_ENABLE_JAEGER_TRACING", default_missing_value = None)] - #[override_opts(path = streaming.enable_jaeger_tracing)] - pub enable_jaeger_tracing: Option, - /// Enable async stack tracing through `await-tree` for risectl. #[clap(long, env = "RW_ASYNC_STACK_TRACE", value_enum)] #[override_opts(path = streaming.async_stack_trace)] diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 00ac75068d9a0..3f3bba6cfb57b 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; use risingwave_common::error::tonic_err; +use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; @@ -27,6 +28,7 @@ use risingwave_stream::error::StreamError; use risingwave_stream::executor::Barrier; use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use tonic::{Code, Request, Response, Status}; +use tracing::Instrument; #[derive(Clone)] pub struct StreamServiceImpl { @@ -184,8 +186,13 @@ impl StreamService for StreamServiceImpl { // Must finish syncing data written in the epoch before respond back to ensure persistence // of the state. 
let synced_sstables = if checkpoint { + let span = TracingContext::from_protobuf(&req.tracing_context).attach( + tracing::info_span!("sync_epoch", prev_epoch = req.prev_epoch), + ); + self.mgr .sync_epoch(req.prev_epoch) + .instrument(span) .instrument_await(format!("sync_epoch (epoch {})", req.prev_epoch)) .await? } else { diff --git a/src/config/example.toml b/src/config/example.toml index df204708e7c25..478876fbfd6ee 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -39,7 +39,6 @@ batch_chunk_size = 1024 [streaming] in_flight_barrier_nums = 10000 -enable_jaeger_tracing = false async_stack_trace = "ReleaseVerbose" unique_user_stream_errors = 10 diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 3d7d617dc23a7..09113684f3480 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -19,7 +19,6 @@ use futures::future::try_join_all; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; -use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; @@ -35,6 +34,7 @@ use risingwave_rpc_client::StreamClientPoolRef; use uuid::Uuid; use super::info::BarrierActorInfo; +use super::trace::TracedEpoch; use crate::barrier::CommandChanges; use crate::manager::{FragmentManagerRef, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; @@ -212,8 +212,8 @@ pub struct CommandContext { // TODO: this could be stale when we are calling `post_collect`, check if it matters pub info: Arc, - pub prev_epoch: Epoch, - pub curr_epoch: Epoch, + pub prev_epoch: TracedEpoch, + pub curr_epoch: TracedEpoch, pub command: Command, @@ -228,8 +228,8 @@ impl CommandContext { fragment_manager: FragmentManagerRef, client_pool: StreamClientPoolRef, info: BarrierActorInfo, - prev_epoch: Epoch, - curr_epoch: Epoch, + prev_epoch: TracedEpoch, + curr_epoch: TracedEpoch, command: Command, checkpoint: bool, source_manager: SourceManagerRef, @@ -543,7 +543,7 @@ where // execution of the next command of `Update`, as some newly created operators may // immediately initialize their states on that barrier. Some(Mutation::Pause(..)) => { - self.wait_epoch_commit(self.prev_epoch.0).await?; + self.wait_epoch_commit(self.prev_epoch.value().0).await?; } _ => {} diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0da9b8801b20a..171afe27d1d69 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -25,6 +25,7 @@ use prometheus::HistogramTimer; use risingwave_common::bail; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; +use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; @@ -63,9 +64,11 @@ mod notifier; mod progress; mod recovery; mod schedule; +mod trace; pub use self::command::{Command, Reschedule}; pub use self::schedule::BarrierScheduler; +pub use self::trace::TracedEpoch; /// Status of barrier manager. 
enum BarrierManagerStatus { @@ -211,11 +214,9 @@ where } fn cancel_command(&mut self, cancelled_command: TrackingCommand) { - if let Some(index) = self - .command_ctx_queue - .iter() - .position(|x| x.command_ctx.prev_epoch == cancelled_command.context.prev_epoch) - { + if let Some(index) = self.command_ctx_queue.iter().position(|x| { + x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() + }) { self.command_ctx_queue.remove(index); self.remove_changes(cancelled_command.context.command.changes()); } @@ -317,9 +318,11 @@ where /// Enqueue a barrier command, and init its state to `InFlight`. fn enqueue_command(&mut self, command_ctx: Arc>, notifiers: Vec) { let timer = self.metrics.barrier_latency.start_timer(); + self.command_ctx_queue.push_back(EpochNode { timer: Some(timer), wait_commit_timer: None, + state: InFlight, command_ctx, notifiers, @@ -338,7 +341,7 @@ where if let Some(node) = self .command_ctx_queue .iter_mut() - .find(|x| x.command_ctx.prev_epoch.0 == prev_epoch) + .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch) { assert!(matches!(node.state, InFlight)); node.wait_commit_timer = Some(wait_commit_timer); @@ -395,7 +398,7 @@ where pub fn contains_epoch(&self, epoch: u64) -> bool { self.command_ctx_queue .iter() - .any(|x| x.command_ctx.prev_epoch.0 == epoch) + .any(|x| x.command_ctx.prev_epoch.value().0 == epoch) } /// After some command is committed, the changes will be applied to the meta store so we can @@ -448,6 +451,7 @@ pub struct EpochNode { timer: Option, /// The timer of `barrier_wait_commit_latency` wait_commit_timer: Option, + /// Whether this barrier is in-flight or completed. state: BarrierEpochState, /// Context of this command to generate barrier and do some post jobs. @@ -550,15 +554,14 @@ where if self.enable_recovery { // handle init, here we simply trigger a recovery process to achieve the consistency. We // may need to avoid this when we have more state persisted in meta store. - let new_epoch = state.in_flight_prev_epoch.next(); - assert!(new_epoch > state.in_flight_prev_epoch); - state.in_flight_prev_epoch = new_epoch; + + // XXX(bugen): why we need this? 
+ let new_epoch = state.in_flight_prev_epoch().next(); self.set_status(BarrierManagerStatus::Recovering).await; - let new_epoch = self.recovery(state.in_flight_prev_epoch).await; - state.in_flight_prev_epoch = new_epoch; + let new_epoch = self.recovery(new_epoch).await; state - .update_inflight_prev_epoch(self.env.meta_store()) + .update_inflight_prev_epoch(self.env.meta_store(), new_epoch) .await .unwrap(); } else if self.fragment_manager.has_any_table_fragments().await { @@ -634,20 +637,18 @@ where } = self.scheduled_barriers.pop_or_default().await; let info = self.resolve_actor_info(checkpoint_control, &command).await; - let prev_epoch = state.in_flight_prev_epoch; + let prev_epoch = state.in_flight_prev_epoch(); + let new_epoch = prev_epoch.next(); - state.in_flight_prev_epoch = new_epoch; - assert!( - new_epoch > prev_epoch, - "new{:?},prev{:?}", - new_epoch, - prev_epoch - ); state - .update_inflight_prev_epoch(self.env.meta_store()) + .update_inflight_prev_epoch(self.env.meta_store(), new_epoch.clone()) .await .unwrap(); + prev_epoch.span().in_scope(|| { + tracing::info!(target: "rw_tracing", epoch = new_epoch.value().0, "new barrier enqueued"); + }); + let command_ctx = Arc::new(CommandContext::new( self.fragment_manager.clone(), self.env.stream_client_pool_ref(), @@ -672,7 +673,7 @@ where command_context: Arc>, barrier_complete_tx: &UnboundedSender, ) { - let prev_epoch = command_context.prev_epoch.0; + let prev_epoch = command_context.prev_epoch.value().0; let result = self.inject_barrier_inner(command_context.clone()).await; match result { Ok(node_need_collect) => { @@ -716,12 +717,12 @@ where let request_id = Uuid::new_v4().to_string(); let barrier = Barrier { epoch: Some(risingwave_pb::data::Epoch { - curr: command_context.curr_epoch.0, - prev: command_context.prev_epoch.0, + curr: command_context.curr_epoch.value().0, + prev: command_context.prev_epoch.value().0, }), mutation, - // TODO(chi): add distributed tracing - span: vec![], + tracing_context: TracingContext::from_span(command_context.curr_epoch.span()) + .to_protobuf(), checkpoint: command_context.checkpoint, passed_actors: vec![], }; @@ -756,7 +757,10 @@ where command_context: Arc>, barrier_complete_tx: UnboundedSender, ) { - let prev_epoch = command_context.prev_epoch.0; + let prev_epoch = command_context.prev_epoch.value().0; + let tracing_context = + TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf(); + let info = command_context.info.clone(); let client_pool = client_pool_ref.deref(); let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| { @@ -765,11 +769,13 @@ where None } else { let request_id = Uuid::new_v4().to_string(); + let tracing_context = tracing_context.clone(); async move { let client = client_pool.get(node).await?; let request = BarrierCompleteRequest { request_id, prev_epoch, + tracing_context, }; tracing::trace!( target: "events::meta::barrier::barrier_complete", @@ -869,10 +875,9 @@ where self.set_status(BarrierManagerStatus::Recovering).await; let mut tracker = self.tracker.lock().await; *tracker = CreateMviewProgressTracker::new(); - let new_epoch = self.recovery(state.in_flight_prev_epoch).await; - state.in_flight_prev_epoch = new_epoch; + let new_epoch = self.recovery(state.in_flight_prev_epoch()).await; state - .update_inflight_prev_epoch(self.env.meta_store()) + .update_inflight_prev_epoch(self.env.meta_store(), new_epoch) .await .unwrap(); self.set_status(BarrierManagerStatus::Running).await; @@ -887,7 +892,7 @@ where node: &mut EpochNode, 
checkpoint_control: &mut CheckpointControl, ) -> MetaResult<()> { - let prev_epoch = node.command_ctx.prev_epoch.0; + let prev_epoch = node.command_ctx.prev_epoch.value().0; match &mut node.state { Completed(resps) => { // We must ensure all epochs are committed in ascending order, @@ -906,7 +911,11 @@ where } else if checkpoint { new_snapshot = self .hummock_manager - .commit_epoch(node.command_ctx.prev_epoch.0, synced_ssts, sst_to_worker) + .commit_epoch( + node.command_ctx.prev_epoch.value().0, + synced_ssts, + sst_to_worker, + ) .await?; } else { new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch)); diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 17292a8580973..d7d3e715a497f 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -210,7 +210,7 @@ impl CreateMviewProgressTracker { return Some(command); } - let ddl_epoch = command.context.curr_epoch; + let ddl_epoch = command.context.curr_epoch.value(); for &actor in &actors { self.actor_map.insert(actor, ddl_epoch); } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 19a01c9113191..7ed812584403a 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -18,7 +18,6 @@ use std::time::{Duration, Instant}; use futures::future::try_join_all; use itertools::Itertools; -use risingwave_common::util::epoch::Epoch; use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan::barrier::Mutation; use risingwave_pb::stream_plan::AddMutation; @@ -30,6 +29,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, warn}; use uuid::Uuid; +use super::TracedEpoch; use crate::barrier::command::CommandContext; use crate::barrier::info::BarrierActorInfo; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; @@ -107,7 +107,7 @@ where } /// Recovery the whole cluster from the latest epoch. - pub(crate) async fn recovery(&self, prev_epoch: Epoch) -> Epoch { + pub(crate) async fn recovery(&self, prev_epoch: TracedEpoch) -> TracedEpoch { // pause discovery of all connector split changes and trigger config change. let _source_pause_guard = self.source_manager.paused.lock().await; @@ -124,7 +124,7 @@ where // get recovered. let recovery_timer = self.metrics.recovery_latency.start_timer(); let (new_epoch, _responses) = tokio_retry::Retry::spawn(retry_strategy, || async { - let recovery_result: MetaResult<(Epoch, Vec)> = try { + let recovery_result: MetaResult<(TracedEpoch, Vec)> = try { let mut info = self.resolve_actor_info_for_recovery().await; let mut new_epoch = prev_epoch.next(); @@ -166,7 +166,7 @@ where self.env.stream_client_pool_ref(), info, prev_epoch, - new_epoch, + new_epoch.clone(), command, true, self.source_manager.clone(), diff --git a/src/meta/src/barrier/trace.rs b/src/meta/src/barrier/trace.rs new file mode 100644 index 0000000000000..6fd00f624ef92 --- /dev/null +++ b/src/meta/src/barrier/trace.rs @@ -0,0 +1,60 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::util::epoch::Epoch; + +/// A wrapper of [`Epoch`] with tracing span, used for issuing epoch-based tracing from the barrier +/// manager on the meta service. This structure is free to clone, which'll extend the lifetime of +/// the underlying span. +/// +/// - A new [`TracedEpoch`] is created when the barrier manager is going to inject a new barrier. +/// The span will be created automatically and the start time is recorded. +/// - Then, the previous and the current [`TracedEpoch`]s are stored in the command context. +/// - When the barrier is successfully collected and committed, the command context will be dropped, +/// then the previous span will be automatically closed. +#[derive(Debug, Clone)] +pub struct TracedEpoch { + epoch: Epoch, + span: tracing::Span, +} + +impl TracedEpoch { + /// Create a new [`TracedEpoch`] with the given `epoch`. + pub fn new(epoch: Epoch) -> Self { + // The span created on the meta service is always a root span for epoch-level tracing. + let span = tracing::info_span!( + parent: None, + "epoch", + "otel.name" = format!("Epoch {}", epoch.0), + epoch = epoch.0 + ); + + Self { epoch, span } + } + + /// Create a new [`TracedEpoch`] with the next epoch. + pub fn next(&self) -> Self { + Self::new(self.epoch.next()) + } + + /// Retrieve the epoch value. + pub fn value(&self) -> Epoch { + self.epoch + } + + /// Retrieve the tracing span. + pub fn span(&self) -> &tracing::Span { + &self.span + } +} diff --git a/src/meta/src/model/barrier.rs b/src/meta/src/model/barrier.rs index 6e6d4020903cf..2a002a85bb37e 100644 --- a/src/meta/src/model/barrier.rs +++ b/src/meta/src/model/barrier.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_common::util::epoch::INVALID_EPOCH; +use crate::barrier::TracedEpoch; use crate::storage::{MetaStore, MetaStoreError, MetaStoreResult, DEFAULT_COLUMN_FAMILY}; /// `BarrierManagerState` defines the necessary state of `GlobalBarrierManager`, this will be stored /// persistently to meta store. Add more states when needed. 
pub struct BarrierManagerState { /// The last sent `prev_epoch` - pub in_flight_prev_epoch: Epoch, + in_flight_prev_epoch: TracedEpoch, } const BARRIER_MANAGER_STATE_KEY: &[u8] = b"barrier_manager_state"; @@ -39,11 +40,15 @@ impl BarrierManagerState { Err(e) => panic!("{:?}", e), }; Self { - in_flight_prev_epoch, + in_flight_prev_epoch: TracedEpoch::new(in_flight_prev_epoch), } } - pub async fn update_inflight_prev_epoch(&self, store: &S) -> MetaStoreResult<()> + pub async fn update_inflight_prev_epoch( + &mut self, + store: &S, + new_epoch: TracedEpoch, + ) -> MetaStoreResult<()> where S: MetaStore, { @@ -51,9 +56,16 @@ impl BarrierManagerState { .put_cf( DEFAULT_COLUMN_FAMILY, BARRIER_MANAGER_STATE_KEY.to_vec(), - memcomparable::to_vec(&self.in_flight_prev_epoch.0).unwrap(), + memcomparable::to_vec(&new_epoch.value().0).unwrap(), ) - .await - .map_err(Into::into) + .await?; + + self.in_flight_prev_epoch = new_epoch; + + Ok(()) + } + + pub fn in_flight_prev_epoch(&self) -> TracedEpoch { + self.in_flight_prev_epoch.clone() } } diff --git a/src/risedevtool/jaeger.toml b/src/risedevtool/jaeger.toml index 37d3cbcbbcd86..b10f03f8593ef 100644 --- a/src/risedevtool/jaeger.toml +++ b/src/risedevtool/jaeger.toml @@ -3,7 +3,7 @@ extend = "common.toml" [env] JAEGER_SYSTEM = "${SYSTEM}" JAEGER_DOWNLOAD_PATH = "${PREFIX_TMP}/jaeger.tar.gz" -JAEGER_VERSION = "1.30.0" +JAEGER_VERSION = "1.46.0" JAEGER_RELEASE = "jaeger-${JAEGER_VERSION}-${JAEGER_SYSTEM}" JAEGER_DOWNLOAD_TAR_GZ = "https://github.com/jaegertracing/jaeger/releases/download/v${JAEGER_VERSION}/${JAEGER_RELEASE}.tar.gz" diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 7e49cf4da6ef0..ee7225afb987f 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -68,6 +68,8 @@ pub struct MetaNodeConfig { pub provide_compute_node: Option>, pub provide_compactor: Option>, + pub provide_jaeger: Option>, + pub provide_aws_s3: Option>, pub provide_minio: Option>, pub provide_opendal: Option>, @@ -90,6 +92,8 @@ pub struct FrontendConfig { pub health_check_port: u16, pub provide_meta_node: Option>, + pub provide_jaeger: Option>, + pub user_managed: bool, } @@ -110,6 +114,8 @@ pub struct CompactorConfig { pub provide_minio: Option>, pub provide_meta_node: Option>, + pub provide_jaeger: Option>, + pub user_managed: bool, pub max_concurrent_task_number: u64, pub compaction_worker_threads_number: Option, @@ -210,6 +216,8 @@ pub struct JaegerConfig { pub id: String, pub dashboard_address: String, pub dashboard_port: u16, + pub otlp_address: String, + pub otlp_port: u16, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/risedevtool/src/task/compactor_service.rs b/src/risedevtool/src/task/compactor_service.rs index a8d5952a4ad45..afd5e3c9a9a9a 100644 --- a/src/risedevtool/src/task/compactor_service.rs +++ b/src/risedevtool/src/task/compactor_service.rs @@ -20,7 +20,7 @@ use std::process::Command; use anyhow::Result; use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; -use crate::{add_meta_node, CompactorConfig, ExecuteContext, Task}; +use crate::{add_jaeger_endpoint, add_meta_node, CompactorConfig, ExecuteContext, Task}; pub struct CompactorService { config: CompactorConfig, @@ -68,6 +68,9 @@ impl CompactorService { let provide_meta_node = config.provide_meta_node.as_ref().unwrap(); add_meta_node(provide_meta_node, cmd)?; + let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); + add_jaeger_endpoint(provide_jaeger, 
cmd)?; + Ok(()) } } diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs index dabad23c9ee61..4509c5da54fa3 100644 --- a/src/risedevtool/src/task/compute_node_service.rs +++ b/src/risedevtool/src/task/compute_node_service.rs @@ -16,11 +16,11 @@ use std::env; use std::path::{Path, PathBuf}; use std::process::Command; -use anyhow::{anyhow, Result}; +use anyhow::Result; use super::{ExecuteContext, Task}; use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; -use crate::{add_meta_node, ComputeNodeConfig}; +use crate::{add_jaeger_endpoint, add_meta_node, ComputeNodeConfig}; pub struct ComputeNodeService { config: ComputeNodeConfig, @@ -69,23 +69,12 @@ impl ComputeNodeService { .arg("--role") .arg(&config.role); - let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); - match provide_jaeger.len() { - 0 => {} - 1 => { - cmd.arg("--enable-jaeger-tracing"); - } - other_size => { - return Err(anyhow!( - "{} Jaeger instance found in config, but only 1 is needed", - other_size - )) - } - } - let provide_meta_node = config.provide_meta_node.as_ref().unwrap(); add_meta_node(provide_meta_node, cmd)?; + let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); + add_jaeger_endpoint(provide_jaeger, cmd)?; + Ok(()) } } diff --git a/src/risedevtool/src/task/frontend_service.rs b/src/risedevtool/src/task/frontend_service.rs index cf917c9d64190..b4d0efd62ac5f 100644 --- a/src/risedevtool/src/task/frontend_service.rs +++ b/src/risedevtool/src/task/frontend_service.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use super::{ExecuteContext, Task}; use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; -use crate::FrontendConfig; +use crate::{add_jaeger_endpoint, FrontendConfig}; pub struct FrontendService { config: FrontendConfig, @@ -79,6 +79,9 @@ impl FrontendService { ); } + let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); + add_jaeger_endpoint(provide_jaeger, cmd)?; + Ok(()) } } diff --git a/src/risedevtool/src/task/jaeger_service.rs b/src/risedevtool/src/task/jaeger_service.rs index d953dabb39be6..063450c21f694 100644 --- a/src/risedevtool/src/task/jaeger_service.rs +++ b/src/risedevtool/src/task/jaeger_service.rs @@ -55,20 +55,13 @@ impl Task for JaegerService { let mut cmd = self.jaeger()?; cmd.arg("--admin.http.host-port") .arg("127.0.0.1:14269") - .arg("--collector.grpc-server.host-port") - .arg("127.0.0.1:14250") - .arg("--collector.http-server.host-port") - .arg("127.0.0.1:14268") - .arg("--collector.queue-size") - .arg("65536") + .arg("--collector.otlp.grpc.host-port") + .arg(format!( + "{}:{}", + self.config.otlp_address, self.config.otlp_port + )) .arg("--http-server.host-port") .arg("127.0.0.1:5778") - .arg("--processor.jaeger-binary.server-host-port") - .arg("127.0.0.1:6832") - .arg("--processor.jaeger-compact.server-host-port") - .arg("127.0.0.1:6831") - .arg("--processor.zipkin-compact.server-host-port") - .arg("127.0.0.1:5775") .arg("--query.grpc-server.host-port") .arg("127.0.0.1:16685") .arg("--query.http-server.host-port") diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs index 06544723c123b..2da276819dd5a 100644 --- a/src/risedevtool/src/task/meta_node_service.rs +++ b/src/risedevtool/src/task/meta_node_service.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use super::{ExecuteContext, Task}; use crate::util::{get_program_args, get_program_env_cmd, get_program_name}; -use crate::{add_hummock_backend, 
HummockInMemoryStrategy, MetaNodeConfig}; +use crate::{add_hummock_backend, add_jaeger_endpoint, HummockInMemoryStrategy, MetaNodeConfig}; pub struct MetaNodeService { config: MetaNodeConfig, @@ -159,6 +159,9 @@ impl MetaNodeService { cmd.arg("--data-directory").arg("hummock_001"); + let provide_jaeger = config.provide_jaeger.as_ref().unwrap(); + add_jaeger_endpoint(provide_jaeger, cmd)?; + Ok(()) } } diff --git a/src/risedevtool/src/task/utils.rs b/src/risedevtool/src/task/utils.rs index 5bbd160b40d63..9f76539143172 100644 --- a/src/risedevtool/src/task/utils.rs +++ b/src/risedevtool/src/task/utils.rs @@ -17,7 +17,7 @@ use std::process::Command; use anyhow::{anyhow, Result}; use itertools::Itertools; -use crate::{AwsS3Config, MetaNodeConfig, MinioConfig, OpendalConfig}; +use crate::{AwsS3Config, JaegerConfig, MetaNodeConfig, MinioConfig, OpendalConfig}; #[allow(dead_code)] pub(crate) const DEFAULT_QUERY_LOG_PATH: &str = ".risingwave/log/"; @@ -43,6 +43,27 @@ pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) -> Ok(()) } +/// Add the jaeger endpoint to the environment variables. +pub fn add_jaeger_endpoint(provide_jaeger: &[JaegerConfig], cmd: &mut Command) -> Result<()> { + match provide_jaeger { + [] => {} + [jaeger] => { + cmd.env( + "RW_TRACING_ENDPOINT", + format!("http://{}:{}", jaeger.otlp_address, jaeger.otlp_port), + ); + } + _ => { + return Err(anyhow!( + "{} Jaeger instance found in config, but only 1 is needed", + provide_jaeger.len() + )) + } + } + + Ok(()) +} + /// Strategy for whether to enable in-memory hummock if no minio and s3 is provided. pub enum HummockInMemoryStrategy { /// Enable isolated in-memory hummock. Used by single-node configuration. diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 7125f72213974..e77839550487f 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -64,6 +64,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ ] } tokio-retry = "0.3" tracing = "0.1" +tracing-futures = { version = "0.2", features = ["futures-03"] } xorf = "0.8.1" xxhash-rust = { version = "0.8.5", features = ["xxh32", "xxh64"] } zstd = { version = "0.12", default-features = false } diff --git a/src/storage/backup/cmd/src/bin/backup_restore.rs b/src/storage/backup/cmd/src/bin/backup_restore.rs index 3941a024d0cd4..d43be432a22f0 100644 --- a/src/storage/backup/cmd/src/bin/backup_restore.rs +++ b/src/storage/backup/cmd/src/bin/backup_restore.rs @@ -21,7 +21,7 @@ fn main() -> BackupResult<()> { use clap::Parser; let opts = risingwave_meta::backup_restore::RestoreOpts::parse(); risingwave_rt::init_risingwave_logger( - risingwave_rt::LoggerSettings::new(), + risingwave_rt::LoggerSettings::new("backup_restore"), prometheus::Registry::new(), ); tokio::runtime::Builder::new_multi_thread() diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index be902b87580cd..e2c1650d5b7a5 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockReadEpoch; use tokio::time::Instant; use tracing::error; +use tracing_futures::Instrument; use super::MonitoredStorageMetrics; use crate::error::{StorageError, StorageResult}; @@ -119,10 +120,13 @@ impl MonitoredStateStore { .get_duration .with_label_values(&[table_id_label.as_str()]) .start_timer(); + let value = get_future .verbose_instrument_await("store_get") + 
.instrument(tracing::trace_span!("store_get")) .await .inspect_err(|e| error!("Failed in get: {:?}", e))?; + timer.observe_duration(); self.storage_metrics @@ -367,7 +371,7 @@ impl MonitoredStateStoreIter { } fn into_stream(self) -> impl StateStoreIterItemStream { - Self::into_stream_inner(self) + Self::into_stream_inner(self).instrument(tracing::trace_span!("store_iter")) } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 9ffc788a1c38f..97d2245eca0aa 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -47,7 +47,7 @@ use risingwave_storage::store::{ }; use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution}; use risingwave_storage::StateStore; -use tracing::trace; +use tracing::{trace, Instrument}; use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; @@ -790,7 +790,9 @@ where // Tick the watermark buffer here because state table is expected to be committed once // per epoch. self.watermark_buffer_strategy.tick(); - self.seal_current_epoch(new_epoch.curr).await + self.seal_current_epoch(new_epoch.curr) + .instrument(tracing::info_span!("state_table_commit")) + .await } // TODO(st1page): maybe we should extract a pub struct to do it diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 6de4de3fff6a4..273b91d2c1efe 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -162,14 +162,12 @@ where async fn run_consumer(self) -> StreamResult<()> { let id = self.actor_context.id; - let span_name = format!("actor_poll_{:03}", id); - let mut span = tracing::trace_span!( - "actor_poll", - otel.name = span_name.as_str(), - next = id, - next = "Outbound", - epoch = -1 - ); + let span_name = format!("Actor {id}"); + + let new_span = + || tracing::info_span!(parent: None, "actor", "otel.name" = span_name, actor_id = id); + let mut span = new_span(); + let mut last_epoch: Option = None; let mut stream = Box::pin(Box::new(self.consumer).execute()); @@ -177,7 +175,7 @@ where let result = loop { let barrier = match stream .try_next() - .instrument(span) + .instrument(span.clone()) .instrument_await( last_epoch.map_or("Epoch ".into(), |e| format!("Epoch {}", e.curr)), ) @@ -198,14 +196,7 @@ where // Tracing related work last_epoch = Some(barrier.epoch); - - span = tracing::trace_span!( - "actor_poll", - otel.name = span_name.as_str(), - next = id, - next = "Outbound", - epoch = barrier.epoch.curr - ); + span = barrier.tracing_context().attach(new_span()); }; spawn_blocking_drop_stream(stream).await; diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 4af4a33327057..57b8a1a0d9529 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -30,7 +30,7 @@ use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate; use risingwave_pb::stream_plan::PbDispatcher; use smallvec::{smallvec, SmallVec}; use tokio::time::Instant; -use tracing::event; +use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; use super::Watermark; @@ -267,7 +267,18 @@ impl StreamConsumer for DispatchExecutor { Message::Barrier(ref barrier) => (Some(barrier.clone()), "dispatch_barrier"), Message::Watermark(_) => (None, "dispatch_watermark"), }; - self.inner.dispatch(msg).instrument_await(span).await?; + + let tracing_span = if let Some(_barrier) = &barrier { + 
tracing::info_span!("dispatch_barrier") + } else { + tracing::Span::none() + }; + + self.inner + .dispatch(msg) + .instrument(tracing_span) + .instrument_await(span) + .await?; if let Some(barrier) = barrier { yield barrier; } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 5b0fbf24597f4..b9707e6b73e99 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -28,6 +28,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, DefaultOrd, DefaultPartialOrd, ScalarImpl}; use risingwave_common::util::epoch::{Epoch, EpochPair}; +use risingwave_common::util::tracing::TracingContext; use risingwave_common::util::value_encoding::{deserialize_datum, serialize_datum}; use risingwave_connector::source::SplitImpl; use risingwave_expr::expr::BoxedExpression; @@ -242,6 +243,9 @@ pub struct Barrier { pub mutation: Option>, pub checkpoint: bool, + /// Tracing context for the **current** epoch of this barrier. + tracing_context: TracingContext, + /// The actors that this barrier has passed locally. Used for debugging only. pub passed_actors: Vec, } @@ -252,6 +256,7 @@ impl Barrier { Self { epoch: EpochPair::new_test_epoch(epoch), checkpoint: true, + tracing_context: TracingContext::none(), mutation: Default::default(), passed_actors: Default::default(), } @@ -261,6 +266,7 @@ impl Barrier { Self { epoch: EpochPair::new(epoch, prev_epoch), checkpoint: true, + tracing_context: TracingContext::none(), mutation: Default::default(), passed_actors: Default::default(), } @@ -359,6 +365,11 @@ impl Barrier { pub fn get_curr_epoch(&self) -> Epoch { Epoch(self.epoch.curr) } + + /// Retrieve the tracing context for the **current** epoch of this barrier. + pub fn tracing_context(&self) -> &TracingContext { + &self.tracing_context + } } impl PartialEq for Barrier { @@ -541,15 +552,17 @@ impl Barrier { mutation, checkpoint, passed_actors, + tracing_context, .. - }: Barrier = self.clone(); + } = self.clone(); + PbBarrier { epoch: Some(PbEpoch { curr: epoch.curr, prev: epoch.prev, }), mutation: mutation.map(|mutation| mutation.to_protobuf()), - span: vec![], + tracing_context: tracing_context.to_protobuf(), checkpoint, passed_actors, } @@ -563,11 +576,13 @@ impl Barrier { .transpose()? 
.map(Arc::new); let epoch = prost.get_epoch()?; + Ok(Barrier { checkpoint: prost.checkpoint, epoch: EpochPair::new(epoch.curr, epoch.prev), mutation, passed_actors: prost.get_passed_actors().clone(), + tracing_context: TracingContext::from_protobuf(&prost.tracing_context), }) } } diff --git a/src/stream/src/executor/no_op.rs b/src/stream/src/executor/no_op.rs index 88634b4f73df3..f323f95f5adc5 100644 --- a/src/stream/src/executor/no_op.rs +++ b/src/stream/src/executor/no_op.rs @@ -28,7 +28,7 @@ impl NoOpExecutor { pub fn new(ctx: ActorContextRef, input: BoxedExecutor, executor_id: u64) -> Self { Self { _ctx: ctx, - identity: format!("BarrierRecvExecutor {:X}", executor_id), + identity: format!("NoOpExecutor {:X}", executor_id), input, } } diff --git a/src/stream/src/executor/wrapper/trace.rs b/src/stream/src/executor/wrapper/trace.rs index 61d661e1fb69b..540fca1d9d772 100644 --- a/src/stream/src/executor/wrapper/trace.rs +++ b/src/stream/src/executor/wrapper/trace.rs @@ -17,50 +17,58 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; -use tracing::{event, Instrument}; +use tracing::{Instrument, Span}; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ExecutorInfo, Message, MessageStream}; use crate::task::ActorId; -/// Streams wrapped by `trace` will print data passing in the stream graph to stdout. +/// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`. #[try_stream(ok = Message, error = StreamExecutorError)] pub async fn trace( info: Arc, - input_pos: usize, + _input_pos: usize, actor_id: ActorId, - _executor_id: u64, + executor_id: u64, metrics: Arc, input: impl MessageStream, ) { - let span_name = format!("{}_{}_next", info.identity, input_pos); let actor_id_string = actor_id.to_string(); + let span_name = pretty_identity(&info.identity, actor_id, executor_id); + + let new_span = || tracing::info_span!("executor", "otel.name" = span_name, actor_id); + let mut span = new_span(); + pin_mut!(input); - while let Some(message) = input - .next() - .instrument(tracing::trace_span!( - "next", - otel.name = span_name.as_str(), - next = info.identity, - input_pos = input_pos - )) - .await - .transpose()? - { + while let Some(message) = input.next().instrument(span.clone()).await.transpose()? { if let Message::Chunk(chunk) = &message { if chunk.cardinality() > 0 { metrics .executor_row_count .with_label_values(&[&actor_id_string, &info.identity]) .inc_by(chunk.cardinality() as u64); - event!(tracing::Level::TRACE, prev = %info.identity, msg = "chunk", "input = \n{:#?}", chunk); + tracing::trace!(prev = %info.identity, msg = "chunk", "input = \n{:#?}", chunk); } } - yield message; + match &message { + Message::Chunk(_) | Message::Watermark(_) => yield message, + + Message::Barrier(_barrier) => { + // Drop the span as the inner executor has finished processing the barrier (then all + // data from the previous epoch). + let _ = std::mem::replace(&mut span, Span::none()); + + yield message; + + // Create a new span after we're called again. Now we're in a new epoch and the + // parent of the span is updated. 
+ span = new_span(); + } + } } } @@ -92,10 +100,10 @@ pub async fn metrics( fn pretty_identity(identity: &str, actor_id: ActorId, executor_id: u64) -> String { format!( - "{} (actor {}, executor {})", + "{} (actor {}, operator {})", identity, actor_id, - executor_id as u32 // Use the lower 32 bit to match the dashboard. + executor_id as u32 // The lower 32 bit is for the operator id. ) } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 4e268ddc4e8ed..66fdfeb2864bb 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -557,9 +557,13 @@ impl LocalStreamManagerCore { // If there're multiple stateful executors in this actor, we will wrap it into a subtask. let executor = if has_stateful && is_stateful { - let (subtask, executor) = subtask::wrap(executor, actor_context.id); - subtasks.push(subtask); - executor.boxed() + // TODO(bugen): subtask does not work with tracing spans. + // let (subtask, executor) = subtask::wrap(executor, actor_context.id); + // subtasks.push(subtask); + // executor.boxed() + + let _ = subtasks; + executor } else { executor }; diff --git a/src/tests/compaction_test/src/bin/compaction.rs b/src/tests/compaction_test/src/bin/compaction.rs index 96b956638ae37..14864c7fea19a 100644 --- a/src/tests/compaction_test/src/bin/compaction.rs +++ b/src/tests/compaction_test/src/bin/compaction.rs @@ -21,7 +21,7 @@ fn main() { let opts = risingwave_compaction_test::CompactionTestOpts::parse(); risingwave_rt::init_risingwave_logger( - risingwave_rt::LoggerSettings::new(), + risingwave_rt::LoggerSettings::default(), prometheus::Registry::new(), ); diff --git a/src/tests/compaction_test/src/bin/delete_range.rs b/src/tests/compaction_test/src/bin/delete_range.rs index 28d3673392b39..e44c689c5fd69 100644 --- a/src/tests/compaction_test/src/bin/delete_range.rs +++ b/src/tests/compaction_test/src/bin/delete_range.rs @@ -21,7 +21,7 @@ fn main() { let opts = risingwave_compaction_test::CompactionTestOpts::parse(); risingwave_rt::init_risingwave_logger( - risingwave_rt::LoggerSettings::new(), + risingwave_rt::LoggerSettings::default(), prometheus::Registry::new(), ); diff --git a/src/tests/state_cleaning_test/src/bin/main.rs b/src/tests/state_cleaning_test/src/bin/main.rs index f3dcb4a8c116d..81a1db7527e83 100644 --- a/src/tests/state_cleaning_test/src/bin/main.rs +++ b/src/tests/state_cleaning_test/src/bin/main.rs @@ -154,7 +154,7 @@ async fn validate_case( #[tokio::main] async fn main() -> anyhow::Result<()> { risingwave_rt::init_risingwave_logger( - risingwave_rt::LoggerSettings::new(), + risingwave_rt::LoggerSettings::default(), prometheus::Registry::new(), ); diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index fd32fd4bc0615..1b75fcedc5163 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -19,6 +19,9 @@ await-tree = { workspace = true } console = "0.15" console-subscriber = "0.1.8" futures = { version = "0.3", default-features = false, features = ["alloc"] } +hostname = "0.3" +opentelemetry-otlp = { version = "0.12" } +opentelemetry-semantic-conventions = "0.11" parking_lot = { version = "0.12", features = ["deadlock_detection"] } pprof = { version = "0.11", features = ["flamegraph"] } prometheus = { version = "0.13" } @@ -33,7 +36,9 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs" ] } tracing = "0.1" -tracing-subscriber = { version = "0.3.16", features = ["fmt", "parking_lot", "std", "time", "local-time"] } 
diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml
index fd32fd4bc0615..1b75fcedc5163 100644
--- a/src/utils/runtime/Cargo.toml
+++ b/src/utils/runtime/Cargo.toml
@@ -19,6 +19,9 @@ await-tree = { workspace = true }
 console = "0.15"
 console-subscriber = "0.1.8"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
+hostname = "0.3"
+opentelemetry-otlp = { version = "0.12" }
+opentelemetry-semantic-conventions = "0.11"
 parking_lot = { version = "0.12", features = ["deadlock_detection"] }
 pprof = { version = "0.11", features = ["flamegraph"] }
 prometheus = { version = "0.13" }
@@ -33,7 +36,9 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
   "fs"
 ] }
 tracing = "0.1"
-tracing-subscriber = { version = "0.3.16", features = ["fmt", "parking_lot", "std", "time", "local-time"] }
+tracing-opentelemetry = "0.19"
+tracing-subscriber = { version = "0.3", features = ["fmt", "parking_lot", "std", "time", "local-time"] }
 
 [target.'cfg(not(madsim))'.dependencies]
+opentelemetry = { version = "0.19", default-features = false, features = ["rt-tokio"] }
 workspace-hack = { path = "../../workspace-hack" }
diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs
index 47b398580d1b5..5b8bade059d87 100644
--- a/src/utils/runtime/src/lib.rs
+++ b/src/utils/runtime/src/lib.rs
@@ -22,8 +22,8 @@ use std::time::Duration;
 
 use futures::Future;
 use risingwave_common::metrics::MetricsLayer;
-use tracing::Level;
-use tracing_subscriber::filter::{Directive, Targets};
+use tracing::level_filters::LevelFilter as Level;
+use tracing_subscriber::filter::{Directive, FilterFn, Targets};
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{filter, EnvFilter};
@@ -76,22 +76,26 @@ fn configure_risingwave_targets_fmt(targets: filter::Targets) -> filter::Targets
 // ===========================================================================
 
 pub struct LoggerSettings {
+    /// The name of the service.
+    name: String,
     /// Enable tokio console output.
     enable_tokio_console: bool,
     /// Enable colorful output in console.
     colorful: bool,
+    /// Override default target settings.
     targets: Vec<(String, tracing::metadata::LevelFilter)>,
 }
 
 impl Default for LoggerSettings {
     fn default() -> Self {
-        Self::new()
+        Self::new("risingwave")
     }
 }
 
 impl LoggerSettings {
-    pub fn new() -> Self {
+    pub fn new(name: impl Into<String>) -> Self {
         Self {
+            name: name.into(),
             enable_tokio_console: false,
             colorful: console::colors_enabled_stderr() && console::colors_enabled(),
             targets: vec![],
@@ -140,19 +144,8 @@ pub fn set_panic_hook() {
 /// * `RW_QUERY_LOG_PATH`: the path to generate query log. If set, [`ENABLE_QUERY_LOG_FILE`] is
 ///   turned on.
 pub fn init_risingwave_logger(settings: LoggerSettings, registry: prometheus::Registry) {
-    let mut layers = vec![];
-
-    // fmt layer (formatting and logging to stdout)
-    {
-        let fmt_layer = tracing_subscriber::fmt::layer()
-            .compact()
-            .with_ansi(settings.colorful);
-        let fmt_layer = if ENABLE_PRETTY_LOG {
-            fmt_layer.pretty().boxed()
-        } else {
-            fmt_layer.boxed()
-        };
-
+    // Default filter for logging to stdout and tracing.
+    let filter = {
         let filter = filter::Targets::new()
             .with_target("aws_sdk_ec2", Level::INFO)
             .with_target("aws_sdk_s3", Level::INFO)
@@ -171,12 +164,12 @@
 
         let filter = configure_risingwave_targets_fmt(filter);
 
-        // Enable DEBUG level for all other crates
-        #[cfg(debug_assertions)]
-        let filter = filter.with_default(Level::DEBUG);
-
-        #[cfg(not(debug_assertions))]
-        let filter = filter.with_default(Level::INFO);
+        // For all other crates
+        let filter = filter.with_default(if cfg!(debug_assertions) {
+            Level::DEBUG
+        } else {
+            Level::INFO
+        });
 
         let filter = settings
             .targets
@@ -185,8 +178,34 @@ pub fn init_risingwave_logger(settings: LoggerSettings, registry: prometheus::Re
             filter.with_target(target, level)
         });
 
-        layers.push(fmt_layer.with_filter(to_env_filter(filter)).boxed());
+        move |additional_targets: Vec<(&str, Level)>| {
+            to_env_filter(filter.clone().with_targets(additional_targets))
+        }
+    };
+
+    let mut layers = vec![];
+
+    // fmt layer (formatting and logging to stdout)
+    {
+        let fmt_layer = tracing_subscriber::fmt::layer()
+            .compact()
+            .with_ansi(settings.colorful);
+        let fmt_layer = if ENABLE_PRETTY_LOG {
+            fmt_layer.pretty().boxed()
+        } else {
+            fmt_layer.boxed()
+        };
+
+        layers.push(
+            fmt_layer
+                .with_filter(FilterFn::new(|metadata| metadata.is_event())) // filter-out all span-related info
+                .with_filter(filter(vec![
+                    ("rw_tracing", Level::OFF), // filter out tracing-only events
+                ]))
+                .boxed(),
+        );
     };
+
     let default_query_log_path = "./".to_string();
     let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
@@ -275,15 +294,78 @@
                 .build()
                 .unwrap()
                 .block_on(async move {
-                    tracing::info!("serving console subscriber");
+                    println!("serving console subscriber");
                     server.serve().await.unwrap();
                 });
         });
     };
 
-    let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
+    // Tracing layer
+    #[cfg(not(madsim))]
+    if let Ok(endpoint) = std::env::var("RW_TRACING_ENDPOINT") {
+        println!("tracing enabled, exported to `{endpoint}`");
+
+        use opentelemetry::{sdk, KeyValue};
+        use opentelemetry_otlp::WithExportConfig;
+        use opentelemetry_semantic_conventions::resource;
+
+        let id = format!(
+            "{}-{}",
+            hostname::get()
+                .ok()
+                .and_then(|o| o.into_string().ok())
+                .unwrap_or_default(),
+            std::process::id()
+        );
+
+        let otel_tracer = {
+            let runtime = tokio::runtime::Builder::new_multi_thread()
+                .enable_all()
+                .thread_name("risingwave-otel")
+                .worker_threads(2)
+                .build()
+                .unwrap();
+            let runtime = Box::leak(Box::new(runtime));
+
+            // Installing the exporter requires a tokio runtime.
+            let _entered = runtime.enter();
+
+            opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter()
+                        .tonic()
+                        .with_endpoint(endpoint),
+                )
+                .with_trace_config(sdk::trace::config().with_resource(sdk::Resource::new([
+                    KeyValue::new(
+                        resource::SERVICE_NAME,
+                        // TODO(bugen): better service name
+                        // https://github.com/jaegertracing/jaeger-ui/issues/336
+                        format!("{}-{}", settings.name, id),
+                    ),
+                    KeyValue::new(resource::SERVICE_INSTANCE_ID, id),
+                    KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
+                    KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
+                ])))
+                .install_batch(opentelemetry::runtime::Tokio)
+                .unwrap()
+        };
+
+        let layer = tracing_opentelemetry::layer()
+            .with_tracer(otel_tracer)
+            .with_filter(filter(vec![]));
+
+        layers.push(layer.boxed());
+    }
+
+    // Metrics layer
+    {
+        let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
+
+        layers.push(Box::new(MetricsLayer::new(registry).with_filter(filter)));
+    }
 
-    layers.push(Box::new(MetricsLayer::new(registry).with_filter(filter)));
     tracing_subscriber::registry().with(layers).init();
 
     // TODO: add file-appender tracing subscriber in the future
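Reviewer note, not part of the patch: the new tracing layer only activates when `RW_TRACING_ENDPOINT` is set. A minimal standalone sketch of the same wiring with the versions added above (`opentelemetry` 0.19 with `rt-tokio`, `opentelemetry-otlp` 0.12, `tracing-opentelemetry` 0.19); the collector endpoint `http://localhost:4317` and the service name are placeholders:

```rust
use opentelemetry::sdk::{trace, Resource};
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() {
    // Build an OTLP pipeline. `install_batch` needs a Tokio runtime, which is
    // why the patch enters a dedicated runtime before calling it.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://localhost:4317"), // hypothetical collector
        )
        .with_trace_config(trace::config().with_resource(Resource::new([
            KeyValue::new("service.name", "rw-tracing-demo"),
        ])))
        .install_batch(opentelemetry::runtime::Tokio)
        .unwrap();

    // Bridge `tracing` spans into OpenTelemetry and install the subscriber.
    tracing_subscriber::registry()
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .init();

    tracing::info_span!("demo").in_scope(|| tracing::info!("hello from a traced span"));

    // Flush pending spans before exiting.
    opentelemetry::global::shutdown_tracer_provider();
}
```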
"util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } +tracing-futures = { version = "0.2", features = ["futures-03"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "parking_lot"] } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] }