Skip to content

Commit

Permalink
feat(streaming): provide more context when panic with async stack tra…
Browse files Browse the repository at this point in the history
…ce (#5931)

* trace wrapper

Signed-off-by: Bugen Zhao <[email protected]>

* print sql if panic

Signed-off-by: Bugen Zhao <[email protected]>

* rename for_test

Signed-off-by: Bugen Zhao <[email protected]>

* enable verbose async stack trace

Signed-off-by: Bugen Zhao <[email protected]>

* tweak build profile

Signed-off-by: Bugen Zhao <[email protected]>

* remove unwind trace

Signed-off-by: Bugen Zhao <[email protected]>

* revert context changes

Signed-off-by: Bugen Zhao <[email protected]>

* try always enable

Signed-off-by: Bugen Zhao <[email protected]>

* support verbose configuration

Signed-off-by: Bugen Zhao <[email protected]>

* verbose config

Signed-off-by: Bugen Zhao <[email protected]>

* print options on launch

Signed-off-by: Bugen Zhao <[email protected]>

* add doc

Signed-off-by: Bugen Zhao <[email protected]>

* use const generic

Signed-off-by: Bugen Zhao <[email protected]>

* static non verbose

Signed-off-by: Bugen Zhao <[email protected]>

* statically ignore verbose

Signed-off-by: Bugen Zhao <[email protected]>

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BugenZhao and mergify[bot] authored Oct 21, 2022
1 parent d4f5307 commit 5ffeccb
Show file tree
Hide file tree
Showing 37 changed files with 255 additions and 166 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ inherits = "dev"
incremental = false
[profile.ci-dev.package."*"] # external dependencies
opt-level = 1
[profile.ci-dev.package."tokio"]
opt-level = 3
[profile.ci-dev.package."async_stack_trace"]
opt-level = 3
[profile.ci-dev.package."indextree"]
opt-level = 3

# The profile used for deterministic simulation tests in CI.
# The simulator can only run single-threaded, so optimization is required to make the running time
Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ message StreamActor {
// Vnodes that the executors in this actor own.
// If the fragment is a singleton, this field will not be set and leave a `None`.
common.Buffer vnode_bitmap = 8;
// The SQL definition of this materialized view. Used for debugging only.
string mview_definition = 9;
}

enum FragmentType {
Expand Down
11 changes: 6 additions & 5 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,21 @@ risedev:
id: compute-node-0
listen-address: "0.0.0.0"
address: ${dns-host:rw-compute-0}
enable-async-stack-trace: true
async-stack-trace: verbose
enable-tiered-cache: true

- use: compute-node
id: compute-node-1
listen-address: "0.0.0.0"
address: ${dns-host:rw-compute-1}
enable-async-stack-trace: true
async-stack-trace: verbose
enable-tiered-cache: true

- use: compute-node
id: compute-node-2
listen-address: "0.0.0.0"
address: ${dns-host:rw-compute-2}
enable-async-stack-trace: true
async-stack-trace: verbose
enable-tiered-cache: true

- use: frontend
Expand Down Expand Up @@ -518,8 +518,9 @@ template:
# Id of this instance
id: compute-node-${port}

# Whether to enable async stack trace for this compute node.
enable-async-stack-trace: false
# Whether to enable async stack trace for this compute node, `off`, `on`, or `verbose`.
# Considering the performance, `verbose` mode only effect under `release` profile with `debug_assertions` off.
async-stack-trace: on

# If `enable-tiered-cache` is true, hummock will use data directory as file cache.
enable-tiered-cache: false
Expand Down
14 changes: 11 additions & 3 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ extern crate tracing;
pub mod rpc;
pub mod server;

use clap::clap_derive::ArgEnum;
use clap::Parser;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, ArgEnum)]
pub enum AsyncStackTraceOption {
Off,
On,
Verbose,
}

/// Command-line arguments for compute-node.
#[derive(Parser, Debug)]
pub struct ComputeNodeOpts {
Expand Down Expand Up @@ -64,8 +72,8 @@ pub struct ComputeNodeOpts {
pub enable_jaeger_tracing: bool,

/// Enable async stack tracing for risectl.
#[clap(long)]
pub enable_async_stack_trace: bool,
#[clap(long, arg_enum, default_value_t = AsyncStackTraceOption::Off)]
pub async_stack_trace: AsyncStackTraceOption,

/// Path to file cache data directory.
/// Left empty to disable file cache.
Expand All @@ -89,7 +97,7 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
tracing::info!("meta address: {}", opts.meta_address.clone());
tracing::info!("Compute node options: {:?}", opts);

let listen_address = opts.host.parse().unwrap();
tracing::info!("Server Listening at {}", listen_address);
Expand Down
26 changes: 11 additions & 15 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ pub mod grpc_middleware {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use async_stack_trace::{SpanValue, StackTraceManager};
use async_stack_trace::{SpanValue, StackTraceManager, TraceConfig};
use futures::Future;
use hyper::Body;
use tokio::sync::Mutex;
Expand All @@ -124,19 +123,20 @@ pub mod grpc_middleware {
#[derive(Clone)]
pub struct StackTraceMiddlewareLayer {
manager: GrpcStackTraceManagerRef,
config: TraceConfig,
}
pub type OptionalStackTraceMiddlewareLayer = Either<StackTraceMiddlewareLayer, Identity>;

impl StackTraceMiddlewareLayer {
pub fn new(manager: GrpcStackTraceManagerRef) -> Self {
Self { manager }
pub fn new(manager: GrpcStackTraceManagerRef, config: TraceConfig) -> Self {
Self { manager, config }
}

pub fn new_optional(
manager: Option<GrpcStackTraceManagerRef>,
optional: Option<(GrpcStackTraceManagerRef, TraceConfig)>,
) -> OptionalStackTraceMiddlewareLayer {
if let Some(manager) = manager {
Either::A(Self::new(manager))
if let Some((manager, config)) = optional {
Either::A(Self::new(manager, config))
} else {
Either::B(Identity::new())
}
Expand All @@ -150,6 +150,7 @@ pub mod grpc_middleware {
StackTraceMiddleware {
inner: service,
manager: self.manager.clone(),
config: self.config.clone(),
next_id: Default::default(),
}
}
Expand All @@ -159,6 +160,7 @@ pub mod grpc_middleware {
pub struct StackTraceMiddleware<S> {
inner: S,
manager: GrpcStackTraceManagerRef,
config: TraceConfig,
next_id: Arc<AtomicU64>,
}

Expand All @@ -185,19 +187,13 @@ pub mod grpc_middleware {

let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let manager = self.manager.clone();
let config = self.config.clone();

async move {
let sender = manager.lock().await.register(id);
let root_span: SpanValue = format!("{}:{}", req.uri().path(), id).into();

sender
.trace(
inner.call(req),
root_span,
false,
Duration::from_millis(100),
)
.await
sender.trace(inner.call(req), root_span, config).await
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::rpc::service::monitor_service::{
GrpcStackTraceManagerRef, MonitorServiceImpl, StackTraceMiddlewareLayer,
};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::{ComputeNodeConfig, ComputeNodeOpts};
use crate::{AsyncStackTraceOption, ComputeNodeConfig, ComputeNodeOpts};

/// Bootstraps the compute-node.
pub async fn compute_node_serve(
Expand Down Expand Up @@ -177,14 +177,23 @@ pub async fn compute_node_serve(
extra_info_sources,
));

let async_stack_trace_config = match opts.async_stack_trace {
AsyncStackTraceOption::Off => None,
c => Some(async_stack_trace::TraceConfig {
report_detached: true,
verbose: matches!(c, AsyncStackTraceOption::Verbose),
interval: Duration::from_secs(1),
}),
};

// Initialize the managers.
let batch_mgr = Arc::new(BatchManager::new(config.batch.worker_threads_num));
let stream_mgr = Arc::new(LocalStreamManager::new(
client_addr.clone(),
state_store.clone(),
streaming_metrics.clone(),
config.streaming.clone(),
opts.enable_async_stack_trace,
async_stack_trace_config.clone(),
opts.enable_managed_cache,
));
let source_mgr = Arc::new(TableSourceManager::new(
Expand Down Expand Up @@ -237,8 +246,7 @@ pub async fn compute_node_serve(
.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
.tcp_nodelay(true)
.layer(StackTraceMiddlewareLayer::new_optional(
opts.enable_async_stack_trace
.then_some(grpc_stack_trace_mgr),
async_stack_trace_config.map(|c| (grpc_stack_trace_mgr, c)),
))
.add_service(TaskServiceServer::new(batch_srv))
.add_service(ExchangeServiceServer::new(exchange_srv))
Expand Down
4 changes: 1 addition & 3 deletions src/ctl/src/cmd_impl/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ pub async fn trace() -> anyhow::Result<()> {
}

if all_actor_traces.is_empty() && all_rpc_traces.is_empty() {
println!(
"No traces found. No actors are running, or `--enable-async-stack-trace` not set?"
);
println!("No traces found. No actors are running, or `--async-stack-trace` not set?");
} else {
println!("--- Actor Traces ---");
for (key, trace) in all_actor_traces {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn gen_create_mv_plan(
(db_id, schema.id())
};

let definition = format!("{}", query);
let definition = query.to_string();

let bound = {
let mut binder = Binder::new(session);
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ impl StreamingJob {
}
}

pub fn mview_definition(&self) -> String {
match self {
Self::MaterializedView(table) => table.definition.clone(),
Self::MaterializedSource(_, table) => table.definition.clone(),
_ => "".to_owned(),
}
}

pub fn properties(&self) -> HashMap<String, String> {
match self {
Self::MaterializedView(table) => table.properties.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/service/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ where
schema_id: stream_job.schema_id(),
database_id: stream_job.database_id(),
mview_name: stream_job.name(),
mview_definition: stream_job.mview_definition(),
table_properties: stream_job.properties(),
table_sink_map: self
.fragment_manager
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/stream/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ mod test {
upstream_actor_id: vec![],
same_worker_node_as_upstream: false,
vnode_bitmap: None,
mview_definition: "".to_owned(),
}],
..Default::default()
};
Expand All @@ -350,6 +351,7 @@ mod test {
upstream_actor_id: vec![],
same_worker_node_as_upstream: false,
vnode_bitmap: None,
mview_definition: "".to_owned(),
})
.collect_vec();
actor_id += node_count * parallel_degree as u32;
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/stream/stream_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ impl StreamActorBuilder {
same_worker_node_as_upstream: self.chain_same_worker_node
|| self.upstreams.values().any(|u| u.same_worker_node),
vnode_bitmap: None,
// To be filled by `StreamGraphBuilder::build`
mview_definition: "".to_owned(),
}
}
}
Expand Down Expand Up @@ -495,6 +497,8 @@ impl StreamGraphBuilder {
)?;

actor.nodes = Some(stream_node);
actor.mview_definition = ctx.mview_definition.clone();

graph
.entry(builder.get_fragment_id())
.or_default()
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ pub struct CreateMaterializedViewContext {
pub database_id: DatabaseId,
/// Name of mview, for internal table name generation.
pub mview_name: String,
/// The SQL definition of this materialized view. Used for debugging only.
pub mview_definition: String,

pub table_properties: HashMap<String, String>,
}

Expand Down
14 changes: 7 additions & 7 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let ret = self
.inner
.upload(path, obj)
.stack_trace("object_store_upload")
.verbose_stack_trace("object_store_upload")
.await;

try_update_failure_metric(&self.object_store_metrics, &ret, operation_type);
Expand Down Expand Up @@ -561,7 +561,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let res = self
.inner
.read(path, block_loc)
.stack_trace("object_store_read")
.verbose_stack_trace("object_store_read")
.await
.map_err(|err| {
ObjectError::internal(format!(
Expand Down Expand Up @@ -598,7 +598,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let res = self
.inner
.readv(path, block_locs)
.stack_trace("object_store_readv")
.verbose_stack_trace("object_store_readv")
.await;

try_update_failure_metric(&self.object_store_metrics, &res, operation_type);
Expand Down Expand Up @@ -636,7 +636,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let ret = self
.inner
.metadata(path)
.stack_trace("object_store_metadata")
.verbose_stack_trace("object_store_metadata")
.await;

try_update_failure_metric(&self.object_store_metrics, &ret, operation_type);
Expand All @@ -654,7 +654,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let ret = self
.inner
.delete(path)
.stack_trace("object_store_delete")
.verbose_stack_trace("object_store_delete")
.await;

try_update_failure_metric(&self.object_store_metrics, &ret, operation_type);
Expand All @@ -672,7 +672,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let ret = self
.inner
.delete_objects(paths)
.stack_trace("object_store_delete_objects")
.verbose_stack_trace("object_store_delete_objects")
.await;

try_update_failure_metric(&self.object_store_metrics, &ret, operation_type);
Expand All @@ -690,7 +690,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let ret = self
.inner
.list(prefix)
.stack_trace("object_store_list")
.verbose_stack_trace("object_store_list")
.await;

try_update_failure_metric(&self.object_store_metrics, &ret, operation_type);
Expand Down
2 changes: 1 addition & 1 deletion src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct ComputeNodeConfig {
pub port: u16,
pub listen_address: String,
pub exporter_port: u16,
pub enable_async_stack_trace: bool,
pub async_stack_trace: String,
pub enable_managed_cache: bool,
pub enable_tiered_cache: bool,

Expand Down
8 changes: 3 additions & 5 deletions src/risedevtool/src/task/compute_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,9 @@ impl ComputeNodeService {
.arg("--client-address")
.arg(format!("{}:{}", config.address, config.port))
.arg("--metrics-level")
.arg("1");

if config.enable_async_stack_trace {
cmd.arg("--enable-async-stack-trace");
}
.arg("1")
.arg("--async-stack-trace")
.arg(&config.async_stack_trace);

if config.enable_managed_cache {
cmd.arg("--enable-managed-cache");
Expand Down
Loading

0 comments on commit 5ffeccb

Please sign in to comment.