Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inbound: refactor newtypes for policy addresses #2264

Merged
merged 4 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ impl Config {
let (listen_addr, listen) = bind.bind(&self.server)?;

// Get the policy for the admin server.
let policy = policy.get_policy(OrigDstAddr(listen_addr.into())).await?;
let policy = policy
.get_policy(inbound::policy::LookupAddr(listen_addr.into()))
.await?;

let (ready, latch) = crate::server::Readiness::new();
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
Expand Down
12 changes: 10 additions & 2 deletions linkerd/app/gateway/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use linkerd_app_core::{
svc::{NewService, ServiceExt},
tls,
trace::test::trace_init,
transport::ServerAddr,
Error, NameAddr,
};
use linkerd_app_inbound::GatewayLoop;
Expand Down Expand Up @@ -52,7 +53,7 @@ async fn upgraded_request_remains_relative_form() {

impl svc::Param<OrigDstAddr> for Target {
fn param(&self) -> OrigDstAddr {
OrigDstAddr(([10, 10, 10, 10], 4143).into())
OrigDstAddr(Self::dst_addr())
}
}

Expand Down Expand Up @@ -132,14 +133,21 @@ async fn upgraded_request_remains_relative_form() {
}]))]),
},
};
let (policy, tx) = inbound::policy::AllowPolicy::for_test(self.param(), policy);
let (policy, tx) =
inbound::policy::AllowPolicy::for_test(ServerAddr(Self::dst_addr()), policy);
tokio::spawn(async move {
tx.closed().await;
});
policy
}
}

impl Target {
fn dst_addr() -> std::net::SocketAddr {
([10, 10, 10, 10], 4143).into()
}
}

let (inner, mut handle) =
mock::pair::<http::Request<http::BoxBody>, http::Response<http::BoxBody>>();
handle.allow(1);
Expand Down
7 changes: 6 additions & 1 deletion linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ impl<N> Inbound<N> {
}
})
.lift_new_with_target()
.push(policy::Discover::layer(policies))
.push(policy::Discover::layer_via(policies, |t: &T| {
// For non-direct inbound connections, policies are always
// looked up for the original destination address.
let OrigDstAddr(addr) = t.param();
policy::LookupAddr(addr)
}))
.into_new_service()
.check_new_service::<T, I>()
.push_switch(
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/detect/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn authzs() -> Arc<[Authorization]> {

fn allow(protocol: Protocol) -> AllowPolicy {
let (allow, _tx) = AllowPolicy::for_test(
orig_dst_addr(),
ServerAddr(orig_dst_addr().into()),
ServerPolicy {
protocol,
meta: Arc::new(Meta::Resource {
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/inbound/src/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ impl<N> Inbound<N> {
|(header, client): &(TransportHeader, ClientInfo)| {
if header.name.is_some() {
// When the transport header provides an alternate target, the
// connection is a gateway connection. . We use the `OrigDstAddr`--the
// connection is a gateway connection. We use the `OrigDstAddr`--the
// inbound proxy server's address--to lookup policies.
return client.local_addr;
return policy::LookupAddr(client.local_addr.into());
}

// Otherwise, use the port override from the transport header.
OrigDstAddr((client.local_addr.ip(), header.port).into())
policy::LookupAddr((client.local_addr.ip(), header.port).into())
},
))
.into_new_service()
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,15 @@ impl Target {
fn addr() -> SocketAddr {
([127, 0, 0, 1], 80).into()
}

fn dst_addr() -> SocketAddr {
([192, 0, 2, 2], 80).into()
}
}

impl svc::Param<OrigDstAddr> for Target {
fn param(&self) -> OrigDstAddr {
OrigDstAddr(([192, 0, 2, 2], 80).into())
OrigDstAddr(Self::dst_addr())
}
}

Expand Down Expand Up @@ -622,7 +626,7 @@ impl svc::Param<policy::AllowPolicy> for Target {
}),
}]);
let (policy, _) = policy::AllowPolicy::for_test(
self.param(),
ServerAddr(Self::dst_addr()),
policy::ServerPolicy {
protocol: policy::Protocol::Http1(Arc::new([
linkerd_proxy_server_policy::http::default(authorizations),
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/inbound/src/metrics/authz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use linkerd_app_core::{
ServerLabel, TargetAddr, TlsAccept,
},
tls,
transport::OrigDstAddr,
transport::ServerAddr,
};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -79,7 +79,7 @@ impl HttpAuthzMetrics {
pub fn route_not_found(
&self,
labels: ServerLabel,
dst: OrigDstAddr,
dst: ServerAddr,
tls: tls::ConditionalServerTls,
) {
self.0
Expand All @@ -90,7 +90,7 @@ impl HttpAuthzMetrics {
.incr();
}

pub fn deny(&self, labels: RouteLabels, dst: OrigDstAddr, tls: tls::ConditionalServerTls) {
pub fn deny(&self, labels: RouteLabels, dst: ServerAddr, tls: tls::ConditionalServerTls) {
self.0
.deny
.lock()
Expand Down Expand Up @@ -205,7 +205,7 @@ impl FmtMetrics for TcpAuthzMetrics {
// === impl Key ===

impl<L> Key<L> {
fn new(labels: L, dst: OrigDstAddr, tls: tls::ConditionalServerTls) -> Self {
fn new(labels: L, dst: ServerAddr, tls: tls::ConditionalServerTls) -> Self {
Self {
tls,
target: TargetAddr(dst.into()),
Expand Down
29 changes: 16 additions & 13 deletions linkerd/app/inbound/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use linkerd_app_core::metrics::ServerLabel;
use linkerd_app_core::{
metrics::{RouteAuthzLabels, ServerAuthzLabels},
tls,
transport::{ClientAddr, OrigDstAddr, Remote},
transport::{ClientAddr, Remote, ServerAddr},
Error,
};
use linkerd_idle_cache::Cached;
Expand All @@ -33,7 +33,7 @@ pub use linkerd_proxy_server_policy::{
http::{filter::Redirection, Route as HttpRoute},
route, Authentication, Authorization, Meta, Protocol, RoutePolicy, ServerPolicy,
};
use std::{future::Future, sync::Arc};
use std::{future::Future, net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::sync::watch;

Expand All @@ -47,27 +47,30 @@ pub struct ServerUnauthorized {
pub trait GetPolicy: Clone + Send + Sync + 'static {
type Future: Future<Output = Result<AllowPolicy, Error>> + Unpin + Send;

fn get_policy(&self, target: OrigDstAddr) -> Self::Future;
fn get_policy(&self, addr: LookupAddr) -> Self::Future;
}

#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub struct LookupAddr(pub SocketAddr);

#[derive(Clone, Debug)]
pub struct AllowPolicy {
dst: OrigDstAddr,
dst: ServerAddr,
server: Cached<watch::Receiver<ServerPolicy>>,
}

// Describes an authorized non-HTTP connection.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ServerPermit {
pub dst: OrigDstAddr,
pub dst: ServerAddr,
pub protocol: Protocol,
pub labels: ServerAuthzLabels,
}

// Describes an authorized HTTP request.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HttpRoutePermit {
pub dst: OrigDstAddr,
pub dst: ServerAddr,
pub labels: RouteAuthzLabels,
}

Expand All @@ -80,25 +83,25 @@ pub enum Routes {

impl<S> GetPolicy for S
where
S: tower::Service<OrigDstAddr, Response = AllowPolicy, Error = Error>,
S: tower::Service<LookupAddr, Response = AllowPolicy, Error = Error>,
S: Clone + Send + Sync + Unpin + 'static,
S::Future: Send + Unpin,
{
type Future = tower::util::Oneshot<S, OrigDstAddr>;
type Future = tower::util::Oneshot<S, LookupAddr>;

#[inline]
fn get_policy(&self, target: OrigDstAddr) -> Self::Future {
fn get_policy(&self, addr: LookupAddr) -> Self::Future {
use tower::util::ServiceExt;

self.clone().oneshot(target)
self.clone().oneshot(addr)
}
}

// === impl AllowPolicy ===

impl AllowPolicy {
#[cfg(any(test, fuzzing, feature = "test-util"))]
pub fn for_test(dst: OrigDstAddr, server: ServerPolicy) -> (Self, watch::Sender<ServerPolicy>) {
pub fn for_test(dst: ServerAddr, server: ServerPolicy) -> (Self, watch::Sender<ServerPolicy>) {
let (tx, server) = watch::channel(server);
let server = Cached::uncached(server);
let p = Self { dst, server };
Expand All @@ -116,7 +119,7 @@ impl AllowPolicy {
}

#[inline]
pub fn dst_addr(&self) -> OrigDstAddr {
pub fn dst_addr(&self) -> ServerAddr {
self.dst
}

Expand Down Expand Up @@ -186,7 +189,7 @@ fn is_authorized(
// === impl Permit ===

impl ServerPermit {
fn new(dst: OrigDstAddr, server: &ServerPolicy, authz: &Authorization) -> Self {
fn new(dst: ServerAddr, server: &ServerPolicy, authz: &Authorization) -> Self {
Self {
dst,
protocol: server.protocol.clone(),
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/inbound/src/policy/discover.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{AllowPolicy, GetPolicy};
use super::{AllowPolicy, GetPolicy, LookupAddr};
use futures::ready;
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
use linkerd_app_core::{svc, Error};
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -50,7 +50,7 @@ where
impl<X, G, N, NSvc, T> svc::Service<T> for Discover<X, G, N>
where
G: GetPolicy,
X: svc::ExtractParam<OrigDstAddr, T>,
X: svc::ExtractParam<LookupAddr, T>,
N: svc::NewService<T, Service = NSvc> + Clone,
NSvc: svc::NewService<AllowPolicy>,
{
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/policy/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use linkerd_app_core::{
metrics::{RouteAuthzLabels, RouteLabels},
svc::{self, ServiceExt},
tls,
transport::{ClientAddr, OrigDstAddr, Remote},
transport::{ClientAddr, Remote, ServerAddr},
Error, Result,
};
use linkerd_proxy_server_policy::{grpc, http, route::RouteMatch};
Expand Down Expand Up @@ -41,7 +41,7 @@ pub struct HttpPolicyService<T, N> {

#[derive(Clone, Debug)]
struct ConnectionMeta {
dst: OrigDstAddr,
dst: ServerAddr,
client: Remote<ClientAddr>,
tls: tls::ConditionalServerTls,
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/policy/http/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::*;
use crate::policy::{Authentication, Authorization, Meta, Protocol, ServerPolicy};
use linkerd_app_core::{svc::Service, Infallible};
use linkerd_app_core::{svc::Service, transport::ServerAddr, Infallible};
use std::sync::Arc;

macro_rules! conn {
($client:expr, $dst:expr) => {{
ConnectionMeta {
dst: OrigDstAddr(($dst, 8080).into()),
dst: ServerAddr(($dst, 8080).into()),
client: Remote(ClientAddr(($client, 30120).into())),
tls: tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some("foo.bar.bah".parse().unwrap()),
Expand Down
13 changes: 7 additions & 6 deletions linkerd/app/inbound/src/policy/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::AllowPolicy;
use super::{AllowPolicy, LookupAddr};
use futures::future;
use linkerd_app_core::{svc, transport::OrigDstAddr, Error};
use linkerd_app_core::{svc, transport::ServerAddr, Error};
use linkerd_idle_cache::IdleCache;
pub use linkerd_proxy_server_policy::{
authz::Suffix, Authentication, Authorization, Protocol, ServerPolicy,
Expand Down Expand Up @@ -96,7 +96,7 @@ where
}
}

impl<D> svc::Service<OrigDstAddr> for Store<D>
impl<D> svc::Service<LookupAddr> for Store<D>
where
D: svc::Service<
u16,
Expand All @@ -117,11 +117,12 @@ where
task::Poll::Ready(Ok(()))
}

fn call(&mut self, dst: OrigDstAddr) -> Self::Future {
// Lookup the polcify for the target port in the cache. If it doesn't
fn call(&mut self, LookupAddr(addr): LookupAddr) -> Self::Future {
// Lookup the policy for the target port in the cache. If it doesn't
// already exist, we spawn a watch on the API (if it is configured). If
// no discovery API is configured we use the default policy.
let port = dst.port();
let port = addr.port();
let dst = ServerAddr(addr);
if let Some(server) = self.cache.get(&port) {
return future::Either::Left(future::ready(Ok(AllowPolicy { dst, server })));
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/policy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
use futures::future;
use linkerd_app_core::{
svc, tls,
transport::{ClientAddr, OrigDstAddr, Remote},
transport::{ClientAddr, Remote, ServerAddr},
Error, Result,
};
use linkerd_proxy_server_policy::{Protocol, ServerPolicy};
Expand Down Expand Up @@ -182,7 +182,7 @@ where
/// accept connections given the provided TLS state.
fn check_authorized(
server: &ServerPolicy,
dst: OrigDstAddr,
dst: ServerAddr,
client_addr: Remote<ClientAddr>,
tls: &tls::ConditionalServerTls,
) -> Result<ServerPermit, ServerUnauthorized> {
Expand Down
Loading