Skip to content

Commit

Permalink
Introduce Executors for Bridges
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Dec 31, 2023
1 parent 0527f7f commit 3f2bef6
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 99 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fabric/rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tokio = { version = "1", features = ["full"] }
paste = "1.0"
windows-core = "0.51"
async-trait = "0.1"
ctrlc = { version = "3.0", features = ["termination"] }

[dependencies.windows]
version = "0.51"
Expand Down
55 changes: 46 additions & 9 deletions crates/fabric/rs/src/runtime/executor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use tokio::runtime::Handle;
use std::future::Future;

// TODO/WIP: The intension here is to be able to plug in the backend runner for
// SF bridges to execute async functions.
// Executor as this cannot be dyn passed due to generic args cannot form vtable.
// To enable this feature the bridges needs to be generic.
use log::info;
use tokio::{runtime::Handle, sync::mpsc::channel};

// Executor is used by rs to post jobs to execute in the background
pub trait Executor {
fn spawn(&self, future: impl FnOnce() + std::marker::Send + 'static);
pub trait Executor: Clone {
// spawns the task to run in background
fn spawn<F>(&self, future: F)
where
F: Future + Send + 'static;

// run the executor until the ctrl-c os signal
fn run_until_ctrl_c(&self);
}

#[derive(Clone)]
Expand All @@ -21,9 +25,42 @@ impl DefaultExecutor {
}
}

// TODO: rt obj needs to be hold somewhere outside of handle
// impl Default for DefaultExecutor {
// fn default() -> Self {
// let rt = tokio::runtime::Runtime::new().unwrap();
// Self {
// rt: rt.handle().clone(),
// }
// }
// }

impl Executor for DefaultExecutor {
fn spawn(&self, future: impl FnOnce() + std::marker::Send + 'static) {
self.rt.spawn(async move { future() });
fn spawn<F>(&self, future: F)
where
F: Future + Send + 'static,
{
self.rt.spawn(async move {
future.await;
});
}

fn run_until_ctrl_c(&self) {
info!("DefaultExecutor: setting up ctrl-c event.");
// set ctrc event
let (tx, mut rx) = channel(1);
let handler = move || {
tx.blocking_send(())
.expect("Could not send signal on channel.")
};
ctrlc::set_handler(handler).expect("Error setting Ctrl-C handler");

// wait for ctrl-c signal.
self.rt.block_on(async move {
info!("DefaultExecutor: Waiting for Ctrl-C...");
rx.recv().await.expect("Could not receive from channel.");
info!("DefaultExecutor: Got Ctrl-C! Exiting...");
});
}
}

Expand Down
16 changes: 11 additions & 5 deletions crates/fabric/rs/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use fabric_base::{
FABRIC_ENDPOINT_RESOURCE_DESCRIPTION,
};
use std::cell::Cell;
use tokio::runtime::Handle;
use windows::core::implement;
use windows_core::{ComInterface, Error, Interface, HSTRING, PCWSTR};

use self::{
executor::Executor,
stateful::{StatefulServiceFactory, StatefulServiceReplica},
stateful_bridge::StatefulServiceFactoryBridge,
stateless::{StatelessServiceFactory, StatelessServiceInstance},
Expand Down Expand Up @@ -52,13 +52,19 @@ pub fn get_com_activation_context() -> ::windows_core::Result<IFabricCodePackage
}

// safe wrapping for runtime
pub struct Runtime {
pub struct Runtime<E>
where
E: Executor,
{
com_impl: IFabricRuntime,
rt: Handle,
rt: E,
}

impl Runtime {
pub fn create(rt: Handle) -> ::windows_core::Result<Runtime> {
impl<E> Runtime<E>
where
E: Executor,
{
pub fn create(rt: E) -> ::windows_core::Result<Runtime<E>> {
let com = create_com_runtime()?;
Ok(Runtime { com_impl: com, rt })
}
Expand Down
86 changes: 55 additions & 31 deletions crates/fabric/rs/src/runtime/stateful_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::{marker::PhantomData, sync::Arc};

use log::info;
use tokio::runtime::Handle;
use windows::core::implement;
use windows_core::{AsImpl, ComInterface, Error, HSTRING};

Expand All @@ -30,37 +29,41 @@ use crate::{
StringResult,
};

use super::stateful::{
PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica,
use super::{
executor::Executor,
stateful::{PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica},
};

#[implement(IFabricStatefulServiceFactory)]
pub struct StatefulServiceFactoryBridge<F, R>
pub struct StatefulServiceFactoryBridge<E, F, R>
where
E: Executor,
F: StatefulServiceFactory<R>,
R: StatefulServiceReplica + 'static,
{
inner: F,
rt: Handle,
rt: E,
phantom: PhantomData<R>,
}

impl<F, R> StatefulServiceFactoryBridge<F, R>
impl<E, F, R> StatefulServiceFactoryBridge<E, F, R>
where
E: Executor,
F: StatefulServiceFactory<R>,
R: StatefulServiceReplica,
{
pub fn create(factory: F, rt: Handle) -> StatefulServiceFactoryBridge<F, R> {
StatefulServiceFactoryBridge::<F, R> {
pub fn create(factory: F, rt: E) -> StatefulServiceFactoryBridge<E, F, R> {
StatefulServiceFactoryBridge::<E, F, R> {
inner: factory,
rt,
phantom: PhantomData,
}
}
}

impl<F, R> IFabricStatefulServiceFactory_Impl for StatefulServiceFactoryBridge<F, R>
impl<E, F, R> IFabricStatefulServiceFactory_Impl for StatefulServiceFactoryBridge<E, F, R>
where
E: Executor,
F: StatefulServiceFactory<R>,
R: StatefulServiceReplica + 'static,
{
Expand Down Expand Up @@ -100,13 +103,19 @@ where
// bridge from safe service instance to com
#[implement(IFabricReplicator)]

pub struct IFabricReplicatorBridge {
pub struct IFabricReplicatorBridge<E>
where
E: Executor,
{
inner: Arc<Box<dyn Replicator>>,
rt: Handle,
rt: E,
}

impl IFabricReplicatorBridge {
pub fn create(rplctr: Box<dyn Replicator>, rt: Handle) -> IFabricReplicatorBridge {
impl<E> IFabricReplicatorBridge<E>
where
E: Executor,
{
pub fn create(rplctr: Box<dyn Replicator>, rt: E) -> IFabricReplicatorBridge<E> {
IFabricReplicatorBridge {
inner: Arc::new(rplctr),
rt,
Expand All @@ -115,16 +124,19 @@ impl IFabricReplicatorBridge {

fn create_from_primary_replicator(
replicator: Arc<Box<dyn Replicator>>,
rt: Handle,
) -> IFabricReplicatorBridge {
rt: E,
) -> IFabricReplicatorBridge<E> {
IFabricReplicatorBridge {
inner: replicator,
rt,
}
}
}

impl IFabricReplicator_Impl for IFabricReplicatorBridge {
impl<E> IFabricReplicator_Impl for IFabricReplicatorBridge<E>
where
E: Executor,
{
fn BeginOpen(
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
Expand Down Expand Up @@ -292,17 +304,20 @@ impl IFabricReplicator_Impl for IFabricReplicatorBridge {

// primary replicator bridge
#[implement(IFabricPrimaryReplicator)]
pub struct IFabricPrimaryReplicatorBridge {
pub struct IFabricPrimaryReplicatorBridge<E>
where
E: Executor,
{
inner: Arc<Box<dyn PrimaryReplicator>>,
rt: Handle,
rplctr: IFabricReplicatorBridge,
rt: E,
rplctr: IFabricReplicatorBridge<E>,
}

impl IFabricPrimaryReplicatorBridge {
pub fn create(
rplctr: Box<dyn PrimaryReplicator>,
rt: Handle,
) -> IFabricPrimaryReplicatorBridge {
impl<E> IFabricPrimaryReplicatorBridge<E>
where
E: Executor,
{
pub fn create(rplctr: Box<dyn PrimaryReplicator>, rt: E) -> IFabricPrimaryReplicatorBridge<E> {
let inner = Arc::new(rplctr);

// hack to construct a replicator bridge.
Expand All @@ -326,7 +341,10 @@ impl IFabricPrimaryReplicatorBridge {
}

// TODO: this impl has duplicate code with replicator bridge
impl IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge {
impl<E> IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge<E>
where
E: Executor,
{
fn BeginOpen(
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
Expand Down Expand Up @@ -399,7 +417,10 @@ impl IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge {
}
}

impl IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge {
impl<E> IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge<E>
where
E: Executor,
{
fn BeginOnDataLoss(
&self,
callback: ::core::option::Option<&super::IFabricAsyncOperationCallback>,
Expand Down Expand Up @@ -538,28 +559,31 @@ impl IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge {
// bridge from safe service instance to com
#[implement(IFabricStatefulServiceReplica)]

pub struct IFabricStatefulServiceReplicaBridge<R>
pub struct IFabricStatefulServiceReplicaBridge<E, R>
where
E: Executor,
R: StatefulServiceReplica + 'static,
{
inner: Arc<R>,
rt: Handle,
rt: E,
}

impl<R> IFabricStatefulServiceReplicaBridge<R>
impl<E, R> IFabricStatefulServiceReplicaBridge<E, R>
where
E: Executor,
R: StatefulServiceReplica,
{
pub fn create(rplctr: R, rt: Handle) -> IFabricStatefulServiceReplicaBridge<R> {
pub fn create(rplctr: R, rt: E) -> IFabricStatefulServiceReplicaBridge<E, R> {
IFabricStatefulServiceReplicaBridge {
inner: Arc::new(rplctr),
rt,
}
}
}

impl<R> IFabricStatefulServiceReplica_Impl for IFabricStatefulServiceReplicaBridge<R>
impl<E, R> IFabricStatefulServiceReplica_Impl for IFabricStatefulServiceReplicaBridge<E, R>
where
E: Executor,
R: StatefulServiceReplica + 'static,
{
fn BeginOpen(
Expand Down
Loading

0 comments on commit 3f2bef6

Please sign in to comment.