Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the number of service instances in the proxy #428

Merged
merged 3 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ dependencies = [
"linkerd2-retry",
"linkerd2-router",
"linkerd2-stack",
"linkerd2-stack-metrics",
"linkerd2-test-util",
"linkerd2-timeout",
"linkerd2-trace-context",
Expand Down Expand Up @@ -1201,6 +1202,22 @@ dependencies = [
"tower-service",
]

[[package]]
name = "linkerd2-stack-metrics"
version = "0.1.0"
dependencies = [
"futures",
"indexmap",
"linkerd2-error",
"linkerd2-metrics",
"linkerd2-stack",
"tokio",
"tokio-timer",
"tower",
"tower-util",
"tracing",
]

[[package]]
name = "linkerd2-test-util"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [
"linkerd/router",
"linkerd/signal",
"linkerd/stack",
"linkerd/stack-metrics",
"linkerd/test-util",
"linkerd/timeout",
"linkerd2-proxy",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd2-request-filter = { path = "../../request-filter" }
linkerd2-retry = { path = "../../retry" }
linkerd2-router = { path = "../../router" }
linkerd2-stack = { path = "../../stack" }
linkerd2-stack-metrics = { path = "../../stack-metrics" }
linkerd2-timeout = { path = "../../timeout" }
linkerd2-trace-context = { path = "../../trace-context" }
rand = { version = "0.7", features = ["small_rng"] }
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use linkerd2_opencensus as opencensus;
pub use linkerd2_reconnect as reconnect;
pub use linkerd2_request_filter as request_filter;
pub use linkerd2_router as router;
pub use linkerd2_stack_metrics as stack_metrics;
pub use linkerd2_trace_context as trace_context;

pub mod accept_error;
Expand Down Expand Up @@ -104,6 +105,8 @@ pub type HttpRouteMetrics = http_metrics::Requests<metric_labels::RouteLabels, c

pub type HttpRouteRetry = http_metrics::Retries<metric_labels::RouteLabels>;

pub type StackMetrics = stack_metrics::Registry<metric_labels::StackLabels>;

#[derive(Clone)]
pub struct ProxyMetrics {
pub http_handle_time: handle_time::Scope,
Expand All @@ -112,5 +115,6 @@ pub struct ProxyMetrics {
pub http_route_retry: HttpRouteRetry,
pub http_endpoint: HttpEndpointMetrics,
pub http_errors: errors::MetricsLayer,
pub stack: StackMetrics,
pub transport: transport::Metrics,
}
31 changes: 31 additions & 0 deletions linkerd/app/core/src/metric_labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ pub struct EndpointLabels {
pub labels: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StackLabels {
pub direction: Direction,
pub name: &'static str,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RouteLabels {
dst: dst::DstAddr,
Expand Down Expand Up @@ -186,3 +192,28 @@ where
}
Some(out)
}

// === impl StackLabels ===

impl StackLabels {
pub fn inbound(name: &'static str) -> Self {
Self {
direction: Direction::In,
name,
}
}

pub fn outbound(name: &'static str) -> Self {
Self {
direction: Direction::Out,
name,
}
}
}

impl FmtLabels for StackLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.direction.fmt_labels(f)?;
write!(f, ",name=\"{}\"", self.name)
}
}
4 changes: 4 additions & 0 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl<L> Layers<L> {
self.push_pending().push(buffer::layer(bound, d))
}

pub fn push_per_make<T: Clone>(self, layer: T) -> Layers<Pair<L, stack::per_make::Layer<T>>> {
olix0r marked this conversation as resolved.
Show resolved Hide resolved
self.push(stack::per_make::layer(layer))
}

pub fn push_spawn_ready(self) -> Layers<Pair<L, SpawnReadyLayer>> {
self.push(SpawnReadyLayer::new())
}
Expand Down
10 changes: 9 additions & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linkerd2_app_core::{
drain,
dst::DstAddr,
errors, http_request_authority_addr, http_request_host_addr,
http_request_l5d_override_dst_addr, http_request_orig_dst_addr,
http_request_l5d_override_dst_addr, http_request_orig_dst_addr, metric_labels,
opencensus::proto::trace::v1 as oc,
proxy::{
self,
Expand Down Expand Up @@ -131,6 +131,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(trace::layer(
|endpoint: &Endpoint| info_span!("endpoint", peer.addr = %endpoint.addr),
))
.push_per_make(metrics.stack.layer(stack_labels("endpoint")))
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
.makes::<Endpoint>()
.push(router::Layer::new(
Expand All @@ -150,6 +151,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(insert::target::layer())
.push(metrics.http_route.into_layer::<classify::Response>())
.push(classify::Layer::new())
.push_per_make(metrics.stack.layer(stack_labels("route")))
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract);

// A per-`DstAddr` stack that does the following:
Expand All @@ -163,6 +165,7 @@ impl<A: OrigDstAddr> Config<A> {
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
.push(profiles::router::layer(profiles_client, dst_route_layer))
.push(strip_header::request::layer(DST_OVERRIDE_HEADER))
.push_per_make(metrics.stack.layer(stack_labels("logical")))
.push(trace::layer(
|dst: &DstAddr| info_span!("logical", dst = %dst.dst_logical()),
));
Expand Down Expand Up @@ -249,6 +252,7 @@ impl<A: OrigDstAddr> Config<A> {
}))
.push_per_make(metrics.http_errors)
.push_per_make(errors::layer())
.push_per_make(metrics.stack.layer(stack_labels("source")))
.push(trace::layer(|src: &tls::accept::Meta| {
info_span!(
"source",
Expand Down Expand Up @@ -318,3 +322,7 @@ pub fn trace_labels() -> HashMap<String, String> {
l.insert("direction".to_string(), "inbound".to_string());
l
}

fn stack_labels(name: &'static str) -> metric_labels::StackLabels {
metric_labels::StackLabels::inbound(name)
}
11 changes: 10 additions & 1 deletion linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linkerd2_app_core::{
dns, drain,
dst::DstAddr,
errors, http_request_authority_addr, http_request_host_addr,
http_request_l5d_override_dst_addr, http_request_orig_dst_addr,
http_request_l5d_override_dst_addr, http_request_orig_dst_addr, metric_labels,
opencensus::proto::trace::v1 as oc,
proxy::{
self, core::resolve::Resolve, discover, fallback, http, identity, resolve::map_endpoint,
Expand Down Expand Up @@ -191,6 +191,7 @@ impl<A: OrigDstAddr> Config<A> {
// If the `l5d-require-id` header is present, then that identity is
// used as the server name when connecting to the endpoint.
let orig_dst_router_layer = svc::layers()
.push_per_make(metrics.stack.layer(stack_labels("fallback.endpoint")))
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
.push(router::Layer::new(
router::Config::new(router_capacity, router_max_idle_age),
Expand All @@ -201,6 +202,7 @@ impl<A: OrigDstAddr> Config<A> {
// over all endpoints returned from the destination service.
const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 10;
let balancer_layer = svc::layers()
.push_per_make(metrics.stack.layer(stack_labels("balance.endpoint")))
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,
Expand Down Expand Up @@ -247,6 +249,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(trace::layer(
|dst: &DstAddr| info_span!("logical", dst.logical = %dst.dst_logical()),
))
.push_per_make(metrics.stack.layer(stack_labels("logical.dst")))
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
.push(router::Layer::new(
router::Config::new(router_capacity, router_max_idle_age),
Expand Down Expand Up @@ -286,6 +289,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
.push(http::insert::target::layer())
.push(trace::layer(|addr: &Addr| info_span!("addr", %addr)))
.push_per_make(metrics.stack.layer(stack_labels("addr")))
.push_buffer_pending(buffer.max_in_flight, DispatchDeadline::extract)
.push(router::Layer::new(
router::Config::new(router_capacity, router_max_idle_age),
Expand Down Expand Up @@ -323,6 +327,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(trace::layer(
|src: &tls::accept::Meta| info_span!("source", target.addr = %src.addrs.target_addr()),
))
.push_per_make(metrics.stack.layer(stack_labels("source")))
.push(trace_context::layer(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
Expand Down Expand Up @@ -382,3 +387,7 @@ pub fn trace_labels() -> HashMap<String, String> {
l.insert("direction".to_string(), "outbound".to_string());
l
}

fn stack_labels(name: &'static str) -> metric_labels::StackLabels {
metric_labels::StackLabels::outbound(name)
}
7 changes: 6 additions & 1 deletion linkerd/app/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use linkerd2_app_core::{
errors, handle_time, http_metrics as metrics,
metric_labels::{ControlLabels, EndpointLabels, RouteLabels},
metrics::FmtMetrics,
opencensus, proxy, telemetry, transport, ControlHttpMetrics, ProxyMetrics,
opencensus, proxy, stack_metrics, telemetry, transport, ControlHttpMetrics, ProxyMetrics,
};
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -57,6 +57,8 @@ impl Metrics {
let inbound_handle_time = handle_time_report.inbound();
let outbound_handle_time = handle_time_report.outbound();

let stack = stack_metrics::Registry::default();

let (transport, transport_report) = transport::metrics::new();

let (opencensus, opencensus_report) = opencensus::metrics::new();
Expand All @@ -69,6 +71,7 @@ impl Metrics {
http_route_actual: http_route_actual.clone(),
http_route_retry: http_route_retry.clone(),
http_errors: http_errors.inbound(),
stack: stack.clone(),
transport: transport.clone(),
},
outbound: ProxyMetrics {
Expand All @@ -78,6 +81,7 @@ impl Metrics {
http_route_retry,
http_route_actual,
http_errors: http_errors.outbound(),
stack: stack.clone(),
transport,
},
control,
Expand All @@ -93,6 +97,7 @@ impl Metrics {
.and_then(handle_time_report)
.and_then(transport_report)
.and_then(opencensus_report)
.and_then(stack)
.and_then(process);

(metrics, report)
Expand Down
18 changes: 18 additions & 0 deletions linkerd/stack-metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "linkerd2-stack-metrics"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
edition = "2018"
publish = false

[dependencies]
futures = "0.1"
indexmap = "1.0"
linkerd2-error = { path = "../error" }
linkerd2-metrics = { path = "../metrics" }
linkerd2-stack = { path = "../stack" }
tokio = "0.1"
tokio-timer = "0.2" # for tokio_timer::clock
tower = "0.1"
tower-util = "0.1"
tracing = "0.1.9"
20 changes: 20 additions & 0 deletions linkerd/stack-metrics/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::{Metrics, TrackService};
use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct TrackServiceLayer(Arc<Metrics>);

impl TrackServiceLayer {
pub(crate) fn new(metrics: Arc<Metrics>) -> Self {
TrackServiceLayer(metrics)
}
}

impl<S> tower::layer::Layer<S> for TrackServiceLayer {
type Service = TrackService<S>;

fn layer(&self, inner: S) -> Self::Service {
self.0.create_total.incr();
TrackService::new(inner, self.0.clone())
}
}
Loading