Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
daedalus2022 authored Dec 9, 2022
2 parents dc77d7b + eef25df commit 2825262
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 58 deletions.
3 changes: 3 additions & 0 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ async-stream = "0.3"
flate2 = "1.0"

dubbo-config = {path = "../config", version = "0.2.0"}

#对象存储
state = { version = "0.5", features = ["tls"] }
99 changes: 41 additions & 58 deletions dubbo/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,14 @@
* limitations under the License.
*/

use core::cell::RefCell;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;

///
/// ```rust
/// use std::collections::HashMap;
/// use std::sync::Arc;
///
/// let mut map = HashMap::<String, SafetyValue>::new();
/// map.insert("key1".into(), Arc::new("data-1"));
///
/// // get a typed value from SafetyValue
/// let value = map
/// .get("key1")
/// .and_then(|f| f.downcast_ref::<String>())
/// .unwrap();
///
/// assert_eq!(value, "data-1");
/// ```
type SafetyValue = Arc<dyn Any + Sync + Send>;
use serde_json::Value;
use state::Container;

thread_local! {
static SERVICE_CONTEXT: RefCell<RpcContext> = RefCell::new(RpcContext::default());
}
pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send + Sync]>::new();

///
/// All environment information of during the current call will put into the context
Expand All @@ -53,37 +34,38 @@ thread_local! {
/// After B call C,the RpcContext record the information of B call C
///
#[derive(Clone, Default)]
pub struct RpcContext {
pub attachments: HashMap<String, SafetyValue>,
// TODO
}

impl RpcContext {
pub fn current() -> Self {
get_current(|ctx| ctx.clone())
}
pub struct RpcContext {}

pub fn clear(&mut self) {
self.attachments.clear();
}
pub trait Context {
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>>;
}

fn get_current<F: FnMut(&RpcContext) -> T, T>(mut f: F) -> T {
SERVICE_CONTEXT.try_with(|ctx| f(&ctx.borrow())).unwrap()
}
impl Context for RpcContext {
fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>> {
let local = APPLICATION_CONTEXT.try_get_local::<Arc<Mutex<HashMap<String, Value>>>>();

tracing::debug!("{:?} - {:?}", thread::current().id(), local);

impl fmt::Debug for RpcContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context")
.field("attachments", &self.attachments)
.finish()
match local {
Some(attachment) => Some(attachment.clone()),
None => {
let attachment = HashMap::<String, Value>::new();
let mutex = Arc::new(Mutex::new(attachment));
let mutex_clone = Arc::clone(&mutex);
APPLICATION_CONTEXT.set_local(move || {
return Arc::clone(&mutex_clone);
});
Some(Arc::clone(&mutex))
}
}
}
}

#[cfg(test)]
mod tests {
use tokio::time;

use super::*;
use std::thread::sleep;
use std::time::Duration;

#[test]
Expand All @@ -96,25 +78,26 @@ mod tests {

let mut handles = Vec::with_capacity(10);

for i in 0..10 {
for i in 0..=10 {
handles.push(rt.spawn(async move {
let mut attachments = RpcContext::current().attachments;
attachments.insert("key1".into(), Arc::new(format!("data-{i}")));

if i == 10 {
attachments.insert("key2".into(), Arc::new(2));
assert_eq!(attachments.len(), 2);
} else {
assert_eq!(attachments.len(), 1);
}
if let Some(attachments) = RpcContext::get_attachments() {
let mut attachments = attachments.lock().unwrap();
attachments.insert("key1".into(), Value::from(format!("data-{i}")));

assert!(attachments.len() > 0);
};

time::sleep(Duration::from_millis(1000)).await;

if let Some(attachments) = RpcContext::get_attachments() {
let attachments = attachments.lock().unwrap();
assert!(attachments.len() > 0);
};
}));
}

sleep(Duration::from_millis(500));

for handle in handles {
rt.block_on(handle).unwrap();
}
assert_eq!(RpcContext::current().attachments.len(), 0);
}
}

0 comments on commit 2825262

Please sign in to comment.