Skip to content

Commit

Permalink
Support timeout after waiting RPC response for a maximum time. apache#47
Browse files Browse the repository at this point in the history
  • Loading branch information
qunwei committed Dec 20, 2022
1 parent 1e38ddf commit c29fedd
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
71 changes: 71 additions & 0 deletions dubbo/src/filter/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::time::{SystemTime, UNIX_EPOCH};

use serde_json::Value;

use crate::{
codegen::Request,
context::{Context, RpcContext},
status::Status,
};

use super::Filter;

#[derive(Clone)]
pub struct ContextFilter {}

impl Filter for ContextFilter {
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
let headers = &mut req.metadata.into_headers();

let timeout = headers.get("timeout-countdown");

let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();

let mut dead_line_in_nanos = 0_u128;

if let Some(t) = timeout {
let timeout: u128 = t.to_str().unwrap().parse().unwrap();
if timeout > 0_u128 {
dead_line_in_nanos = time + timeout * 1000000;
}
} else {
// TODO default timeout
let timeout: u128 = 1000_u128 * 1000000;
dead_line_in_nanos = time + timeout;
}

tracing::debug!(
"ContextFilter tri-timeout-deadline-in-nanos : {}",
dead_line_in_nanos
);
if let Some(at) = RpcContext::get_attachments() {
let mut attachments = at.lock().unwrap();
attachments.insert(
String::from("tri-timeout-deadline-in-nanos"),
Value::from(dead_line_in_nanos.to_string()),
);
}

Ok(req)
}
}
2 changes: 2 additions & 0 deletions dubbo/src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
* limitations under the License.
*/

pub mod context;
pub mod service;
pub mod timeout;

use crate::invocation::Request;

Expand Down
64 changes: 64 additions & 0 deletions dubbo/src/filter/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::{SystemTime, UNIX_EPOCH};

use crate::{
codegen::Request,
context::{Context, RpcContext},
status::{Code, Status},
};

use super::Filter;

#[derive(Clone)]
pub struct TimeoutFilter {}

/// timeout count
/// 1. ContextFilter 初始化 timeout 时间,初始化后将 tri-timeout-deadline-in-nanos 放入 context 中
/// 2. TimeoutFilter read context tri-timeout-deadline-in-nanos
/// 3. 响应时计算 tri-timeout-deadline-in-nanos - current_nanos <= 0
///
impl Filter for TimeoutFilter {
fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
if let Some(attachments) = RpcContext::get_attachments() {
let current_nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();

let attachments = attachments.lock().unwrap();
let tri_timeout_deadline_in_nanos =
attachments.get("tri-timeout-deadline-in-nanos").unwrap();
let tri_timeout_deadline_in_nanos: u128 = tri_timeout_deadline_in_nanos
.as_str()
.unwrap()
.parse()
.unwrap();

tracing::debug!(
"TimeoutFilter tri-timeout-deadline-in-nanos : {}, current-nanos:{}",
tri_timeout_deadline_in_nanos,
current_nanos
);
if tri_timeout_deadline_in_nanos - current_nanos <= 0 {
return Err(Status::new(Code::DeadlineExceeded, String::from("Timeout")));
}
}

Ok(req)
}
}
7 changes: 6 additions & 1 deletion examples/echo/src/echo/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::io::ErrorKind;
use std::pin::Pin;

use async_trait::async_trait;
use dubbo::filter::context::ContextFilter;
use dubbo::filter::timeout::TimeoutFilter;
use futures_util::Stream;
use futures_util::StreamExt;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -52,12 +54,15 @@ async fn main() {
});
let server = EchoServerImpl::default();
let s = EchoServer::<EchoServerImpl>::with_filter(server, FakeFilter {});
let timeout_filter = FilterService::new(s, TimeoutFilter {});
let context_filter = FilterService::new(timeout_filter, ContextFilter {});

dubbo::protocol::triple::TRIPLE_SERVICES
.write()
.unwrap()
.insert(
"grpc.examples.echo.Echo".to_string(),
dubbo::utils::boxed_clone::BoxCloneService::new(s),
dubbo::utils::boxed_clone::BoxCloneService::new(context_filter),
);

// Dubbo::new().start().await;
Expand Down

0 comments on commit c29fedd

Please sign in to comment.