Skip to content

Commit

Permalink
Client refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Feb 1, 2025
1 parent fad4db3 commit e8e5f92
Show file tree
Hide file tree
Showing 14 changed files with 456 additions and 593 deletions.
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ path = "../core-api"
[dev-dependencies]
assert_matches = "1"
mockall = "0.13"
rstest = "0.24"

[lints]
workspace = true
179 changes: 87 additions & 92 deletions client/src/lib.rs

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{AttachMetricLabels, LONG_POLL_METHOD_NAMES};
use crate::{AttachMetricLabels, CallType};
use futures_util::{future::BoxFuture, FutureExt};
use std::{
sync::Arc,
Expand Down Expand Up @@ -195,16 +195,18 @@ impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
m.with_new_attrs(other_labels.labels)
}
if let Some(ct) = req.extensions().get::<CallType>() {
if ct.is_long() {
m.set_is_long_poll();
}
}
m
})
.and_then(|mut metrics| {
// Attach method name label if possible
req.uri().to_string().rsplit_once('/').map(|split_tup| {
let method_name = split_tup.1;
metrics.with_new_attrs([svc_operation(method_name.to_string())]);
if LONG_POLL_METHOD_NAMES.contains(&method_name) {
metrics.set_is_long_poll();
}
metrics.svc_request();
metrics
})
Expand Down
17 changes: 8 additions & 9 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,18 @@ where
&mut self,
call_name: &'static str,
mut callfn: F,
req: Request<Req>,
mut req: Request<Req>,
) -> Result<Response<Resp>, Status>
where
Req: Clone + Unpin + Send + Sync + 'static,
F: FnMut(&mut Self, Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
F: Send + Sync + Unpin + 'static,
{
let info = self.get_call_info(call_name, Some(&req));
req.extensions_mut().insert(info.call_type);
if info.call_type.is_long() {
req.set_default_timeout(LONG_POLL_TIMEOUT);
}
let fact = || {
let req_clone = req_cloner(&req);
callfn(self, req_clone)
Expand Down Expand Up @@ -612,9 +616,6 @@ proxier! {
if r.get_ref().wait_new_event {
r.extensions_mut().insert(IsUserLongPoll);
}
if r.get_ref().wait_new_event {
r.set_default_timeout(LONG_POLL_TIMEOUT);
}
}
);
(
Expand All @@ -634,7 +635,6 @@ proxier! {
let mut labels = namespaced_request!(r);
labels.task_q(r.get_ref().task_queue.clone());
r.extensions_mut().insert(labels);
r.set_default_timeout(LONG_POLL_TIMEOUT);
}
);
(
Expand Down Expand Up @@ -663,7 +663,6 @@ proxier! {
let mut labels = namespaced_request!(r);
labels.task_q(r.get_ref().task_queue.clone());
r.extensions_mut().insert(labels);
r.set_default_timeout(LONG_POLL_TIMEOUT);
}
);
(
Expand Down Expand Up @@ -1017,8 +1016,9 @@ proxier! {
UpdateWorkflowExecutionResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
r.set_default_timeout(LONG_POLL_TIMEOUT);
let exts = r.extensions_mut();
exts.insert(labels);
exts.insert(IsUserLongPoll);
}
);
(
Expand All @@ -1028,7 +1028,6 @@ proxier! {
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
r.set_default_timeout(LONG_POLL_TIMEOUT);
}
);
(
Expand Down
Loading

0 comments on commit e8e5f92

Please sign in to comment.