Skip to content

Commit

Permalink
proxy: OpenTelemetry tracing.
Browse files Browse the repository at this point in the history
Thanks to commit XXXX ("Refactor common parts of handle_client and
handle_ws_client to function."), we now have separate tracing spans
for the connection establishment phase and for the forwarring phase
of each connection. This commit sets up OpenTelemetry tracing and
exporter, so that they can be exported as OpenTelemetry traces as
well.

This adds tracing to all outgoing HTTP requests. A separate (child)
span is created for each outgoing HTTP request, and the tracing
context is also propagated to the server in the HTTP headers. Use the
'reqwest-middleware' crate to do that.

If tracing is enabled in the control plane and compute node too, you
can now get an end-to-end distributed trace of what happens when a new
connection is established, starting from the handshake with the
client, creating the 'start_compute' operation in the control plane,
starting the compute node, all the way to down to fetching the base
backup and the availability checks in compute_ctl.
  • Loading branch information
hlinnaka committed Jan 26, 2023
1 parent 6fcf0f2 commit 28064b5
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 21 deletions.
64 changes: 64 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ prost = "0.11"
rand = "0.8"
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.12.0"
rustls = "0.20"
Expand Down
5 changes: 5 additions & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ hyper-tungstenite.workspace = true
itertools.workspace = true
md5.workspace = true
once_cell.workspace = true
opentelemetry = "0.18.0"
tracing-opentelemetry = "0.18.0"
parking_lot.workspace = true
pin-project-lite.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = [ "json" ] }
reqwest-tracing.workspace = true
reqwest-middleware.workspace = true
routerify.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
Expand All @@ -43,6 +47,7 @@ tokio-rustls.workspace = true
tls-listener.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/auth/backend/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ impl UserFacingError for ApiError {
}

// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<reqwest_middleware::Error> for ApiError {
fn from(e: reqwest_middleware::Error) -> Self {
io_error(e).into()
}
}

impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
Expand Down
18 changes: 9 additions & 9 deletions proxy/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ pub mod server;
pub mod websocket;

use crate::url::ApiUrl;
use reqwest::{Request, Response};
use reqwest_middleware::{ClientWithMiddleware, Error, RequestBuilder};

/// Thin convenience wrapper for an API provided by an http endpoint.
#[derive(Debug, Clone)]
pub struct Endpoint {
/// API's base URL.
endpoint: ApiUrl,
/// Connection manager with built-in pooling.
client: reqwest::Client,
client: ClientWithMiddleware,
}

impl Endpoint {
/// Construct a new HTTP endpoint wrapper.
pub fn new(endpoint: ApiUrl, client: reqwest::Client) -> Self {
pub fn new(endpoint: ApiUrl, client: ClientWithMiddleware) -> Self {
Self { endpoint, client }
}

Expand All @@ -25,29 +27,27 @@ impl Endpoint {

/// Return a [builder](reqwest::RequestBuilder) for a `GET` request,
/// appending a single `path` segment to the base endpoint URL.
pub fn get(&self, path: &str) -> reqwest::RequestBuilder {
pub fn get(&self, path: &str) -> RequestBuilder {
let mut url = self.endpoint.clone();
url.path_segments_mut().push(path);
self.client.get(url.into_inner())
}

/// Execute a [request](reqwest::Request).
pub async fn execute(
&self,
request: reqwest::Request,
) -> Result<reqwest::Response, reqwest::Error> {
pub async fn execute(&self, request: Request) -> Result<Response, Error> {
self.client.execute(request).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use reqwest::Client;

#[test]
fn optional_query_params() -> anyhow::Result<()> {
let url = "http://example.com".parse()?;
let endpoint = Endpoint::new(url, reqwest::Client::new());
let endpoint = Endpoint::new(url, ClientWithMiddleware::from(Client::new()));

// Validate that this pattern makes sense.
let req = endpoint
Expand All @@ -66,7 +66,7 @@ mod tests {
#[test]
fn uuid_params() -> anyhow::Result<()> {
let url = "http://example.com".parse()?;
let endpoint = Endpoint::new(url, reqwest::Client::new());
let endpoint = Endpoint::new(url, ClientWithMiddleware::from(Client::new()));

let req = endpoint
.get("frobnicate")
Expand Down
38 changes: 38 additions & 0 deletions proxy/src/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;

/// Initialize logging to stdout, and OpenTelemetry tracing and exporter
///
/// Logging is configured using either `default_log_level` or
/// `RUST_LOG` environment variable as default log level.
///
/// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
/// configuration from environment variables. For example, to change the destination,
/// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See
/// https://opentelemetry.io/docs/reference/specification/sdk-environment-variables/
///
pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
// Initialize Logging
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));

let fmt_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_writer(std::io::stderr);

// Initialize OpenTelemetry
let otlp_layer = tracing_utils::init_tracing("proxy")
.await
.map(OpenTelemetryLayer::new);

// Put it all together
tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.init();
tracing::info!("logging and tracing started");

Ok(())
}
16 changes: 11 additions & 5 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod config;
mod console;
mod error;
mod http;
mod logging;
mod metrics;
mod mgmt;
mod parse;
Expand Down Expand Up @@ -43,10 +44,7 @@ async fn flatten_err(

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_ansi(atty::is(atty::Stream::Stdout))
.with_target(false)
.init();
logging::init_tracing_and_logging("info").await?;

// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
Expand Down Expand Up @@ -82,6 +80,10 @@ async fn main() -> anyhow::Result<()> {
_ => bail!("either both or neither metric-collection-endpoint and metric-collection-interval must be specified"),
};

let http_client = reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
.with(reqwest_tracing::TracingMiddleware::default())
.build();

let auth_backend = match arg_matches
.get_one::<String>("auth-backend")
.unwrap()
Expand All @@ -92,7 +94,7 @@ async fn main() -> anyhow::Result<()> {
.get_one::<String>("auth-endpoint")
.unwrap()
.parse()?;
let endpoint = http::Endpoint::new(url, reqwest::Client::new());
let endpoint = http::Endpoint::new(url, http_client);
auth::BackendType::Console(Cow::Owned(endpoint), ())
}
"postgres" => {
Expand Down Expand Up @@ -166,6 +168,10 @@ async fn main() -> anyhow::Result<()> {
// Furthermore, the first one to fail will cancel the rest.
let _: Vec<()> = futures::future::try_join_all(tasks).await?;

// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();

Ok(())
}

Expand Down
15 changes: 9 additions & 6 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::{collections::HashMap, time::Duration};
use tracing::{debug, error, log::info, trace};
use tracing::{debug, error, info, instrument, trace};

const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";

Expand Down Expand Up @@ -41,14 +41,16 @@ pub async fn collect_metrics(
);

// define client here to reuse it for all requests
let client = reqwest::Client::new();
let http_client = reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
.with(reqwest_tracing::TracingMiddleware::default())
.build();
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();

loop {
tokio::select! {
_ = ticker.tick() => {

match collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, hostname.clone()).await
match collect_metrics_iteration(&http_client, &mut cached_metrics, metric_collection_endpoint, hostname.clone()).await
{
Err(e) => {
error!("Failed to send consumption metrics: {} ", e);
Expand All @@ -60,7 +62,7 @@ pub async fn collect_metrics(
}
}

pub fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
let metrics = prometheus::default_registry().gather();

Expand Down Expand Up @@ -99,8 +101,9 @@ pub fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
current_metrics
}

pub async fn collect_metrics_iteration(
client: &reqwest::Client,
#[instrument(skip_all)]
async fn collect_metrics_iteration(
client: &reqwest_middleware::ClientWithMiddleware,
cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
metric_collection_endpoint: &reqwest::Url,
hostname: String,
Expand Down
2 changes: 1 addition & 1 deletion workspace_hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-syntax = { version = "0.6" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.20", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
Expand Down

0 comments on commit 28064b5

Please sign in to comment.