Add monitor API back #317

Merged · 2 commits · Jan 8, 2025
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
@@ -35,6 +35,9 @@ jobs:
- name: Test ractor_cluster with async_trait
package: ractor_cluster
flags: -F async-trait
- name: Test ractor with the monitor API
package: ractor
flags: -F monitors

steps:
- uses: actions/checkout@main
4 changes: 3 additions & 1 deletion README.md
@@ -66,7 +66,7 @@ Install `ractor` by adding the following to your Cargo.toml dependencies.

```toml
[dependencies]
ractor = "0.13"
ractor = "0.14"
```

The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However, to utilize the native `async fn` support in traits and not rely on the `async-trait` crate's desugaring functionality, you need to be on Rust version `>= 1.75`. The stabilization of `async fn` in traits [was recently added](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html).
@@ -77,6 +77,8 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However to util

1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16).
2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However** `tokio` with the `sync` feature remains a dependency because we utilize the messaging synchronization primitives from `tokio` regardless of runtime, as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173). You can remove default features to "minimize" the tokio dependencies to just the synchronization primitives.
3. `monitors`, which adds support for an Erlang-style monitoring API as an alternative to direct linkage, akin to [Process Monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors). See the sketch after this list.
4. `message_span_propogation`, which propagates the `tracing` span along with messages between actors to preserve tracing context.
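
Below is a minimal sketch of the monitor API in use. The actor names (`Worker`, `Watcher`) are illustrative; it assumes the `monitors` feature is enabled (e.g. `ractor = { version = "0.14", features = ["monitors"] }`), a `tokio` dependency for the `#[tokio::main]` entry point, and the `cfg_attr(feature = "async-trait", ...)` convention used by the crate's own examples and the test in this PR:

```rust
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};

struct Worker;
struct Watcher;

#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Worker {
    type Msg = ();
    type State = ();
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(())
    }
}

#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Watcher {
    type Msg = ();
    type State = ();
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(())
    }

    // Monitors receive copies of the monitored actor's supervision events here,
    // with non-cloneable data (the error and final state) stripped or stringified.
    async fn handle_supervisor_evt(
        &self,
        _myself: ActorRef<Self::Msg>,
        evt: SupervisionEvent,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let SupervisionEvent::ActorTerminated(who, _state, reason) = evt {
            println!("monitored actor {:?} stopped: {:?}", who.get_id(), reason);
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let (worker, worker_handle) = Actor::spawn(None, Worker, ()).await.unwrap();
    let (watcher, watcher_handle) = Actor::spawn(None, Watcher, ()).await.unwrap();

    // `watcher` is not `worker`'s supervisor, but now observes its supervision events
    watcher.monitor(worker.get_cell());

    worker.stop(Some("done".to_string()));
    worker_handle.await.unwrap();

    // `watcher.unmonitor(worker.get_cell())` would detach the monitor again
    watcher.stop(None);
    watcher_handle.await.unwrap();
}
```

Unlike a supervision link, monitoring does not make `Watcher` responsible for `Worker`; it only delivers reduced copies of the supervision events.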

## Working with Actors

3 changes: 2 additions & 1 deletion ractor/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
@@ -19,6 +19,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[features]
### Other features
cluster = []
monitors = []
message_span_propogation = []
tokio_runtime = ["tokio/time", "tokio/rt", "tokio/macros", "tokio/tracing"]
blanket_serde = ["serde", "pot", "cluster"]
19 changes: 19 additions & 0 deletions ractor/src/actor/actor_cell.rs
@@ -388,6 +388,25 @@ impl ActorCell {
self.inner.tree.clear_supervisor();
}

/// Monitor the provided [super::Actor] for supervision events. An actor in `ractor` can
/// only have a single supervisor, denoted by the `link` function, but it
/// may have multiple `monitors`. Monitors receive copies of the [SupervisionEvent]s,
/// with non-cloneable information removed.
///
/// * `who`: The actor to monitor
#[cfg(feature = "monitors")]
pub fn monitor(&self, who: ActorCell) {
who.inner.tree.set_monitor(self.clone());
}

/// Stop monitoring the provided [super::Actor] for supervision events.
///
/// * `who`: The actor to stop monitoring
#[cfg(feature = "monitors")]
pub fn unmonitor(&self, who: ActorCell) {
who.inner.tree.remove_monitor(self.get_id());
}

/// Kill this [super::Actor] forcefully (terminates async work)
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
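
For contrast with supervision links, here is an illustrative wiring function (the handles `supervisor`, `watcher`, and `worker` are hypothetical `ActorCell`s, not part of this PR): an actor has exactly one supervisor but any number of monitors, and monitors only ever see the reduced event copies.

```rust
use ractor::ActorCell;

// Illustrative wiring: one supervisor via `link`, any number of monitors.
fn wire_up(supervisor: &ActorCell, watcher: &ActorCell, worker: &ActorCell) {
    // Supervision: `supervisor` becomes `worker`'s single supervisor and
    // receives the full SupervisionEvents (including any final state).
    worker.link(supervisor.clone());

    // Monitoring: `watcher` receives reduced copies of the same events, and
    // can detach at any time without affecting the supervision link.
    watcher.monitor(worker.clone());
    watcher.unmonitor(worker.clone());
}
```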
21 changes: 21 additions & 0 deletions ractor/src/actor/messages.rs
@@ -126,6 +126,27 @@ impl SupervisionEvent {
pub fn actor_id(&self) -> Option<super::actor_id::ActorId> {
self.actor_cell().map(|cell| cell.get_id())
}

/// Clone the supervision event without requiring that the inner data
/// be cloneable. This means the actor error (if present) is converted
/// to a string, and the state at termination is not
/// propagated. If the state were cloneable we could propagate it, but
/// requiring that would be overly restrictive, so we avoid it.
#[cfg(feature = "monitors")]
pub(crate) fn clone_no_data(&self) -> Self {
match self {
Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
Self::ActorFailed(who, what) => {
Self::ActorFailed(who.clone(), From::from(format!("{what}")))
}
Self::ProcessGroupChanged(what) => Self::ProcessGroupChanged(what.clone()),
Self::ActorTerminated(who, _state, msg) => {
Self::ActorTerminated(who.clone(), None, msg.as_ref().cloned())
}
#[cfg(feature = "cluster")]
Self::PidLifecycleEvent(evt) => Self::PidLifecycleEvent(evt.clone()),
}
}
}

impl Debug for SupervisionEvent {
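
As an illustration of what the stripped-down copies look like from a monitor's side (the helper `describe` is hypothetical, not part of this PR): failures arrive with the error flattened to its `Display` output, and terminations arrive without the final state.

```rust
use ractor::SupervisionEvent;

// Sketch of a monitor-side helper summarizing the "no data" event copies.
fn describe(evt: &SupervisionEvent) -> String {
    match evt {
        SupervisionEvent::ActorStarted(who) => format!("{:?} started", who.get_id()),
        SupervisionEvent::ActorFailed(who, err) => {
            // In a monitor's copy, `err` was rebuilt from the original error's
            // Display output, so the concrete error type need not be Clone.
            format!("{:?} failed: {err}", who.get_id())
        }
        SupervisionEvent::ActorTerminated(who, _state, reason) => {
            // In a monitor's copy, the final state is always `None`; only the
            // optional exit-reason string is carried over.
            format!("{:?} terminated: {reason:?}", who.get_id())
        }
        // Other variants (e.g. process group changes) are cloned as-is.
        other => format!("other supervision event: {other:?}"),
    }
}
```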
79 changes: 66 additions & 13 deletions ractor/src/actor/supervision.rs
@@ -10,32 +10,39 @@
//! when a child actor starts, stops, or panics (when possible). The supervisor can then decide
//! how to handle the event. Should it restart the actor, leave it dead, potentially die itself
//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
//!
//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
//! which will be expanded upon as the library develops.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;

use super::{actor_cell::ActorCell, messages::SupervisionEvent};
use crate::ActorId;

/// A supervision tree
#[derive(Default, Debug)]
pub(crate) struct SupervisionTree {
children: Arc<Mutex<HashMap<ActorId, ActorCell>>>,
supervisor: Arc<Mutex<Option<ActorCell>>>,
children: Mutex<Option<HashMap<ActorId, ActorCell>>>,
supervisor: Mutex<Option<ActorCell>>,
#[cfg(feature = "monitors")]
monitors: Mutex<Option<HashMap<ActorId, ActorCell>>>,
}

impl SupervisionTree {
/// Push a child into the tree
pub(crate) fn insert_child(&self, child: ActorCell) {
self.children.lock().unwrap().insert(child.get_id(), child);
let mut guard = self.children.lock().unwrap();
if let Some(map) = &mut *(guard) {
map.insert(child.get_id(), child);
} else {
*guard = Some(HashMap::from_iter([(child.get_id(), child)]));
}
}

/// Remove a specific actor from the supervision tree (e.g. actor died)
pub(crate) fn remove_child(&self, child: ActorId) {
self.children.lock().unwrap().remove(&child);
let mut guard = self.children.lock().unwrap();
if let Some(map) = &mut *(guard) {
map.remove(&child);
}
}

/// Push a parent into the tree
@@ -53,13 +60,40 @@ impl SupervisionTree {
self.supervisor.lock().unwrap().clone()
}

/// Set a monitor of this supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn set_monitor(&self, who: ActorCell) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.insert(who.get_id(), who);
} else {
*guard = Some(HashMap::from_iter([(who.get_id(), who)]))
}
}

/// Remove a specific monitor from the supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn remove_monitor(&self, who: ActorId) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(&who);
if map.is_empty() {
*guard = None;
}
}
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree, since the supervisor is shutting down
/// and can't deal with supervision events anyway
pub(crate) fn terminate_all_children(&self) {
let mut guard = self.children.lock().unwrap();
let cells = guard.values().cloned().collect::<Vec<_>>();
guard.clear();
let cells = if let Some(map) = &mut *guard {
map.values().cloned().collect()
} else {
vec![]
};
*guard = None;
// drop the guard to not deadlock on double-link
drop(guard);
for cell in cells {
@@ -141,20 +175,39 @@ impl SupervisionTree {
/// Return all linked children
pub(crate) fn get_children(&self) -> Vec<ActorCell> {
let guard = self.children.lock().unwrap();
guard.values().cloned().collect()
if let Some(map) = &*guard {
map.values().cloned().collect()
} else {
vec![]
}
}

/// Send a notification to the supervisor.
///
/// CAVEAT: Monitors are notified before the supervisor, using `clone_no_data`
/// copies, so the original event can be handed to the supervisor without an
/// unnecessary clone.
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
#[cfg(feature = "monitors")]
if let Some(monitors) = &mut *(self.monitors.lock().unwrap()) {
// We notify the monitors on a best-effort basis, and if we fail to send the event, we remove
// the monitor
monitors.retain(|_, v| v.send_supervisor_evt(evt.clone_no_data()).is_ok());
}

if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
let _ = parent.send_supervisor_evt(evt);
_ = parent.send_supervisor_evt(evt);
}
}

/// Retrieve the number of supervised children
#[cfg(test)]
pub(crate) fn get_num_children(&self) -> usize {
self.children.lock().unwrap().len()
let guard = self.children.lock().unwrap();
if let Some(map) = &*guard {
map.len()
} else {
0
}
}

/// Retrieve the number of supervised children
107 changes: 107 additions & 0 deletions ractor/src/actor/tests/supervisor.rs
@@ -1567,3 +1567,110 @@ async fn draining_children_will_shutdown_parent_too() {
// Child's post-stop should have been called.
assert_eq!(1, flag.load(Ordering::Relaxed));
}

#[crate::concurrency::test]
#[tracing_test::traced_test]
#[cfg(feature = "monitors")]
async fn test_simple_monitor() {
struct Peer;
struct Monitor {
counter: Arc<AtomicU8>,
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Peer {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
_: Self::Msg,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
myself.stop(Some("oh no!".to_string()));
Ok(())
}
}

#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl Actor for Monitor {
type Msg = ();
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
_: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle_supervisor_evt(
&self,
_: ActorRef<Self::Msg>,
evt: SupervisionEvent,
_: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let SupervisionEvent::ActorTerminated(_who, _state, Some(msg)) = evt {
if msg.as_str() == "oh no!" {
self.counter.fetch_add(1, Ordering::Relaxed);
}
}
Ok(())
}
}

let count = Arc::new(AtomicU8::new(0));

let (p, ph) = Actor::spawn(None, Peer, ())
.await
.expect("Failed to start peer");
let (m, mh) = Actor::spawn(
None,
Monitor {
counter: count.clone(),
},
(),
)
.await
.expect("Faield to start monitor");

m.monitor(p.get_cell());

// stopping the peer should notify the monitor, who can capture the state
p.cast(()).expect("Failed to contact peer");
periodic_check(
|| count.load(Ordering::Relaxed) == 1,
Duration::from_secs(1),
)
.await;
ph.await.unwrap();

let (p, ph) = Actor::spawn(None, Peer, ())
.await
.expect("Failed to start peer");
m.monitor(p.get_cell());
m.unmonitor(p.get_cell());

p.cast(()).expect("Failed to contact peer");
ph.await.unwrap();

// The count doesn't increment when the peer exits (we give some time
// to schedule the supervision evt)
crate::concurrency::sleep(Duration::from_millis(100)).await;
assert_eq!(1, count.load(Ordering::Relaxed));

m.stop(None);
mh.await.unwrap();
}
5 changes: 4 additions & 1 deletion ractor/src/lib.rs
@@ -12,7 +12,7 @@
//!
//! ```toml
//! [dependencies]
//! ractor = "0.13"
//! ractor = "0.14"
//! ```
//!
//! The minimum supported Rust version (MSRV) is 1.64. However if you disable the `async-trait` feature, then you need Rust >= 1.75 due to the native
@@ -129,6 +129,9 @@
//! to its supervisor yet. However failures in `post_start`, `handle`, `handle_supervisor_evt`, `post_stop` will notify the supervisor should a failure
//! occur. See [crate::Actor] documentation for more information
//!
//! There is additionally a "monitor" API which provides monitoring without a direct supervision link, akin to Erlang's [process monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors).
//! This functionality is opt-in via the `monitors` feature on the `ractor` crate.
//!
//! ## Messaging actors
//!
//! The means of communication between actors is that they pass messages to each other. A developer can define any message type which is `Send + 'static` and it
4 changes: 3 additions & 1 deletion ractor_cluster/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
@@ -15,6 +15,8 @@ build = "src/build.rs"
rust-version = "1.64"

[features]
monitors = ["ractor/monitors"]
message_span_propogation = ["ractor/message_span_propogation"]
async-trait = ["dep:async-trait", "ractor/async-trait"]

default = []
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Derives for ractor_cluster"
license = "MIT"