Skip to content

Commit

Permalink
update endpoint spans
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jan 29, 2024
1 parent 24cac26 commit 3c2fdd1
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 49 deletions.
3 changes: 2 additions & 1 deletion proxy/src/auth/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ async fn auth_quirks(
Err(info) => {
let res = hacks::password_hack_no_authentication(info, client, &mut ctx.latency_timer)
.await?;
ctx.set_endpoint_id(Some(res.info.endpoint.clone()));
ctx.set_endpoint_id(res.info.endpoint.clone());
tracing::Span::current().record("ep", &tracing::field::display(&res.info.endpoint));
(res.info, Some(res.keys))
}
Ok(info) => (info, None),
Expand Down
46 changes: 20 additions & 26 deletions proxy/src/auth/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,28 +93,23 @@ impl ComputeUserInfoMaybeEndpoint {
// Project name might be passed via PG's command-line options.
let endpoint_option = params
.options_raw()
.and_then(|options| {
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
// However, if both are present, we don't exactly know which one to use.
// Therefore we require that only one of them is present.
options
.filter_map(parse_endpoint_param)
.at_most_one()
.ok()?
})
.map(|name| name.into());

let endpoint_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
endpoint_sni(sni_str, cn)?
} else {
None
}
} else {
None
};

let endpoint = match (endpoint_option, endpoint_from_domain) {
.into_iter()
.flatten()
.filter_map(parse_endpoint_param)
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
// However, if both are present, we don't exactly know which one to use.
// Therefore we require that only one of them is present.
.at_most_one()
.ok()
.flatten();

let endpoint_from_domain = sni
.zip(common_names)
.map(|(sni, cn)| endpoint_sni(sni, cn))
.transpose()?
.flatten();

let endpoint_id = match (endpoint_option, endpoint_from_domain) {
// Invariant: if we have both project name variants, they should match.
(Some(option), Some(domain)) if option != domain => {
Some(Err(InconsistentProjectNames { domain, option }))
Expand All @@ -126,15 +121,14 @@ impl ComputeUserInfoMaybeEndpoint {
}),
}
.transpose()?;
ctx.set_endpoint_id(endpoint.clone());

info!(%user, project = endpoint.as_deref(), "credentials");
info!(%user, project = endpoint_id.as_deref(), "credentials");
if sni.is_some() {
info!("Connection with sni");
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["sni"])
.inc();
} else if endpoint.is_some() {
} else if endpoint_id.is_some() {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["no_sni"])
.inc();
Expand All @@ -150,7 +144,7 @@ impl ComputeUserInfoMaybeEndpoint {

Ok(Self {
user,
endpoint_id: endpoint.map(EndpointId::from),
endpoint_id,
options,
})
}
Expand Down
13 changes: 7 additions & 6 deletions proxy/src/auth/password_hack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl PasswordHackPayload {
if let Some((endpoint, password)) = bytes.split_once_str(sep) {
let endpoint = endpoint.to_str().ok()?;
return Some(Self {
endpoint: parse_endpoint_param(endpoint)?.into(),
endpoint: parse_endpoint_param(endpoint)?,
password: password.to_owned(),
});
}
Expand All @@ -30,10 +30,11 @@ impl PasswordHackPayload {
}
}

pub fn parse_endpoint_param(bytes: &str) -> Option<&str> {
pub fn parse_endpoint_param(bytes: &str) -> Option<EndpointId> {
bytes
.strip_prefix("project=")
.or_else(|| bytes.strip_prefix("endpoint="))
.map(EndpointId::from)
}

#[cfg(test)]
Expand All @@ -46,16 +47,16 @@ mod tests {
assert!(parse_endpoint_param(input).is_none());

let input = "project=";
assert_eq!(parse_endpoint_param(input), Some(""));
assert_eq!(parse_endpoint_param(input).as_deref(), Some(""));

let input = "project=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
assert_eq!(parse_endpoint_param(input).as_deref(), Some("foobar"));

let input = "endpoint=";
assert_eq!(parse_endpoint_param(input), Some(""));
assert_eq!(parse_endpoint_param(input).as_deref(), Some(""));

let input = "endpoint=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
assert_eq!(parse_endpoint_param(input).as_deref(), Some("foobar"));

let input = "other_option=foobar";
assert!(parse_endpoint_param(input).is_none());
Expand Down
12 changes: 5 additions & 7 deletions proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,11 @@ impl RequestMonitoring {
self.project = Some(x.project_id);
}

pub fn set_endpoint_id(&mut self, endpoint_id: Option<EndpointId>) {
self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
if let Some(ep) = &self.endpoint_id {
crate::metrics::CONNECTING_ENDPOINTS
.with_label_values(&[self.protocol])
.measure(&ep);
}
pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
crate::metrics::CONNECTING_ENDPOINTS
.with_label_values(&[self.protocol])
.measure(&endpoint_id);
self.endpoint_id = Some(endpoint_id);
}

pub fn set_application(&mut self, app: Option<SmolStr>) {
Expand Down
22 changes: 14 additions & 8 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub async fn task_main(
let cancel_map = Arc::clone(&cancel_map);
let endpoint_rate_limiter = endpoint_rate_limiter.clone();

let session_span = info_span!(
"handle_client",
?session_id,
peer_addr = tracing::field::Empty,
ep = tracing::field::Empty,
);

connections.spawn(
async move {
info!("accepted postgres client connection");
Expand Down Expand Up @@ -107,15 +114,11 @@ pub async fn task_main(
)
.await
}
.instrument(info_span!(
"handle_client",
?session_id,
peer_addr = tracing::field::Empty
))
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
error!("per-client task finished with an error: {e:#}");
})
.instrument(session_span),
);
}

Expand Down Expand Up @@ -212,7 +215,10 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
Err(e) => stream.throw_error(e).await?,
};

ctx.set_endpoint_id(user_info.get_endpoint());
if let Some(ep) = user_info.get_endpoint() {
ctx.set_endpoint_id(ep.clone());
tracing::Span::current().record("ep", &tracing::field::display(ep));
}

// check rate limit
if let Some(ep) = user_info.get_endpoint() {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/serverless/sql_over_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn get_conn_info(
}

let endpoint = endpoint_sni(hostname, &tls.common_names)?.context("malformed endpoint")?;
ctx.set_endpoint_id(Some(endpoint.clone()));
ctx.set_endpoint_id(endpoint.clone());

let pairs = connection_url.query_pairs();

Expand Down

0 comments on commit 3c2fdd1

Please sign in to comment.