Skip to content

Commit

Permalink
Gate monitor API behind a feature, update crate version, add CI cover…
Browse files Browse the repository at this point in the history
…age on monitors, and update API and README docs
  • Loading branch information
slawlor committed Jan 8, 2025
1 parent 860d82d commit 59b4af4
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 60 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ jobs:
- name: Test ractor_cluster with async_trait
package: ractor_cluster
flags: -F async-trait
- name: Test ractor with the monitor API
package: ractor
flags: -F monitors

steps:
- uses: actions/checkout@main
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Install `ractor` by adding the following to your Cargo.toml dependencies.

```toml
[dependencies]
ractor = "0.13"
ractor = "0.14"
```

The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However to utilize the native `async fn` support in traits and not rely on the `async-trait` crate's desugaring functionliaty, you need to be on Rust version `>= 1.75`. The stabilization of `async fn` in traits [was recently added](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html).
Expand All @@ -77,6 +77,8 @@ The minimum supported Rust version (MSRV) of `ractor` is `1.64`. However to util

1. `cluster`, which exposes various functionality required for `ractor_cluster` to set up and manage a cluster of actors over a network link. This is work-in-progress and is being tracked in [#16](https://github.com/slawlor/ractor/issues/16).
2. `async-std`, which enables usage of `async-std`'s asynchronous runtime instead of the `tokio` runtime. **However** `tokio` with the `sync` feature remains a dependency because we utilize the messaging synchronization primatives from `tokio` regardless of runtime as they are not specific to the `tokio` runtime. This work is tracked in [#173](https://github.com/slawlor/ractor/pull/173). You can remove default features to "minimize" the tokio dependencies to just the synchronization primatives.
3. `monitors`, Adds support for an erlang-style monitoring api which is an alternative to direct linkage. Akin to [Process Monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors)
4. `message_span_propogation`, Propagates the span through the message between actors to keep tracing context.

## Working with Actors

Expand Down
3 changes: 2 additions & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -19,6 +19,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[features]
### Other features
cluster = []
monitors = []
message_span_propogation = []
tokio_runtime = ["tokio/time", "tokio/rt", "tokio/macros", "tokio/tracing"]
blanket_serde = ["serde", "pot", "cluster"]
Expand Down
11 changes: 2 additions & 9 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,26 +394,19 @@ impl ActorCell {
/// with non-cloneable information removed.
///
/// * `who`: The actor to monitor
#[cfg(feature = "monitors")]
pub fn monitor(&self, who: ActorCell) {
who.inner.tree.set_monitor(self.clone());
self.inner.tree.mark_monitored(who);
}

/// Stop monitoring the provided [super::Actor] for supervision events.
///
/// * `who`: The actor to stop monitoring
#[cfg(feature = "monitors")]
pub fn unmonitor(&self, who: ActorCell) {
self.inner.tree.unmark_monitored(who.get_id());
who.inner.tree.remove_monitor(self.get_id());
}

/// Clear all the [self::Actor]s which are monitored by this [self::Actor]
pub fn clear_monitors(&self) {
for id in self.inner.tree.monitored_actors() {
self.unmonitor(id);
}
}

/// Kill this [super::Actor] forcefully (terminates async work)
pub fn kill(&self) {
let _ = self.inner.send_signal(Signal::Kill);
Expand Down
1 change: 1 addition & 0 deletions ractor/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl SupervisionEvent {
/// to a string and copied as well as the state upon termination being not
/// propogated. If the state were cloneable, we could propogate it, however
/// that restriction is overly restrictive, so we've avoided it.
#[cfg(feature = "monitors")]
pub(crate) fn clone_no_data(&self) -> Self {
match self {
Self::ActorStarted(who) => Self::ActorStarted(who.clone()),
Expand Down
3 changes: 0 additions & 3 deletions ractor/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,6 @@ where
// notify supervisors of the actor's death
myself.notify_supervisor_and_monitors(evt);

// clear any monitor actors
myself.clear_monitors();

// unlink superisors
if let Some(sup) = supervisor {
myself.unlink(sup);
Expand Down
51 changes: 8 additions & 43 deletions ractor/src/actor/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
//! when a child actor starts, stops, or panics (when possible). The supervisor can then decide
//! how to handle the event. Should it restart the actor, leave it dead, potentially die itself
//! notifying the supervisor's supervisor? That's up to the implementation of the [super::Actor]
//!
//! This is currently an initial implementation of [Erlang supervisors](https://www.erlang.org/doc/man/supervisor.html)
//! which will be expanded upon as the library develops.
use std::collections::HashMap;
use std::sync::Mutex;
Expand All @@ -25,8 +22,8 @@ use crate::ActorId;
pub(crate) struct SupervisionTree {
children: Mutex<Option<HashMap<ActorId, ActorCell>>>,
supervisor: Mutex<Option<ActorCell>>,
#[cfg(feature = "monitors")]
monitors: Mutex<Option<HashMap<ActorId, ActorCell>>>,
monitored: Mutex<Option<HashMap<ActorId, ActorCell>>>,
}

impl SupervisionTree {
Expand Down Expand Up @@ -64,6 +61,7 @@ impl SupervisionTree {
}

/// Set a monitor of this supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn set_monitor(&self, who: ActorCell) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
Expand All @@ -73,28 +71,8 @@ impl SupervisionTree {
}
}

/// Mark that this actor is monitoring some other actors
pub(crate) fn mark_monitored(&self, who: ActorCell) {
let mut guard = self.monitored.lock().unwrap();
if let Some(map) = &mut *guard {
map.insert(who.get_id(), who);
} else {
*guard = Some(HashMap::from_iter([(who.get_id(), who)]))
}
}

/// Mark that this actor is no longer monitoring some other actors
pub(crate) fn unmark_monitored(&self, who: ActorId) {
let mut guard = self.monitored.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(&who);
if map.is_empty() {
*guard = None;
}
}
}

/// Remove a specific monitor from the supervision tree
#[cfg(feature = "monitors")]
pub(crate) fn remove_monitor(&self, who: ActorId) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
Expand All @@ -105,20 +83,6 @@ impl SupervisionTree {
}
}

/// Get the [ActorCell]s of the monitored actors this actor monitors
pub(crate) fn monitored_actors(&self) -> Vec<ActorCell> {
let guard = self.monitored.lock().unwrap();
let set = {
if let Some(monitors) = &*(guard) {
monitors.values().cloned().collect()
} else {
vec![]
}
};
drop(guard);
set
}

/// Terminate all your supervised children and unlink them
/// from the supervision tree since the supervisor is shutting down
/// and can't deal with superivison events anyways
Expand Down Expand Up @@ -223,10 +187,11 @@ impl SupervisionTree {
/// CAVEAT: Monitors get notified first, in order to save an unnecessary
/// clone if there are no monitors.
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
if let Some(monitors) = &*(self.monitors.lock().unwrap()) {
for monitor in monitors.values() {
_ = monitor.send_supervisor_evt(evt.clone_no_data());
}
#[cfg(feature = "monitors")]
if let Some(monitors) = &mut *(self.monitors.lock().unwrap()) {
// We notify the monitors on a best-effort basis, and if we fail to send the event, we remove
// the monitor
monitors.retain(|_, v| v.send_supervisor_evt(evt.clone_no_data()).is_ok());
}

if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
Expand Down
1 change: 1 addition & 0 deletions ractor/src/actor/tests/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,7 @@ async fn draining_children_will_shutdown_parent_too() {

#[crate::concurrency::test]
#[tracing_test::traced_test]
#[cfg(feature = "monitors")]
async fn test_simple_monitor() {
struct Peer;
struct Monitor {
Expand Down
5 changes: 4 additions & 1 deletion ractor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
//! ```toml
//! [dependencies]
//! ractor = "0.13"
//! ractor = "0.14"
//! ```
//!
//! The minimum supported Rust version (MSRV) is 1.64. However if you disable the `async-trait` feature, then you need Rust >= 1.75 due to the native
Expand Down Expand Up @@ -129,6 +129,9 @@
//! to its supervisor yet. However failures in `post_start`, `handle`, `handle_supervisor_evt`, `post_stop` will notify the supervisor should a failure
//! occur. See [crate::Actor] documentation for more information
//!
//! There is additionally a "monitor" API which gives non-direct-supervision logic style monitoring akin to Erlang's [process monitors](https://www.erlang.org/doc/system/ref_man_processes.html#monitors).
//! This functionality is opt-in via feature `monitors` on the `ractor` crate.
//!
//! ## Messaging actors
//!
//! The means of communication between actors is that they pass messages to each other. A developer can define any message type which is `Send + 'static` and it
Expand Down
4 changes: 3 additions & 1 deletion ractor_cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Distributed cluster environment of Ractor actors"
documentation = "https://docs.rs/ractor"
Expand All @@ -15,6 +15,8 @@ build = "src/build.rs"
rust-version = "1.64"

[features]
monitors = ["ractor/monitors"]
message_span_propogation = ["ractor/message_span_propogation"]
async-trait = ["dep:async-trait", "ractor/async-trait"]

default = []
Expand Down
2 changes: 1 addition & 1 deletion ractor_cluster_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_cluster_derive"
version = "0.14.3"
version = "0.14.4"
authors = ["Sean Lawlor <slawlor>"]
description = "Derives for ractor_cluster"
license = "MIT"
Expand Down

0 comments on commit 59b4af4

Please sign in to comment.