Skip to content

Commit

Permalink
[proxy] Enable OpenTelemetry tracing.
Browse files Browse the repository at this point in the history
This commit sets up OpenTelemetry tracing and exporter, so that they
can be exported as OpenTelemetry traces as well.

All outgoing HTTP requests will be traced. A separate (child)
span is created for each outgoing HTTP request, and the tracing
context is also propagated to the server in the HTTP headers.

If tracing is enabled in the control plane and compute node too, you
can now get an end-to-end distributed trace of what happens when a new
connection is established, starting from the handshake with the
client, creating the 'start_compute' operation in the control plane,
starting the compute node, all the way to down to fetching the base
backup and the availability checks in compute_ctl.

Co-authored-by: Dmitry Ivanov <[email protected]>
  • Loading branch information
hlinnaka and funbringer committed Feb 16, 2023
1 parent 0d3aefb commit f0c9e35
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 50 deletions.
64 changes: 64 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ prost = "0.11"
rand = "0.8"
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
routerify = "3"
rpds = "0.12.0"
rustls = "0.20"
Expand Down
5 changes: 5 additions & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ itertools.workspace = true
md5.workspace = true
metrics.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
pq_proto.workspace = true
prometheus.workspace = true
rand.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest-middleware.workspace = true
reqwest-tracing.workspace = true
routerify.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
Expand All @@ -49,7 +52,9 @@ tls-listener.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
url.workspace = true
utils.workspace = true
Expand Down
23 changes: 15 additions & 8 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use async_trait::async_trait;
use std::sync::Arc;

pub mod errors {
use crate::error::{io_error, UserFacingError};
use reqwest::StatusCode as HttpStatusCode;
use crate::{
error::{io_error, UserFacingError},
http,
};
use thiserror::Error;

/// A go-to error message which doesn't leak any detail.
Expand All @@ -24,7 +26,7 @@ pub mod errors {
/// Error returned by the console itself.
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
Console {
status: HttpStatusCode,
status: http::StatusCode,
text: Box<str>,
},

Expand All @@ -35,7 +37,7 @@ pub mod errors {

impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn http_status_code(&self) -> Option<HttpStatusCode> {
pub fn http_status_code(&self) -> Option<http::StatusCode> {
use ApiError::*;
match self {
Console { status, .. } => Some(*status),
Expand All @@ -51,15 +53,15 @@ pub mod errors {
// To minimize risks, only select errors are forwarded to users.
// Ask @neondatabase/control-plane for review before adding more.
Console { status, .. } => match *status {
HttpStatusCode::NOT_FOUND => {
http::StatusCode::NOT_FOUND => {
// Status 404: failed to get a project-related resource.
format!("{REQUEST_FAILED}: endpoint cannot be found")
}
HttpStatusCode::NOT_ACCEPTABLE => {
http::StatusCode::NOT_ACCEPTABLE => {
// Status 406: endpoint is disabled (we don't allow connections).
format!("{REQUEST_FAILED}: endpoint is disabled")
}
HttpStatusCode::LOCKED => {
http::StatusCode::LOCKED => {
// Status 423: project might be in maintenance mode (or bad state).
format!("{REQUEST_FAILED}: endpoint is temporary unavailable")
}
Expand All @@ -70,13 +72,18 @@ pub mod errors {
}
}

// Helps eliminate graceless `.map_err` calls without introducing another ctor.
impl From<reqwest::Error> for ApiError {
fn from(e: reqwest::Error) -> Self {
io_error(e).into()
}
}

impl From<reqwest_middleware::Error> for ApiError {
fn from(e: reqwest_middleware::Error) -> Self {
io_error(e).into()
}
}

#[derive(Debug, Error)]
pub enum GetAuthInfoError {
// We shouldn't include the actual secret here.
Expand Down
5 changes: 2 additions & 3 deletions proxy/src/console/provider/neon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use super::{
use crate::{auth::ClientCredentials, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use reqwest::StatusCode as HttpStatusCode;
use tracing::{error, info, info_span, warn, Instrument};

#[derive(Clone)]
Expand Down Expand Up @@ -52,7 +51,7 @@ impl Api {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(HttpStatusCode::NOT_FOUND) => return Ok(None),
Some(http::StatusCode::NOT_FOUND) => return Ok(None),
_otherwise => return Err(e.into()),
},
};
Expand Down Expand Up @@ -154,7 +153,7 @@ impl super::Api for Api {

/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
response: reqwest::Response,
response: http::Response,
) -> Result<T, ApiError> {
let status = response.status();
if status.is_success() {
Expand Down
41 changes: 30 additions & 11 deletions proxy/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,72 @@
//! HTTP client and server impls.
//! Other modules should use stuff from this module instead of
//! directly relying on deps like `reqwest` (think loose coupling).
pub mod server;
pub mod websocket;

pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};

use crate::url::ApiUrl;
use reqwest_middleware::RequestBuilder;

/// This is the preferred way to create new http clients,
/// because it takes care of observability (OpenTelemetry).
/// We deliberately don't want to replace this with a public static.
pub fn new_client() -> ClientWithMiddleware {
reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
.with(reqwest_tracing::TracingMiddleware::default())
.build()
}

/// Thin convenience wrapper for an API provided by an http endpoint.
#[derive(Debug, Clone)]
pub struct Endpoint {
/// API's base URL.
endpoint: ApiUrl,
/// Connection manager with built-in pooling.
client: reqwest::Client,
client: ClientWithMiddleware,
}

impl Endpoint {
/// Construct a new HTTP endpoint wrapper.
pub fn new(endpoint: ApiUrl, client: reqwest::Client) -> Self {
Self { endpoint, client }
/// Http client is not constructed under the hood so that it can be shared.
pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
Self {
endpoint,
client: client.into(),
}
}

#[inline(always)]
pub fn url(&self) -> &ApiUrl {
&self.endpoint
}

/// Return a [builder](reqwest::RequestBuilder) for a `GET` request,
/// Return a [builder](RequestBuilder) for a `GET` request,
/// appending a single `path` segment to the base endpoint URL.
pub fn get(&self, path: &str) -> reqwest::RequestBuilder {
pub fn get(&self, path: &str) -> RequestBuilder {
let mut url = self.endpoint.clone();
url.path_segments_mut().push(path);
self.client.get(url.into_inner())
}

/// Execute a [request](reqwest::Request).
pub async fn execute(
&self,
request: reqwest::Request,
) -> Result<reqwest::Response, reqwest::Error> {
pub async fn execute(&self, request: Request) -> Result<Response, Error> {
self.client.execute(request).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use reqwest::Client;

#[test]
fn optional_query_params() -> anyhow::Result<()> {
let url = "http://example.com".parse()?;
let endpoint = Endpoint::new(url, reqwest::Client::new());
let endpoint = Endpoint::new(url, Client::new());

// Validate that this pattern makes sense.
let req = endpoint
Expand All @@ -66,7 +85,7 @@ mod tests {
#[test]
fn uuid_params() -> anyhow::Result<()> {
let url = "http://example.com".parse()?;
let endpoint = Endpoint::new(url, reqwest::Client::new());
let endpoint = Endpoint::new(url, Client::new());

let req = endpoint
.get("frobnicate")
Expand Down
46 changes: 46 additions & 0 deletions proxy/src/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
prelude::*,
};

/// Initialize logging and OpenTelemetry tracing and exporter.
///
/// Logging can be configured using `RUST_LOG` environment variable.
///
/// OpenTelemetry is configured with OTLP/HTTP exporter. It picks up
/// configuration from environment variables. For example, to change the
/// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`.
/// See <https://opentelemetry.io/docs/reference/specification/sdk-environment-variables>
pub async fn init() -> anyhow::Result<LoggingGuard> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();

let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(atty::is(atty::Stream::Stderr))
.with_writer(std::io::stderr)
.with_target(false);

let otlp_layer = tracing_utils::init_tracing("proxy")
.await
.map(OpenTelemetryLayer::new);

tracing_subscriber::registry()
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
.try_init()?;

Ok(LoggingGuard)
}

pub struct LoggingGuard;

impl Drop for LoggingGuard {
fn drop(&mut self) {
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();
}
}
Loading

0 comments on commit f0c9e35

Please sign in to comment.